import logging
import os
import inspect
import sys
from collections import deque
from enum import Enum
currentdir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
parentdir = os.path.dirname(currentdir)
sys.path.insert(0, parentdir)
from Interfaces.Market import Market
class MarketSource(Enum):
    """
    Available market sources: local file list, watch list, market navigation
    through API, etc.

    The member values are the strings expected in the configuration, so a
    member can be built directly from the configured value, e.g.
    ``MarketSource("watchlist")``.
    """

    # Epic ids are read from a local file
    LIST = "list"
    # Epic ids are read from a broker watchlist
    WATCHLIST = "watchlist"
    # Epic ids are discovered by navigating the broker market nodes
    API = "api"
class MarketProvider:
    """
    Provide markets from different sources based on configuration. Supports
    market lists, dynamic market exploration or watchlists
    """

    # Node id from which the API market navigation starts.
    # NOTE(review): the meaning of this id is defined by the broker - confirm.
    API_ROOT_NODE_ID = "180500"

    # A market found through API navigation is kept only when its epic
    # contains at least one of these keywords.
    _EPIC_KEYWORDS = ("DFB", "TODAY", "DAILY")

    def __init__(self, config, broker):
        """
        Store the broker, read the configuration and load the initial list
        of epics from the configured source.

        - **config**: configuration mapping, see ``_read_configuration`` for
          the keys that are read
        - **broker**: object used to query watchlists, market nodes and
          market info
        """
        self.broker = broker
        self._read_configuration(config)
        self._initialise()

    def next(self):
        """
        Return the next market from the configured source

        Raise StopIteration when the source has no more markets and
        RuntimeError when the configured market source is not supported.
        """
        if self.market_source in (MarketSource.LIST, MarketSource.WATCHLIST):
            # Both sources are fully loaded upfront in a plain list of epics
            return self._next_from_list()
        if self.market_source == MarketSource.API:
            return self._next_from_api()
        raise RuntimeError("ERROR: invalid market_source configuration")

    def reset(self):
        """
        Reset internal market pointer to the beginning
        """
        logging.info("Resetting MarketProvider")
        self._initialise()

    def get_market_from_epic(self, epic):
        """
        Given a market epic id returns the related market snapshot
        """
        return self._create_market(epic)

    def search_market(self, search):
        """
        Tries to find the market which id matches the given search string.
        If successful return the market snapshot.

        Raise RuntimeError when no market is found, when multiple markets
        match the search string or when none of the results is a DFB/DAILY
        market.
        """
        markets = self.broker.search_market(search)
        if not markets:
            raise RuntimeError(
                "ERROR: Unable to find market matching: {}".format(search)
            )
        # Collect the market ids in a set to verify that the results are all
        # the same market
        market_ids = set()
        dfb_epic = ""
        for found in markets:
            # Epic are in format: KC.D.PRSMLN.DAILY.IP. Extract third element
            market_ids.add(found["epic"].split(".")[2])
            # Store the DFB epic (the last matching result wins)
            if "DFB" in found["expiry"] and "DAILY" in found["epic"]:
                dfb_epic = found["epic"]
        if len(market_ids) != 1:
            raise RuntimeError(
                "ERROR: Multiple markets match the search string: {}".format(search)
            )
        if not dfb_epic:
            # Fail with an explicit reason instead of asking the broker for
            # the info of an empty epic
            raise RuntimeError(
                "ERROR: Unable to find a DFB market matching: {}".format(search)
            )
        # Good, it means the result are all the same market
        return self._create_market(dfb_epic)

    def _read_configuration(self, config):
        """
        Read the settings used by the provider from the ``general`` section
        of the configuration: ``epic_ids_filepath`` (where ``{home}`` is
        replaced with the user home directory), ``market_source`` and
        ``watchlist_name``.
        """
        general = config["general"]
        home = os.path.expanduser("~")
        self.epic_ids_filepath = general["epic_ids_filepath"].replace("{home}", home)
        self.market_source = MarketSource(general["market_source"]["value"])
        self.watchlist_name = general["watchlist_name"]

    def _initialise(self):
        """
        (Re)load the list of epics from the configured source and point the
        internal iterator to its first element.

        Raise RuntimeError when the configured market source is not supported.
        """
        # Initialise epic list
        self.epic_list = []
        self.epic_list_iter = None
        # Initialise API members: ids of the market nodes still to explore
        self.node_stack = deque()
        if self.market_source == MarketSource.LIST:
            self.epic_list = self._load_epic_ids_from_local_file(self.epic_ids_filepath)
        elif self.market_source == MarketSource.WATCHLIST:
            self.epic_list = self._load_epic_ids_from_watchlist(self.watchlist_name)
        elif self.market_source == MarketSource.API:
            self.epic_list = self._load_epic_ids_from_api_node(self.API_ROOT_NODE_ID)
        else:
            raise RuntimeError("ERROR: invalid market_source configuration")
        self.epic_list_iter = iter(self.epic_list)

    def _load_epic_ids_from_local_file(self, filepath):
        """
        Read a file from filesystem containing a list of epic ids, one per
        line. Surrounding whitespace is stripped and blank lines are skipped.
        The filepath is defined in config.json file
        Returns a 'list' of strings where each string is a market epic.
        If the file cannot be read an error is logged and the list is empty.
        """
        epic_ids = []
        try:
            with open(filepath, "r") as filehandle:
                for line in filehandle:
                    # Strip instead of slicing off the last character: the
                    # last line of the file may not end with a linebreak
                    epic_id = line.strip()
                    if epic_id:
                        epic_ids.append(epic_id)
        except IOError:
            # The file is not created here, the problem is only reported
            logging.error("{} does not exist!".format(filepath))
        if not epic_ids:
            logging.error("Epic list is empty!")
        return epic_ids

    def _next_from_list(self):
        """
        Return the market related to the next epic of the current list.

        Raise StopIteration when the list is finished. A failure while
        creating the market is logged and reported as StopIteration too,
        which is the only exception type callers of this method handle.
        """
        # StopIteration raised here means the list is exhausted
        epic = next(self.epic_list_iter)
        try:
            return self._create_market(epic)
        except Exception as error:
            logging.error("Unable to create market {}: {}".format(epic, error))
            raise StopIteration from error

    def _load_epic_ids_from_watchlist(self, watchlist_name):
        """
        Return the list of epics of the markets in the given watchlist.

        Raise RuntimeError when the broker does not return the watchlist.
        """
        markets = self.broker.get_markets_from_watchlist(watchlist_name)
        if markets is None:
            message = "Watchlist {} not found!".format(watchlist_name)
            logging.error(message)
            raise RuntimeError(message)
        return [m["epic"] for m in markets]

    def _load_epic_ids_from_api_node(self, node_id):
        """
        Navigate the market node with the given id and return the epics of
        its markets, keeping only the epics containing one of the
        ``_EPIC_KEYWORDS``.

        When the node contains child nodes their ids are pushed on the node
        stack and the most recently pushed one is explored instead. The
        returned list can be empty while other nodes are still waiting in
        the stack.
        """
        node = self.broker.navigate_market_node(node_id)
        if not node:
            return []
        children = node["nodes"] if "nodes" in node else None
        # An empty list of children is treated as "no children", otherwise
        # the pop below would pick an unrelated node or fail on an empty stack
        if isinstance(children, list) and children:
            self.node_stack.extend(child["id"] for child in children)
            return self._load_epic_ids_from_api_node(self.node_stack.pop())
        markets = node["markets"] if "markets" in node else None
        if isinstance(markets, list):
            return [
                market["epic"]
                for market in markets
                if any(key in str(market["epic"]) for key in self._EPIC_KEYWORDS)
            ]
        return []

    def _next_from_api(self):
        """
        Return the next market of the current epic list. When the list is
        finished, explore the nodes waiting in the stack until one provides
        a non empty list of epics and return its first market.

        Raise StopIteration when there are no more epics and no more nodes.
        """
        try:
            return self._next_from_list()
        except StopIteration:
            pass
        while self.node_stack:
            self.epic_list = self._load_epic_ids_from_api_node(self.node_stack.pop())
            self.epic_list_iter = iter(self.epic_list)
            # Nodes without any suitable market are skipped
            if self.epic_list:
                return self._next_from_list()
        # Every node has been explored
        raise StopIteration

    def _create_market(self, epic_id):
        """
        Fetch the market info of the given epic from the broker and return
        it as a Market instance.

        Raise RuntimeError when the broker returns no info.
        """
        info = self.broker.get_market_info(epic_id)
        if info is None:
            raise RuntimeError("Unable to fetch data for {}".format(epic_id))
        market = Market()
        market.epic = info["epic"]
        market.id = info["market_id"]
        market.name = info["name"]
        market.bid = info["bid"]
        market.offer = info["offer"]
        market.high = info["high"]
        market.low = info["low"]
        market.stop_distance_min = info["stop_distance_min"]
        return market