import logging
import json
from pathlib import Path
import pytz
import time
import datetime as dt
import os
import sys
import inspect
from random import shuffle
import traceback
currentdir = os.path.dirname(os.path.abspath(
inspect.getfile(inspect.currentframe())))
parentdir = os.path.dirname(currentdir)
sys.path.insert(0, parentdir)
from Utils import Utils, TradeDirection, MarketSource
from Interfaces.IGInterface import IGInterface
from Interfaces.AVInterface import AVInterface
from Strategies.StrategyFactory import StrategyFactory
from Interfaces.Broker import Broker
class TradingBot:
    """
    Class that initialises and holds references to the main components like
    the broker interface, the strategy or the epic_ids list
    """

    def __init__(self):
        # Load configuration
        home_path = os.path.expanduser('~')
        config_filepath = '{}/.TradingBot/config/config.json'.format(home_path)
        config = self.load_json_file(config_filepath)
        self.read_configuration(config)
        # Setup the global logger as soon as the configuration is known, so
        # that any later failure (e.g. a missing credentials file) is reported
        # through the configured handlers instead of implicitly configuring
        # the root logger with defaults
        self.setup_logging()
        # Read credentials file
        credentials = self.load_json_file(self.credentials_filepath)
        # Positions container, refreshed at every spin by start()
        self.positions = None
        # Init trade services and create the broker interface
        self.broker = self.init_trading_services(config, credentials)
        # Create strategy from the factory class
        self.strategy = StrategyFactory(config, self.broker).make_strategy(
            self.active_strategy)

    def load_json_file(self, filepath):
        """
        Load a JSON formatted file from the given filepath

            - **filepath** The filepath including filename and extension
            - Return the decoded content of the json file
            - Terminates the process raising **SystemExit** with status 1 if
              the file cannot be opened or read. Malformed JSON is not handled
              here and propagates as raised by the json module
        """
        try:
            with open(filepath, 'r') as file:
                return json.load(file)
        except IOError:
            logging.error("File not found ({})".format(filepath))
            # sys.exit is always available (the bare exit() builtin is only
            # injected by the site module) and a non-zero status lets the
            # caller of the process detect the failure
            sys.exit(1)

    def read_configuration(self, config):
        """
        Read the configuration from the config json and store the values of
        the 'general' section as attributes of this instance. The '{home}'
        placeholder in file paths is replaced with the user home directory

            - **config** The configuration json
        """
        home = os.path.expanduser('~')
        general = config['general']
        self.epic_ids_filepath = general['epic_ids_filepath'].replace('{home}', home)
        self.credentials_filepath = general['credentials_filepath'].replace('{home}', home)
        self.debug_log = general['debug_log']
        self.enable_log = general['enable_log']
        self.log_file = general['log_file'].replace('{home}', home)
        self.time_zone = general['time_zone']
        self.max_account_usable = general['max_account_usable']
        self.market_source = MarketSource(general['market_source']['value'])
        self.watchlist_name = general['watchlist_name']
        self.active_strategy = general['active_strategy']

    def setup_logging(self):
        """
        Setup the global logging settings. When file logging is enabled the
        '{timestamp}' placeholder of the configured log file name is replaced
        with the current time and the containing folder is created if missing
        """
        log_level = logging.DEBUG if self.debug_log else logging.INFO
        log_format = "[%(asctime)s] %(levelname)s: %(message)s"
        if not self.enable_log:
            logging.basicConfig(level=log_level, format=log_format)
            return
        # ':' and '.' are replaced to keep the timestamp filesystem friendly
        time_str = dt.datetime.now().isoformat()
        time_suffix = time_str.replace(':', '_').replace('.', '_')
        log_filename = self.log_file.replace('{timestamp}', time_suffix)
        log_dir = os.path.dirname(log_filename)
        # A bare filename has an empty dirname and os.makedirs('') would raise
        if log_dir:
            os.makedirs(log_dir, exist_ok=True)
        logging.basicConfig(filename=log_filename,
                            level=log_level,
                            format=log_format)

    def init_trading_services(self, config, credentials):
        """
        Create instances of the trading services required, such as web interface
        for trading and fetch market data.

            - **config** The configuration json
            - **credentials** The credentials json
            - return: An instance of Broker class initialised
        """
        services = {
            "ig_index": IGInterface(config, credentials),
            "alpha_vantage": AVInterface(credentials['av_api_key'], config)
        }
        return Broker(config, services)

    def load_epic_ids_from_local_file(self, filepath):
        """
        Read a file from filesystem containing a list of epic ids, one per
        line. Surrounding whitespace is stripped and blank lines are ignored.

            - **filepath** The path of the file to read
            - Returns a 'list' of strings where each string is a market epic.
              The list is empty if the file cannot be read
        """
        epic_ids = []
        try:
            with open(filepath, 'r') as filehandle:
                for line in filehandle:
                    # strip() instead of dropping the last character, so the
                    # final line is preserved when it has no trailing newline
                    epic_id = line.strip()
                    if epic_id:
                        epic_ids.append(epic_id)
        except IOError:
            logging.error('{} does not exist!'.format(filepath))
        if not epic_ids:
            logging.error("Epic list is empty!")
        return epic_ids

    def start(self, argv):
        """
        Starts the TradingBot. This method loops forever: while the market is
        open it processes the open positions and the configured market source,
        otherwise it sleeps until the next market opening

            - **argv** Currently unused
        """
        while True:
            if Utils.is_market_open(self.time_zone):
                # Process open positions
                self.positions = self.broker.get_open_positions()
                self.process_open_positions(self.positions)
                if self.market_source == MarketSource.LIST:
                    self.process_epic_list(
                        self.load_epic_ids_from_local_file(
                            self.epic_ids_filepath))
                elif self.market_source == MarketSource.WATCHLIST:
                    self.process_watchlist(self.watchlist_name)
                elif self.market_source == MarketSource.API:
                    # Market navigation starts from a hard-coded node id
                    # NOTE(review): the meaning of node '180500' is not
                    # visible from this file - confirm against the IG API
                    self.process_market_exploration('180500')
                # Wait for next spin loop as configured in the strategy
                seconds = self.strategy.get_seconds_to_next_spin()
                logging.info("Wait for {0:.2f} seconds before next spin".format(seconds))
                time.sleep(seconds)
            else:
                self.wait_for_next_market_opening()

    def process_watchlist(self, watchlist_name):
        """
        Process the markets included in the given IG watchlist

            - **watchlist_name**: IG watchlist name
        """
        # Use the requested watchlist, not the one stored in the configuration
        markets = self.broker.get_markets_from_watchlist(watchlist_name)
        if markets is None:
            logging.error("Watchlist {} not found!".format(watchlist_name))
            return
        for m in markets:
            if not self.process_market(m['epic']):
                return

    def process_market_exploration(self, node_id):
        """
        Navigate the markets using IG API to fetch markets id dynamically.
        Child nodes are explored recursively and only the markets whose epic
        contains 'DFB', 'TODAY' or 'DAILY' are processed

            - **node_id**: The node id to navigate markets in
            - Returns **False** if the exploration has been interrupted because
              process_market asked to stop, otherwise **True**
        """
        node = self.broker.navigate_market_node(node_id)
        if node is None:
            # presumably the broker returns None on failure like its other
            # methods used in this class - verify against Broker
            logging.warning("Unable to navigate market node {}".format(node_id))
            return True
        if 'nodes' in node and isinstance(node['nodes'], list):
            for child in node['nodes']:
                # Propagate the stop request up the recursion, otherwise every
                # remaining sibling would still be fetched and processed
                if self.process_market_exploration(child['id']) is False:
                    return False
        if 'markets' in node and isinstance(node['markets'], list):
            for market in node['markets']:
                epic = str(market['epic'])
                if any(tag in epic for tag in ("DFB", "TODAY", "DAILY")):
                    if not self.process_market(market['epic']):
                        return False
        return True

    def process_epic_list(self, epic_list):
        """
        Process the given list of epic ids, one by one in random order, to
        find new trades. The given list is not modified

            - **epic_list**: list of epic ids as strings
        """
        # Shuffle a copy to leave the caller's list untouched
        epics = list(epic_list)
        shuffle(epics)
        logging.info("Processing epic list of length: {}".format(len(epics)))
        for epic in epics:
            if not self.process_market(epic):
                return

    def process_market(self, epic):
        """
        Process the given epic using the defined strategy

            - **epic**: string representing a market epic id
            - Returns **False** if market is closed or if account reach maximum margin, otherwise **True**
        """
        percent_used = self.broker.get_account_used_perc()
        if percent_used is None:
            logging.warning("Stop trading because can't fetch percentage of account used")
            return False
        if percent_used >= self.max_account_usable:
            logging.warning("Stop trading because {}% of account is used".format(str(percent_used)))
            return False
        if not Utils.is_market_open(self.time_zone):
            # logging.warn is a deprecated alias of logging.warning
            logging.warning("Market is closed: stop processing")
            return False
        self.process_trade(epic)
        return True

    def close_open_positions(self):
        """
        Closes all the open positions in the account
        """
        logging.info("Closing all the open positions...")
        if self.broker.close_all_positions():
            logging.info("All the posisions have been closed.")
        else:
            logging.error("Impossible to close all open positions, retry.")

    def wait_for_next_market_opening(self):
        """
        Sleep until the next market opening. The waiting time is computed by
        Utils.get_seconds_to_market_opening starting from the current time
        """
        seconds = Utils.get_seconds_to_market_opening(dt.datetime.now())
        logging.info("Market is closed! Wait for {0:.2f} hours...".format(seconds / 3600))
        time.sleep(seconds)

    def process_trade(self, epic):
        """
        Process a trade checking if it is a "close position" trade or a new action.
        Positions open on the epic in the opposite direction of the signal are
        closed; a new trade is placed only if no position in the same direction
        is already open on the epic

            - **epic**: string representing a market epic id
        """
        logging.info("Processing {}".format(epic))
        # Use strategy to analyse market
        try:
            trade, limit, stop = self.strategy.find_trade_signal(epic)
        except Exception as e:
            # A failing strategy must not stop the bot: log and skip this epic
            logging.error('Exception: {}'.format(e))
            logging.debug(e)
            logging.debug(traceback.format_exc())
            logging.debug(sys.exc_info()[0])
            return
        if trade is TradeDirection.NONE:
            return
        if self.positions is None:
            logging.error(
                "Unable to fetch open positions! Avoid trading this epic")
            return
        # NOTE(review): the original indentation of this section was lost, it
        # is assumed the new trade was placed after scanning all the positions
        # - confirm against the project history
        already_open = False
        for item in self.positions['positions']:
            if item['market']['epic'] != epic:
                continue
            if trade.name == item['position']['direction']:
                # If a same direction trade already exist, don't trade
                already_open = True
            else:
                # If a trade in opposite direction exist, close the position
                self.broker.close_position(item)
        if already_open:
            logging.info("There is already an open position for this epic, skip trade")
            return
        self.broker.trade(epic, trade.name, limit, stop)

    def process_open_positions(self, positions):
        """
        process the open positions to find closing trades

            - **positions**: json object containing open positions
            - Returns **False** if an error occurs otherwise True
        """
        if positions is None:
            logging.warning("Unable to fetch open positions!")
            return False
        logging.info("Processing open positions.")
        self.process_epic_list([item['market']['epic'] for item in positions['positions']])
        return True