Source code for Strategies.WeightedAvgPeak

import os
import inspect
import sys
import logging
import numpy
from numpy import NaN, Inf, arange, isscalar, asarray, array
from scipy import stats
import math

currentdir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
parentdir = os.path.dirname(currentdir)
sys.path.insert(0,parentdir)

from .Strategy import Strategy
from Utils import Utils, TradeDirection
from Interfaces.Broker import Interval

[docs]class WeightedAvgPeak(Strategy): """ All credits of this strategy goes to GitHub user @tg12. """ def __init__(self, config, broker): super().__init__(config, broker) logging.info('Weighted Average Peak strategy initialised.')
[docs] def read_configuration(self, config): """ Read the json configuration """ self.spin_interval = config['strategies']['weighted_avg_peak']['spin_interval'] self.max_spread = config['strategies']['weighted_avg_peak']['max_spread'] self.limit_p = config['strategies']['weighted_avg_peak']['limit_perc'] self.stop_p = config['strategies']['weighted_avg_peak']['stop_perc'] # TODO add these to the config file self.profit_indicator_multiplier = 0.3 self.ESMA_new_margin = 21 # (20% for stocks) self.too_high_margin = 100 # No stupidly high pip limit per trade # Normally would be 3/22 days but dull stocks require a lower multiplier self.ce_multiplier = 2 self.greed_indicator = 99999
[docs] def find_trade_signal(self, epic_id): """ TODO add description of strategy key points """ # Fetch data for the market snapshot = self.broker.get_market_info(epic_id) if snapshot is None: return TradeDirection.NONE, None, None market_id = snapshot['market_id'] current_bid = snapshot['bid'] current_offer = snapshot['offer'] limit_perc = self.limit_p stop_perc = max(snapshot['stop_distance_min'], self.stop_p) # Spread constraint if current_bid - current_offer > self.max_spread: return TradeDirection.NONE, None, None # Compute mid price current_mid = Utils.midpoint(current_bid, current_offer) # Fetch past prices of the market with weekly resolution data = self.broker.get_prices(epic_id, market_id, Interval.WEEK, 18) if data is None: logging.error('No historic data available for {} ({})'.format(epic_id, market_id)) return TradeDirection.NONE, None, None high_prices = data['high'] low_prices = data['low'] close_prices = data['close'] ltv = data['volume'] # Check dataset integrity array_len_check = [] array_len_check.append(len(high_prices)) array_len_check.append(len(low_prices)) array_len_check.append(len(close_prices)) array_len_check.append(len(ltv)) if not all(x == array_len_check[0] for x in array_len_check): logging.error('Historic prices dataset incomplete for {}'.format(epic_id)) return TradeDirection.NONE, None, None # compute weighted average and std deviation of prices using volume as weight low_prices = numpy.ma.asarray(low_prices) high_prices = numpy.ma.asarray(high_prices) ltv = numpy.ma.asarray(ltv) low_weighted_avg, low_weighted_std_dev = self.weighted_avg_and_std( low_prices, ltv) high_weighted_avg, high_weighted_std_dev = self.weighted_avg_and_std( high_prices, ltv) # The VWAP can be used similar to moving averages, where prices above # the VWAP reflect a bullish sentiment and prices below the VWAP # reflect a bearish sentiment. Traders may initiate short positions as # a stock price moves below VWAP for a given time period or initiate # long position as the price moves above VWAP tmp_high_weight_var = float(high_weighted_avg + high_weighted_std_dev) tmp_low_weight_var = float(low_weighted_avg + low_weighted_std_dev) # e.g # series = [0,0,0,2,0,0,0,-2,0,0,0,2,0,0,0,-2,0] maxtab_high, _mintab_high = self.peakdet(high_prices, .3) _maxtab_low, mintab_low = self.peakdet(low_prices, .3) # convert to array so can work on min/max mintab_low_a = array(mintab_low)[:, 1] maxtab_high_a = array(maxtab_high)[:, 1] xb = range(0, len(mintab_low_a)) xc = range(0, len(maxtab_high_a)) mintab_low_a_slope, mintab_low_a_intercept, mintab_low_a_lo_slope, mintab_low_a_hi_slope = stats.mstats.theilslopes( mintab_low_a, xb, 0.99) maxtab_high_a_slope, maxtab_high_a_intercept, maxtab_high_a_lo_slope, maxtab_high_a_hi_slope = stats.mstats.theilslopes( maxtab_high_a, xc, 0.99) peak_count_high = 0 peak_count_low = 0 # how may "peaks" are BELOW the threshold for a in mintab_low_a: if float(a) < float(tmp_low_weight_var): peak_count_low += 1 # how may "peaks" are ABOVE the threshold for a in maxtab_high_a: if float(a) > float(tmp_high_weight_var): peak_count_high += 1 additional_checks_sell = [ int(peak_count_low) > int(peak_count_high), float(mintab_low_a_slope) < float(maxtab_high_a_slope)] additional_checks_buy = [ int(peak_count_high) > int(peak_count_low), float(maxtab_high_a_slope) > float(mintab_low_a_slope)] sell_rules = [ float(current_mid) >= float( numpy.max(maxtab_high_a)),all(additional_checks_sell)] buy_rules = [ float(current_mid) <= float( numpy.min(mintab_low_a)), all(additional_checks_buy)] trade_direction = TradeDirection.NONE if any(buy_rules): trade_direction = TradeDirection.BUY elif any(sell_rules): trade_direction = TradeDirection.SELL if trade_direction is TradeDirection.NONE: return trade_direction, None, None logging.info("Strategy says: {} {}".format(trade_direction.name, market_id)) ATR = self.calculate_stop_loss(close_prices, high_prices, low_prices) if trade_direction is TradeDirection.BUY: pip_limit = int(abs(float(max(high_prices)) - float(current_bid)) * self.profit_indicator_multiplier) ce_stop = self.Chandelier_Exit_formula( trade_direction, ATR, min(low_prices)) stop_pips = str(int(abs(float(current_bid) - (ce_stop)))) elif trade_direction is TradeDirection.SELL: pip_limit = int(abs(float(min(low_prices)) - float(current_bid)) * self.profit_indicator_multiplier) ce_stop = self.Chandelier_Exit_formula( trade_direction, ATR, max(high_prices)) stop_pips = str(int(abs(float(current_bid) - (ce_stop)))) esma_new_margin_req = int( Utils.percentage_of( self.ESMA_new_margin, current_bid)) if int(esma_new_margin_req) > int(stop_pips): stop_pips = int(esma_new_margin_req) # is there a case for a 20% drop? ... Especially over 18 weeks or # so? if int(stop_pips) > int(esma_new_margin_req): stop_pips = int(esma_new_margin_req) if int(pip_limit) == 0: # not worth the trade trade_direction = TradeDirection.NONE if int(pip_limit) == 1: # not worth the trade trade_direction = TradeDirection.NONE if int(pip_limit) >= int(self.greed_indicator): pip_limit = int(self.greed_indicator - 1) if int(stop_pips) > int(self.too_high_margin): logging.warning("Junk data for {}".format(epic_id)) return TradeDirection.NONE, None, None return trade_direction, pip_limit, stop_pips
def calculate_stop_loss(self, close_prices, high_prices, low_prices): price_ranges = [] closing_prices = [] first_time_round_loop = True TR_prices = [] # They should be all the same length but just in case to be safe length = min(len(close_prices), len(high_prices), len(low_prices)) for index in range(length): if first_time_round_loop: # First time round loop cannot get previous closePrice = close_prices[index] closing_prices.append(closePrice) high_price = high_prices[index] low_price = low_prices[index] price_range = float(high_price - closePrice) price_ranges.append(price_range) first_time_round_loop = False else: prev_close = closing_prices[-1] closePrice = close_prices[index] closing_prices.append(closePrice) high_price = high_prices[index] low_price = low_prices[index] price_range = float(high_price - closePrice) price_ranges.append(price_range) TR = max(high_price - low_price, abs(high_price - prev_close), abs(low_price - prev_close)) TR_prices.append(TR) # for i in prices['prices']: # if first_time_round_loop: # # First time round loop cannot get previous # closePrice = i['closePrice'][price_compare] # closing_prices.append(closePrice) # high_price = i['highPrice'][price_compare] # low_price = i['lowPrice'][price_compare] # price_range = float(high_price - closePrice) # price_ranges.append(price_range) # first_time_round_loop = False # else: # prev_close = closing_prices[-1] # closePrice = i['closePrice'][price_compare] # closing_prices.append(closePrice) # high_price = i['highPrice'][price_compare] # low_price = i['lowPrice'][price_compare] # price_range = float(high_price - closePrice) # price_ranges.append(price_range) # TR = max(high_price - low_price, # abs(high_price - prev_close), # abs(low_price - prev_close)) # TR_prices.append(TR) return str(int(float(max(TR_prices))))
[docs] def weighted_avg_and_std(self, values, weights): """ Return the weighted average and standard deviation. values, weights -- Numpy ndarrays with the same shape. """ average = numpy.average(values, weights=weights) variance = numpy.average((values - average)**2, weights=weights) return (average, math.sqrt(variance))
[docs] def peakdet(self, v, delta, x=None): """ Converted from MATLAB script at http://billauer.co.il/peakdet.html Returns two arrays function [maxtab, mintab]=peakdet(v, delta, x) %PEAKDET Detect peaks in a vector % [MAXTAB, MINTAB] = PEAKDET(V, DELTA) finds the local % maxima and minima ("peaks") in the vector V. % MAXTAB and MINTAB consists of two columns. Column 1 % contains indices in V, and column 2 the found values. % % With [MAXTAB, MINTAB] = PEAKDET(V, DELTA, X) the indices % in MAXTAB and MINTAB are replaced with the corresponding % X-values. % % A point is considered a maximum peak if it has the maximal % value, and was preceded (to the left) by a value lower by % DELTA. % Eli Billauer, 3.4.05 (Explicitly not copyrighted). % This function is released to the public domain; Any use is allowed. """ maxtab = [] mintab = [] if x is None: x = arange(len(v)) v = asarray(v) if len(v) != len(x): logging.error('Input vectors v and x must have same length') return None, None if not isscalar(delta): logging.error('Input argument delta must be a scalar') return None, None if delta <= 0: logging.error('Input argument delta must be positive') return None, None mn, mx = Inf, -Inf mnpos, mxpos = NaN, NaN lookformax = True for i in arange(len(v)): this = v[i] if this > mx: mx = this mxpos = x[i] if this < mn: mn = this mnpos = x[i] if lookformax: if this < mx - delta: maxtab.append((mxpos, mx)) mn = this mnpos = x[i] lookformax = False else: if this > mn + delta: mintab.append((mnpos, mn)) mx = this mxpos = x[i] lookformax = True return array(maxtab), array(mintab)
def Chandelier_Exit_formula(self, TRADE_DIR, ATR, Price): # Chandelier Exit (long) = 22-day High - ATR(22) x 3 # Chandelier Exit (short) = 22-day Low + ATR(22) x 3 if TRADE_DIR is TradeDirection.BUY: return float(Price) - float(ATR) * int(self.ce_multiplier) elif TRADE_DIR is TradeDirection.SELL: return float(Price) + float(ATR) * int(self.ce_multiplier)
[docs] def get_seconds_to_next_spin(self): # Return the amount of seconds between each spin of the strategy # Each spin analyse all the markets in the list/watchlist return 3600 * 2 # every 2 hours