# Source code for Interfaces.IGInterface

import requests
import json
import logging
import os
import inspect
import sys
from enum import Enum
import pandas as pd

# Make the parent directory of this file importable.
# `currentdir` is the directory containing this source file (resolved through
# the current frame), `parentdir` is one level above it, and the latter is put
# at the front of sys.path.
# NOTE(review): presumably this is what lets the `from Utils import ...` line
# below resolve — confirm against the repository layout.
currentdir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
parentdir = os.path.dirname(currentdir)
sys.path.insert(0,parentdir)

from Utils import Utils, TradeDirection

class IG_API_URL(Enum):
    """
    Building blocks of the IG REST API urls.

    ``BASE_URI`` contains an ``@`` placeholder that is replaced either with
    ``DEMO_PREFIX`` or with an empty string; every other member is a path
    segment appended to the resulting base url.
    """
    # Base url template and the prefix substituted for its '@' placeholder
    BASE_URI = "https://@api.ig.com/gateway/deal"
    DEMO_PREFIX = "demo-"

    # Endpoint path segments
    SESSION = "session"
    ACCOUNTS = "accounts"
    POSITIONS = "positions"
    POSITIONS_OTC = "positions/otc"
    MARKETS = "markets"
    PRICES = "prices"
    CONFIRMS = "confirms"
    MARKET_NAV = "marketnavigation"
    WATCHLISTS = "watchlists"


class IGInterface:
    """
    IG broker interface class, provides functions to use the IG REST API
    """

    def __init__(self, config, credentials):
        """
        Read the configuration, build the API base url and authenticate

            - **config**: json object containing an 'ig_interface' section
            - **credentials**: json object forwarded to **authenticate**
            - Raises **SystemExit** if the authentication fails
        """
        self.read_configuration(config)
        # The '@' placeholder of the base uri becomes 'demo-' for demo accounts
        demoPrefix = IG_API_URL.DEMO_PREFIX.value if self.useDemo else ''
        self.apiBaseURL = IG_API_URL.BASE_URI.value.replace('@', demoPrefix)
        self.authenticated_headers = {}
        if self.paperTrading:
            logging.info('Paper trading is active')
        if not self.authenticate(credentials):
            logging.error("Authentication failed")
            # Terminate with a non-zero status so the failure is visible
            raise SystemExit(1)
        logging.info("IG initialised.")

    def read_configuration(self, config):
        """
        Read the configuration from the config json

            - **config**: json object containing an 'ig_interface' section
        """
        ig_config = config['ig_interface']
        self.useDemo = ig_config['use_demo_account']
        self.orderType = ig_config['order_type']
        self.orderSize = ig_config['order_size']
        self.orderExpiry = ig_config['order_expiry']
        self.useGStop = ig_config['use_g_stop']
        self.orderCurrency = ig_config['order_currency']
        self.orderForceOpen = ig_config['order_force_open']
        self.paperTrading = ig_config['paper_trading']

    def authenticate(self, credentials):
        """
        Authenticate the IGInterface instance with the given credentials

            - **credentials**: json object containing username, password,
              default account and api key
            - Returns **False** if an error occurs otherwise True
        """
        data = {"identifier": credentials['username'],
                "password": credentials['password']}
        headers = {'Content-Type': 'application/json; charset=utf-8',
                   'Accept': 'application/json; charset=utf-8',
                   'X-IG-API-KEY': credentials['api_key'],
                   'Version': '2'}
        url = '{}/{}'.format(self.apiBaseURL, IG_API_URL.SESSION.value)
        response = requests.post(url,
                                 data=json.dumps(data),
                                 headers=headers)
        if response.status_code != 200:
            return False
        # Read the tokens straight from the response headers so that the
        # lookup stays case-insensitive
        try:
            CST_token = response.headers["CST"]
            x_sec_token = response.headers["X-SECURITY-TOKEN"]
        except KeyError:
            return False
        self.authenticated_headers = {
            'Content-Type': 'application/json; charset=utf-8',
            'Accept': 'application/json; charset=utf-8',
            'X-IG-API-KEY': credentials['api_key'],
            'CST': CST_token,
            'X-SECURITY-TOKEN': x_sec_token}
        # NOTE(review): the result is ignored, so a failed account switch
        # does not fail the authentication - confirm this is intended
        self.set_default_account(credentials['account_id'])
        return True

    def set_default_account(self, accountId):
        """
        Sets the IG account to use

            - **accountId**: String representing the account id to use
            - Returns **False** if an error occurs otherwise True
        """
        url = '{}/{}'.format(self.apiBaseURL, IG_API_URL.SESSION.value)
        data = {"accountId": accountId, "defaultAccount": "True"}
        response = requests.put(url,
                                data=json.dumps(data),
                                headers=self.authenticated_headers)
        if response.status_code != 200:
            return False
        logging.info('Using default account: {}'.format(accountId))
        return True

    def get_account_balances(self):
        """
        Returns a tuple (balance, deposit) for the account in use

            - Returns **(None,None)** if an error occurs or if no account of
              type SPREADBET is found, otherwise (balance, deposit)
        """
        url = '{}/{}'.format(self.apiBaseURL, IG_API_URL.ACCOUNTS.value)
        d = self.http_get(url)
        if d is None:
            return None, None
        for account in d['accounts']:
            if str(account['accountType']) == "SPREADBET":
                balance = account['balance']['balance']
                deposit = account['balance']['deposit']
                return balance, deposit
        # Always hand back a 2-tuple so callers can safely unpack the result
        return None, None

    def get_open_positions(self):
        """
        Returns the account open positions in an json object

            - Returns the json object returned by the IG API
              (**None** if the request fails)
        """
        url = '{}/{}'.format(self.apiBaseURL, IG_API_URL.POSITIONS.value)
        return self.http_get(url)

    def get_positions_map(self):
        """
        Returns a *dict* containing the account open positions in the form
        {string: int} where the string is defined as 'marketId-tradeDirection'
        and the int is the trade size

            - Returns **None** if an error occurs otherwise a dict(string:int)
        """
        position_json = self.get_open_positions()
        if position_json is None:
            return None
        positionMap = {}
        for item in position_json['positions']:
            direction = item['position']['direction']
            dealSize = item['position']['dealSize']
            key = item['market']['epic'] + '-' + direction
            # Positions on the same market and direction are summed together
            if key in positionMap:
                positionMap[key] = dealSize + positionMap[key]
            else:
                positionMap[key] = dealSize
        return positionMap

    def get_market_info(self, epic_id):
        """
        Returns info for the given market including a price snapshot

            - **epic_id**: market epic as string
            - Returns **None** if an error occurs otherwise the json returned
              by IG API
        """
        url = '{}/{}/{}'.format(self.apiBaseURL,
                                IG_API_URL.MARKETS.value,
                                epic_id)
        return self.http_get(url)

    def get_prices(self, epic_id, interval, data_range):
        """
        Returns past prices for the given epic

            - **epic_id**: market epic as string
            - **interval**: resolution of the time series: minute, hours, etc.
            - **data_range**: amount of datapoint to fetch
            - Returns **None** if an error occurs otherwise the json object
              returned by IG API
        """
        url = '{}/{}/{}/{}/{}'.format(self.apiBaseURL,
                                      IG_API_URL.PRICES.value,
                                      epic_id,
                                      interval,
                                      data_range)
        d = self.http_get(url)
        if d is not None and 'allowance' in d:
            remaining_allowance = d['allowance']['remainingAllowance']
            # Warn when the allowance reported by the API is running low
            if remaining_allowance < 100:
                reset_time = Utils.humanize_time(
                    int(d['allowance']['allowanceExpiry']))
                logging.warning("Remaining API calls left: {}".format(
                    str(remaining_allowance)))
                logging.warning("Time to API Key reset: {}".format(
                    str(reset_time)))
        return d

    def trade(self, epic_id, trade_direction, limit, stop):
        """
        Try to open a new trade for the given epic

            - **epic_id**: market epic as string
            - **trade_direction**: BUY or SELL
            - **limit**: limit level
            - **stop**: stop level
            - Returns **False** if an error occurs otherwise True
        """
        if self.paperTrading:
            # Paper trading: log the order without sending it to the broker
            logging.info('Paper trade: {} {} with limit={} and stop={}'.format(
                trade_direction, epic_id, limit, stop))
            return True

        url = '{}/{}'.format(self.apiBaseURL, IG_API_URL.POSITIONS_OTC.value)
        data = {
            "direction": trade_direction,
            "epic": epic_id,
            "limitLevel": limit,
            "orderType": self.orderType,
            "size": self.orderSize,
            "expiry": self.orderExpiry,
            "guaranteedStop": self.useGStop,
            "currencyCode": self.orderCurrency,
            "forceOpen": self.orderForceOpen,
            "stopLevel": stop
        }
        r = requests.post(url,
                          data=json.dumps(data),
                          headers=self.authenticated_headers)
        if r.status_code != 200:
            return False

        d = json.loads(r.text)
        deal_ref = d['dealReference']
        if self.confirm_order(deal_ref):
            logging.info(
                "Order {} for {} confirmed with limit={} and stop={}".format(
                    trade_direction, epic_id, limit, stop))
            return True
        logging.warning("Trade {} of {} has failed!".format(
            trade_direction, epic_id))
        return False

    def confirm_order(self, dealRef):
        """
        Confirm an order from a dealing reference

            - **dealRef**: dealing reference to confirm
            - Returns **False** if an error occurs otherwise True
        """
        url = '{}/{}/{}'.format(self.apiBaseURL,
                                IG_API_URL.CONFIRMS.value,
                                dealRef)
        d = self.http_get(url)
        return d is not None and d['reason'] == "SUCCESS"

    def close_position(self, position):
        """
        Close the given market position

            - **position**: position json object obtained from IG API
            - Returns **False** if an error occurs otherwise True
        """
        if self.paperTrading:
            # Paper trading: log the action without sending it to the broker
            logging.info('Paper trade: close {} position'.format(
                position['market']['instrumentName']))
            return True

        # To close we need the opposite direction
        current_direction = position['position']['direction']
        if current_direction == TradeDirection.BUY.name:
            direction = TradeDirection.SELL.name
        elif current_direction == TradeDirection.SELL.name:
            direction = TradeDirection.BUY.name
        else:
            logging.error("Wrong position direction!")
            return False

        url = '{}/{}'.format(self.apiBaseURL, IG_API_URL.POSITIONS_OTC.value)
        data = {
            "dealId": position['position']['dealId'],
            "epic": None,
            "expiry": None,
            "direction": direction,
            # NOTE(review): the size is hard-coded rather than taken from the
            # position being closed - confirm this matches the order size used
            "size": "1",
            "level": None,
            "orderType": "MARKET",
            "timeInForce": None,
            "quoteId": None
        }
        # The request is sent as a POST carrying a '_method: DELETE' header
        del_headers = dict(self.authenticated_headers)
        del_headers['_method'] = 'DELETE'
        r = requests.post(url,
                          data=json.dumps(data),
                          headers=del_headers)
        if r.status_code != 200:
            return False

        d = json.loads(r.text)
        deal_ref = d['dealReference']
        if self.confirm_order(deal_ref):
            logging.info("Position for {} closed".format(
                position['market']['instrumentName']))
            return True
        logging.error("Could not close position for {}".format(
            position['market']['instrumentName']))
        return False

    def close_all_positions(self):
        """
        Try to close all the account open positions.

            - Returns **False** if an error occurs otherwise True
        """
        result = True
        try:
            positions = self.get_open_positions()
            if positions is None:
                logging.error("Unable to retrieve open positions!")
                return False
            for p in positions['positions']:
                # A failure on one position must not stop the others from
                # being closed
                try:
                    if not self.close_position(p):
                        result = False
                except Exception:
                    logging.error('Error closing position for {}'.format(
                        p['market']['instrumentName']))
                    result = False
        except Exception:
            logging.error("Error during close all positions")
            result = False
        return result

    def get_account_used_perc(self):
        """
        Fetch the percentage of available balance is currently used

            - Returns the percentage of account used over total available
              amount, **None** if the balances cannot be retrieved
        """
        balance, deposit = self.get_account_balances()
        if balance is None or deposit is None:
            return None
        return Utils.percentage(deposit, balance)

    def navigate_market_node(self, node_id):
        """
        Navigate the market node id

            - **node_id**: id of the market node
            - Returns the json representing the market node, **None** if an
              error occurs
        """
        url = '{}/{}/{}'.format(self.apiBaseURL,
                                IG_API_URL.MARKET_NAV.value,
                                node_id)
        return self.http_get(url)

    def get_watchlist(self, id):
        """
        Get the watchlist info

            - **id**: id of the watchlist. If empty id is provided, the
              function returns the list of all the watchlist in the account
            - Returns **None** if an error occurs
        """
        url = '{}/{}/{}'.format(self.apiBaseURL,
                                IG_API_URL.WATCHLISTS.value,
                                id)
        return self.http_get(url)

    def get_markets_from_watchlist(self, name):
        """
        Get the list of markets included in the watchlist

            - **name**: name of the watchlist
            - Returns **None** if the watchlist is not found or an error occurs
        """
        # Request with empty name returns list of all the watchlists
        all_watchlists = self.get_watchlist('')
        if all_watchlists is not None:
            for w in all_watchlists['watchlists']:
                if 'name' in w and w['name'] == name:
                    data = self.get_watchlist(w['id'])
                    if data is not None and 'markets' in data:
                        return data['markets']
        return None

    def http_get(self, url):
        """
        Perform an HTTP GET request to the url.
        Return the json object returned from the API if 200 is received
        Return None if an error is received from the API
        """
        try:
            response = requests.get(url, headers=self.authenticated_headers)
            if response.status_code != 200:
                return None
            data = json.loads(response.text)
            if 'errorCode' in data:
                logging.error(data['errorCode'])
                return None
            return data
        except Exception as e:
            # Best effort: report the failure instead of hiding it, while
            # letting KeyboardInterrupt and SystemExit propagate
            logging.error('HTTP GET request to {} failed: {}'.format(url, e))
            return None

    def macd_dataframe(self, epic, interval):
        """
        Return a dataframe with MACD data for the requested market

            - **epic**: market epic as string
            - **interval**: currently not used, see the note in the body
            - Returns **None** if the prices cannot be fetched otherwise a
              dataframe with the columns close, 26_ema, 12_ema, MACD,
              MACD_Signal and MACD_Hist
        """
        # NOTE(review): the resolution is hard-coded to 'DAY' and the
        # interval argument is ignored - confirm whether it should be used
        prices = self.get_prices(epic, 'DAY', 26)
        if prices is None:
            return None

        # Prepare data: a missing closing bid is replaced with the previous
        # available one (0 if there is none yet)
        prevBid = 0
        hist_data = []
        for p in prices['prices']:
            bid = p['closePrice']['bid']
            if bid is None:
                hist_data.append(prevBid)
            else:
                hist_data.append(bid)
                prevBid = bid

        # Calculate the MACD indicator
        px = pd.DataFrame({'close': hist_data})
        px['26_ema'] = px['close'].ewm(span=26).mean()
        px['12_ema'] = px['close'].ewm(span=12).mean()
        px['MACD'] = (px['12_ema'] - px['26_ema'])
        px['MACD_Signal'] = px['MACD'].rolling(9).mean()
        px['MACD_Hist'] = (px['MACD'] - px['MACD_Signal'])
        return px