import logging
import math
from datetime import datetime
from typing import Optional, Tuple
import numpy
from numpy import Inf, NaN, arange, array, asarray, isscalar
from scipy import stats
from tradingbot.Components.Broker.Broker import Broker
from tradingbot.Components.Configuration import Configuration
from tradingbot.Components.Utils import Interval, TradeDirection, Utils
from tradingbot.Interfaces.Market import Market
from tradingbot.Interfaces.MarketHistory import MarketHistory
from tradingbot.Strategies.Strategy import BacktestResult, Strategy, TradeSignal
[docs]class WeightedAvgPeak(Strategy):
"""
All credits of this strategy goes to GitHub user @tg12.
"""
def __init__(self, config: Configuration, broker: Broker) -> None:
super().__init__(config, broker)
logging.info("Weighted Average Peak strategy initialised.")
[docs] def read_configuration(self, config: Configuration) -> None:
"""
Read the json configuration
"""
raw = config.get_raw_config()
self.max_spread = raw["strategies"]["weighted_avg_peak"]["max_spread"]
self.limit_p = raw["strategies"]["weighted_avg_peak"]["limit_perc"]
self.stop_p = raw["strategies"]["weighted_avg_peak"]["stop_perc"]
# TODO add these to the config file
self.profit_indicator_multiplier = 0.3
self.ESMA_new_margin = 21 # (20% for stocks)
self.too_high_margin = 100 # No stupidly high pip limit per trade
# Normally would be 3/22 days but dull stocks require a lower multiplier
self.ce_multiplier = 2
self.greed_indicator = 99999
[docs] def initialise(self) -> None:
"""
Initialise the strategy
"""
pass
[docs] def fetch_datapoints(self, market: Market) -> MarketHistory:
"""
Fetch weekly prices of past 18 weeks
"""
return self.broker.get_prices(market, Interval.WEEK, 18)
[docs] def find_trade_signal(
self, market: Market, datapoints: MarketHistory
) -> TradeSignal:
"""
TODO add description of strategy key points
"""
# limit_perc = self.limit_p
# stop_perc = max(market.stop_distance_min, self.stop_p)
# Spread constraint
if market.bid - market.offer > self.max_spread:
return TradeDirection.NONE, None, None
# Compute mid price
current_mid = Utils.midpoint(market.bid, market.offer)
high_prices = datapoints.dataframe[MarketHistory.HIGH_COLUMN].values
low_prices = datapoints.dataframe[MarketHistory.LOW_COLUMN].values
close_prices = datapoints.dataframe[MarketHistory.CLOSE_COLUMN].values
ltv = datapoints.dataframe[MarketHistory.VOLUME_COLUMN].values
# Check dataset integrity
array_len_check = []
array_len_check.append(len(high_prices))
array_len_check.append(len(low_prices))
array_len_check.append(len(close_prices))
array_len_check.append(len(ltv))
if not all(x == array_len_check[0] for x in array_len_check):
logging.error("Historic datapoints incomplete for {}".format(market.epic))
return TradeDirection.NONE, None, None
# compute weighted average and std deviation of prices using volume as weight
low_prices = numpy.ma.asarray(low_prices)
high_prices = numpy.ma.asarray(high_prices)
ltv = numpy.ma.asarray(ltv)
low_weighted_avg, low_weighted_std_dev = self.weighted_avg_and_std(
low_prices, ltv
)
high_weighted_avg, high_weighted_std_dev = self.weighted_avg_and_std(
high_prices, ltv
)
# The VWAP can be used similar to moving averages, where prices above
# the VWAP reflect a bullish sentiment and prices below the VWAP
# reflect a bearish sentiment. Traders may initiate short positions as
# a stock price moves below VWAP for a given time period or initiate
# long position as the price moves above VWAP
tmp_high_weight_var = float(high_weighted_avg + high_weighted_std_dev)
tmp_low_weight_var = float(low_weighted_avg + low_weighted_std_dev)
# e.g
# series = [0,0,0,2,0,0,0,-2,0,0,0,2,0,0,0,-2,0]
maxtab_high, _mintab_high = self.peakdet(high_prices, 0.3)
_maxtab_low, mintab_low = self.peakdet(low_prices, 0.3)
# convert to array so can work on min/max
mintab_low_a = array(mintab_low)[:, 1]
maxtab_high_a = array(maxtab_high)[:, 1]
xb = range(0, len(mintab_low_a))
xc = range(0, len(maxtab_high_a))
(
mintab_low_a_slope,
mintab_low_a_intercept,
mintab_low_a_lo_slope,
mintab_low_a_hi_slope,
) = stats.mstats.theilslopes(mintab_low_a, xb, 0.99)
(
maxtab_high_a_slope,
maxtab_high_a_intercept,
maxtab_high_a_lo_slope,
maxtab_high_a_hi_slope,
) = stats.mstats.theilslopes(maxtab_high_a, xc, 0.99)
peak_count_high = 0
peak_count_low = 0
# how may "peaks" are BELOW the threshold
for a in mintab_low_a:
if float(a) < float(tmp_low_weight_var):
peak_count_low += 1
# how may "peaks" are ABOVE the threshold
for a in maxtab_high_a:
if float(a) > float(tmp_high_weight_var):
peak_count_high += 1
additional_checks_sell = [
int(peak_count_low) > int(peak_count_high),
float(mintab_low_a_slope) < float(maxtab_high_a_slope),
]
additional_checks_buy = [
int(peak_count_high) > int(peak_count_low),
float(maxtab_high_a_slope) > float(mintab_low_a_slope),
]
sell_rules = [
float(current_mid) >= float(numpy.max(maxtab_high_a)),
all(additional_checks_sell),
]
buy_rules = [
float(current_mid) <= float(numpy.min(mintab_low_a)),
all(additional_checks_buy),
]
trade_direction = TradeDirection.NONE
if any(buy_rules):
trade_direction = TradeDirection.BUY
elif any(sell_rules):
trade_direction = TradeDirection.SELL
if trade_direction is TradeDirection.NONE:
return trade_direction, None, None
logging.info("Strategy says: {} {}".format(trade_direction.name, market.id))
ATR = self.calculate_stop_loss(close_prices, high_prices, low_prices)
if trade_direction is TradeDirection.BUY:
pip_limit = int(
abs(float(max(high_prices)) - float(market.bid))
* self.profit_indicator_multiplier
)
ce_stop = self.Chandelier_Exit_formula(
trade_direction, ATR, min(low_prices)
)
stop_pips = int(abs(float(market.bid) - (ce_stop)))
elif trade_direction is TradeDirection.SELL:
pip_limit = int(
abs(float(min(low_prices)) - float(market.bid))
* self.profit_indicator_multiplier
)
ce_stop = self.Chandelier_Exit_formula(
trade_direction, ATR, max(high_prices)
)
stop_pips = int(abs(float(market.bid) - (ce_stop)))
esma_new_margin_req = int(Utils.percentage_of(self.ESMA_new_margin, market.bid))
if int(esma_new_margin_req) > int(stop_pips):
stop_pips = int(esma_new_margin_req)
# is there a case for a 20% drop? ... Especially over 18 weeks or
# so?
if int(stop_pips) > int(esma_new_margin_req):
stop_pips = int(esma_new_margin_req)
if int(pip_limit) == 0:
# not worth the trade
trade_direction = TradeDirection.NONE
if int(pip_limit) == 1:
# not worth the trade
trade_direction = TradeDirection.NONE
if int(pip_limit) >= int(self.greed_indicator):
pip_limit = int(self.greed_indicator - 1)
if int(stop_pips) > int(self.too_high_margin):
logging.warning("Junk data for {}".format(market.epic))
return TradeDirection.NONE, None, None
return trade_direction, pip_limit, stop_pips
def calculate_stop_loss(
self,
close_prices: numpy.ndarray,
high_prices: numpy.ndarray,
low_prices: numpy.ndarray,
) -> str:
price_ranges = []
closing_prices = []
first_time_round_loop = True
TR_prices = []
# They should be all the same length but just in case to be safe
length = min(len(close_prices), len(high_prices), len(low_prices))
for index in range(length):
if first_time_round_loop:
# First time round loop cannot get previous
closePrice = close_prices[index]
closing_prices.append(closePrice)
high_price = high_prices[index]
low_price = low_prices[index]
price_range = float(high_price - closePrice)
price_ranges.append(price_range)
first_time_round_loop = False
else:
prev_close = closing_prices[-1]
closePrice = close_prices[index]
closing_prices.append(closePrice)
high_price = high_prices[index]
low_price = low_prices[index]
price_range = float(high_price - closePrice)
price_ranges.append(price_range)
TR = max(
high_price - low_price,
abs(high_price - prev_close),
abs(low_price - prev_close),
)
TR_prices.append(TR)
# for i in prices['prices']:
# if first_time_round_loop:
# # First time round loop cannot get previous
# closePrice = i['closePrice'][price_compare]
# closing_prices.append(closePrice)
# high_price = i['highPrice'][price_compare]
# low_price = i['lowPrice'][price_compare]
# price_range = float(high_price - closePrice)
# price_ranges.append(price_range)
# first_time_round_loop = False
# else:
# prev_close = closing_prices[-1]
# closePrice = i['closePrice'][price_compare]
# closing_prices.append(closePrice)
# high_price = i['highPrice'][price_compare]
# low_price = i['lowPrice'][price_compare]
# price_range = float(high_price - closePrice)
# price_ranges.append(price_range)
# TR = max(high_price - low_price,
# abs(high_price - prev_close),
# abs(low_price - prev_close))
# TR_prices.append(TR)
return str(int(float(max(TR_prices))))
[docs] def weighted_avg_and_std(
self, values: numpy.ndarray, weights: numpy.ndarray
) -> Tuple[float, float]:
"""
Return the weighted average and standard deviation.
values, weights -- Numpy ndarrays with the same shape.
"""
average = numpy.average(values, weights=weights)
variance = numpy.average((values - average) ** 2, weights=weights)
return (float(average), math.sqrt(variance))
[docs] def peakdet(
self, v: numpy.ndarray, delta: float, x: Optional[numpy.ndarray] = None
) -> Tuple[Optional[numpy.ndarray], Optional[numpy.ndarray]]:
"""
Converted from MATLAB script at http://billauer.co.il/peakdet.html
Returns two arrays
function [maxtab, mintab]=peakdet(v, delta, x)
%PEAKDET Detect peaks in a vector
% [MAXTAB, MINTAB] = PEAKDET(V, DELTA) finds the local
% maxima and minima ("peaks") in the vector V.
% MAXTAB and MINTAB consists of two columns. Column 1
% contains indices in V, and column 2 the found values.
%
% With [MAXTAB, MINTAB] = PEAKDET(V, DELTA, X) the indices
% in MAXTAB and MINTAB are replaced with the corresponding
% X-values.
%
% A point is considered a maximum peak if it has the maximal
% value, and was preceded (to the left) by a value lower by
% DELTA.
% Eli Billauer, 3.4.05 (Explicitly not copyrighted).
% This function is released to the public domain; Any use is allowed.
"""
maxtab = []
mintab = []
if x is None:
x = arange(len(v))
v = asarray(v)
if len(v) != len(x):
logging.error("Input vectors v and x must have same length")
return None, None
if not isscalar(delta):
logging.error("Input argument delta must be a scalar")
return None, None
if delta <= 0:
logging.error("Input argument delta must be positive")
return None, None
mn, mx = Inf, -Inf
mnpos, mxpos = NaN, NaN
lookformax = True
for i in arange(len(v)):
this = v[i]
if this > mx:
mx = this
mxpos = x[i]
if this < mn:
mn = this
mnpos = x[i]
if lookformax:
if this < mx - delta:
maxtab.append((mxpos, mx))
mn = this
mnpos = x[i]
lookformax = False
else:
if this > mn + delta:
mintab.append((mnpos, mn))
mx = this
mxpos = x[i]
lookformax = True
return array(maxtab), array(mintab)
def Chandelier_Exit_formula(
self, TRADE_DIR: TradeDirection, ATR: str, Price: float
) -> float:
# Chandelier Exit (long) = 22-day High - ATR(22) x 3
# Chandelier Exit (short) = 22-day Low + ATR(22) x 3
if TRADE_DIR is TradeDirection.BUY:
return float(Price) - float(ATR) * int(self.ce_multiplier)
elif TRADE_DIR is TradeDirection.SELL:
return float(Price) + float(ATR) * int(self.ce_multiplier)
raise ValueError("trade direction can't be NONE")
[docs] def backtest(
self, market: Market, start_date: datetime, end_date: datetime
) -> BacktestResult:
"""Backtest the strategy
"""
# TODO
raise NotImplementedError("Work in progress")