import logging
import traceback
from datetime import datetime as dt
from pathlib import Path
from typing import List, Optional
import pytz
from tradingbot.Components.Backtester import Backtester
from tradingbot.Components.Broker.Broker import Broker as BrokerInterface
from tradingbot.Components.Broker.BrokerFactory import BrokerFactory
from tradingbot.Components.Configuration import Configuration
from tradingbot.Components.MarketProvider import MarketProvider
from tradingbot.Components.TimeProvider import TimeAmount, TimeProvider
from tradingbot.Components.Utils import (
MarketClosedException,
NotSafeToTradeException,
TradeDirection,
)
from tradingbot.Interfaces.Market import Market
from tradingbot.Interfaces.Position import Position
from tradingbot.Strategies.StrategyFactory import StrategyFactory, StrategyImpl
[docs]class TradingBot:
"""
Class that initialise and hold references of main components like the
broker interface, the strategy or the epic_ids list
"""
time_provider: TimeProvider
config: Configuration
broker: BrokerInterface
strategy: StrategyImpl
market_provider: MarketProvider
def __init__(
self,
time_provider: Optional[TimeProvider] = None,
config_filepath: Optional[Path] = None,
) -> None:
# Time manager
self.time_provider = time_provider if time_provider else TimeProvider()
# Set timezone
set(pytz.all_timezones_set)
# Load configuration
self.config = Configuration.from_filepath(config_filepath)
# Setup the global logger
self.setup_logging()
# Init trade services and create the broker interface
# The Factory is used to create the services from the configuration file
self.broker = BrokerInterface(BrokerFactory(self.config))
# Create strategy from the factory class
self.strategy = StrategyFactory(
self.config, self.broker
).make_from_configuration()
# Create the market provider
self.market_provider = MarketProvider(self.config, self.broker)
[docs] def setup_logging(self) -> None:
"""
Setup the global logging settings
"""
# Clean logging handlers
for handler in logging.root.handlers[:]:
logging.root.removeHandler(handler)
# Define the global logging settings
debugLevel = (
logging.DEBUG if self.config.is_logging_debug_enabled() else logging.INFO
)
if self.config.is_logging_enabled():
log_filename = self.config.get_log_filepath()
Path(log_filename).parent.mkdir(parents=True, exist_ok=True)
logging.basicConfig(
filename=log_filename,
level=debugLevel,
format="[%(asctime)s] %(levelname)s: %(message)s",
)
else:
logging.basicConfig(
level=debugLevel, format="[%(asctime)s] %(levelname)s: %(message)s"
)
[docs] def start(self) -> None:
"""
Starts the TradingBot main loop
- process open positions
- process markets from market source
- wait for configured wait time
- start over
"""
while True:
try:
# Process current open positions
self.process_open_positions()
# Now process markets from the configured market source
self.process_market_source()
# Wait for the next spin before starting over
self.time_provider.wait_for(
TimeAmount.SECONDS, self.config.get_spin_interval()
)
except MarketClosedException:
logging.warning("Market is closed: stop processing")
self.time_provider.wait_for(TimeAmount.NEXT_MARKET_OPENING)
except NotSafeToTradeException:
self.time_provider.wait_for(
TimeAmount.SECONDS, self.config.get_spin_interval()
)
except StopIteration:
self.time_provider.wait_for(
TimeAmount.SECONDS, self.config.get_spin_interval()
)
except Exception as e:
logging.error("Generic exception caught: {}".format(e))
logging.error(traceback.format_exc())
continue
[docs] def process_open_positions(self) -> None:
"""
Fetch open positions markets and run the strategy against them closing the
trades if required
"""
positions = self.broker.get_open_positions()
# Do not run until we know the current open positions
if positions is None:
logging.warning("Unable to fetch open positions! Will try again...")
raise RuntimeError("Unable to fetch open positions")
for epic in [item.epic for item in positions]:
market = self.market_provider.get_market_from_epic(epic)
self.process_market(market, positions)
[docs] def process_market_source(self) -> None:
"""
Process markets from the configured market source
"""
while True:
market = self.market_provider.next()
positions = self.broker.get_open_positions()
if positions is None:
logging.warning("Unable to fetch open positions! Will try again...")
raise RuntimeError("Unable to fetch open positions")
self.process_market(market, positions)
[docs] def process_market(self, market: Market, open_positions: List[Position]) -> None:
"""Spin the strategy on all the markets"""
self.safety_checks()
logging.info("Processing {}".format(market.id))
try:
self.strategy.set_open_positions(open_positions)
trade, limit, stop = self.strategy.run(market)
self.process_trade(market, trade, limit, stop, open_positions)
except Exception as e:
logging.error("Strategy exception caught: {}".format(e))
logging.error(traceback.format_exc())
return
[docs] def close_open_positions(self) -> None:
"""
Closes all the open positions in the account
"""
logging.info("Closing all the open positions...")
if self.broker.close_all_positions():
logging.info("All the posisions have been closed.")
else:
logging.error("Impossible to close all open positions, retry.")
[docs] def safety_checks(self) -> None:
"""
Perform some safety checks before running the strategy against the next market
Raise exceptions if not safe to trade
"""
percent_used = self.broker.get_account_used_perc()
if percent_used is None:
logging.warning(
"Stop trading because can't fetch percentage of account used"
)
raise NotSafeToTradeException()
if percent_used >= self.config.get_max_account_usable():
logging.warning(
"Stop trading because {}% of account is used".format(str(percent_used))
)
raise NotSafeToTradeException()
if not self.time_provider.is_market_open(self.config.get_time_zone()):
raise MarketClosedException()
[docs] def process_trade(
self,
market: Market,
direction: TradeDirection,
limit: Optional[float],
stop: Optional[float],
open_positions: List[Position],
) -> None:
"""
Process a trade checking if it is a "close position" trade or a new trade
"""
# Perform trade only if required
if direction is TradeDirection.NONE or limit is None or stop is None:
return
if len(open_positions) > 0:
for item in open_positions:
# If a same direction trade already exist, don't trade
if item.epic == market.epic and direction is item.direction:
logging.info(
"There is already an open position for this epic, skip trade"
)
return
# If a trade in opposite direction exist, close the position
elif item.epic == market.epic and direction is not item.direction:
self.broker.close_position(item)
return
self.broker.trade(market.epic, direction, limit, stop)
else:
logging.error("Unable to fetch open positions! Avoid trading this epic")
[docs] def backtest(
self,
market_id: str,
start_date: str,
end_date: str,
epic_id: Optional[str] = None,
) -> None:
"""
Backtest a market using the configured strategy
"""
try:
start = dt.strptime(start_date, "%Y-%m-%d")
end = dt.strptime(end_date, "%Y-%m-%d")
except ValueError as e:
logging.error("Wrong date format! Must be YYYY-MM-DD")
logging.debug(e)
exit(1)
bt = Backtester(self.broker, self.strategy)
try:
market = (
self.market_provider.search_market(market_id)
if epic_id is None or epic_id == ""
else self.market_provider.get_market_from_epic(epic_id)
)
except Exception as e:
logging.error(e)
exit(1)
bt.start(market, start, end)
bt.print_results()