import json
import logging
from enum import Enum
from typing import Any, Dict, List, Optional
import pandas
import requests
from tradingbot.Components.Broker.AbstractInterfaces import (
AccountBalances,
AccountInterface,
StocksInterface,
)
from tradingbot.Components.Utils import Interval, TradeDirection, Utils
from tradingbot.Interfaces.Market import Market
from tradingbot.Interfaces.MarketHistory import MarketHistory
from tradingbot.Interfaces.MarketMACD import MarketMACD
from tradingbot.Interfaces.Position import Position
[docs]class IG_API_URL(Enum):
"""
IG REST API urls
"""
BASE_URI = "https://@api.ig.com/gateway/deal"
DEMO_PREFIX = "demo-"
SESSION = "session"
ACCOUNTS = "accounts"
POSITIONS = "positions"
POSITIONS_OTC = "positions/otc"
MARKETS = "markets"
PRICES = "prices"
CONFIRMS = "confirms"
MARKET_NAV = "marketnavigation"
WATCHLISTS = "watchlists"
[docs]class IGInterface(AccountInterface, StocksInterface):
"""
IG broker interface class, provides functions to use the IG REST API
"""
api_base_url: str
authenticated_headers: Dict[str, str]
def initialise(self) -> None:
logging.info("initialising IGInterface...")
demoPrefix = (
IG_API_URL.DEMO_PREFIX.value
if self._config.get_ig_use_demo_account()
else ""
)
self.api_base_url = IG_API_URL.BASE_URI.value.replace("@", demoPrefix)
self.authenticated_headers = {}
if self._config.get_ig_paper_trading():
logging.info("Paper trading is active")
if not self.authenticate():
logging.error("Authentication failed")
raise RuntimeError("Unable to authenticate to IG Index. Check credentials")
[docs] def authenticate(self) -> bool:
"""
Authenticate the IGInterface instance with the configured credentials
"""
data = {
"identifier": self._config.get_credentials()["username"],
"password": self._config.get_credentials()["password"],
}
headers = {
"Content-Type": "application/json; charset=utf-8",
"Accept": "application/json; charset=utf-8",
"X-IG-API-KEY": self._config.get_credentials()["api_key"],
"Version": "2",
}
url = "{}/{}".format(self.api_base_url, IG_API_URL.SESSION.value)
response = requests.post(url, data=json.dumps(data), headers=headers)
if response.status_code != 200:
return False
headers_json = dict(response.headers)
try:
CST_token = headers_json["CST"]
x_sec_token = headers_json["X-SECURITY-TOKEN"]
except Exception:
return False
self.authenticated_headers = {
"Content-Type": "application/json; charset=utf-8",
"Accept": "application/json; charset=utf-8",
"X-IG-API-KEY": self._config.get_credentials()["api_key"],
"CST": CST_token,
"X-SECURITY-TOKEN": x_sec_token,
}
self.set_default_account(self._config.get_credentials()["account_id"])
return True
[docs] def set_default_account(self, accountId: str) -> bool:
"""
Sets the IG account to use
- **accountId**: String representing the accound id to use
- Returns **False** if an error occurs otherwise True
"""
url = "{}/{}".format(self.api_base_url, IG_API_URL.SESSION.value)
data = {"accountId": accountId, "defaultAccount": "True"}
response = requests.put(
url, data=json.dumps(data), headers=self.authenticated_headers
)
if response.status_code != 200:
return False
logging.info("Using default account: {}".format(accountId))
return True
[docs] def get_account_balances(self) -> AccountBalances:
"""
Returns a tuple (balance, deposit) for the account in use
- Returns **(None,None)** if an error occurs otherwise (balance, deposit)
"""
url = "{}/{}".format(self.api_base_url, IG_API_URL.ACCOUNTS.value)
d = self._http_get(url)
if d is not None:
for i in d["accounts"]:
if str(i["accountType"]) == "SPREADBET":
balance = i["balance"]["balance"]
deposit = i["balance"]["deposit"]
return balance, deposit
return None, None
[docs] def get_open_positions(self) -> List[Position]:
"""
Returns the account open positions in an json object
- Returns the json object returned by the IG API
"""
url = "{}/{}".format(self.api_base_url, IG_API_URL.POSITIONS.value)
data = self._http_get(url)
positions = []
for d in data["positions"]:
positions.append(
Position(
deal_id=d["position"]["dealId"],
size=d["position"]["size"],
create_date=d["position"]["createdDateUTC"],
direction=TradeDirection[d["position"]["direction"]],
level=d["position"]["level"],
limit=d["position"]["limitLevel"],
stop=d["position"]["stopLevel"],
currency=d["position"]["currency"],
epic=d["market"]["epic"],
market_id=None,
)
)
return positions
[docs] def get_positions_map(self) -> Dict[str, int]:
"""
Returns a *dict* containing the account open positions in the form
{string: int} where the string is defined as 'marketId-tradeDirection' and
the int is the trade size
- Returns **None** if an error occurs otherwise a dict(string:int)
"""
positionMap: Dict[str, int] = {}
for item in self.get_open_positions():
key = item.epic + "-" + item.direction.name
if key in positionMap:
positionMap[key] = item.size + positionMap[key]
else:
positionMap[key] = item.size
return positionMap
[docs] def get_market_info(self, epic_id: str) -> Market:
"""
Returns info for the given market including a price snapshot
- **epic_id**: market epic as string
- Returns **None** if an error occurs otherwise the json returned by IG API
"""
url = "{}/{}/{}".format(self.api_base_url, IG_API_URL.MARKETS.value, epic_id)
info = self._http_get(url)
if "markets" in info:
raise RuntimeError("Multiple matches found for epic: {}".format(epic_id))
if self._config.get_ig_controlled_risk():
info["minNormalStopOrLimitDistance"] = info["minControlledRiskStopDistance"]
market = Market()
market.epic = info["instrument"]["epic"]
market.id = info["instrument"]["marketId"]
market.name = info["instrument"]["name"]
market.bid = info["snapshot"]["bid"]
market.offer = info["snapshot"]["offer"]
market.high = info["snapshot"]["high"]
market.low = info["snapshot"]["low"]
market.stop_distance_min = info["dealingRules"]["minNormalStopOrLimitDistance"][
"value"
]
market.expiry = info["instrument"]["expiry"]
return market
[docs] def search_market(self, search: str) -> List[Market]:
"""
Returns a list of markets that matched the search string
"""
url = "{}/{}?searchTerm={}".format(
self.api_base_url, IG_API_URL.MARKETS.value, search
)
data = self._http_get(url)
markets = []
if data is not None and "markets" in data:
markets = [self.get_market_info(m["epic"]) for m in data["markets"]]
return markets
def get_prices(
self, market: Market, interval: Interval, data_range: int
) -> MarketHistory:
url = "{}/{}/{}/{}/{}".format(
self.api_base_url,
IG_API_URL.PRICES.value,
market.epic,
interval,
data_range,
)
data = self._http_get(url)
if "allowance" in data:
remaining_allowance = data["allowance"]["remainingAllowance"]
reset_time = Utils.humanize_time(int(data["allowance"]["allowanceExpiry"]))
if remaining_allowance < 100:
logging.warn(
"Remaining API calls left: {}".format(str(remaining_allowance))
)
logging.warn("Time to API Key reset: {}".format(str(reset_time)))
dates = []
highs = []
lows = []
closes = []
volumes = []
for price in data["prices"]:
dates.append(price["snapshotTimeUTC"])
highs.append(price["highPrice"]["bid"])
lows.append(price["lowPrice"]["bid"])
closes.append(price["closePrice"]["bid"])
volumes.append(float(price["lastTradedVolume"]))
history = MarketHistory(market, dates, highs, lows, closes, volumes)
return history
[docs] def trade(
self, epic_id: str, trade_direction: TradeDirection, limit: float, stop: float
) -> bool:
"""
Try to open a new trade for the given epic
- **epic_id**: market epic as string
- **trade_direction**: BUY or SELL
- **limit**: limit level
- **stop**: stop level
- Returns **False** if an error occurs otherwise True
"""
if self._config.get_ig_paper_trading():
logging.info(
"Paper trade: {} {} with limit={} and stop={}".format(
trade_direction.value, epic_id, limit, stop
)
)
return True
url = "{}/{}".format(self.api_base_url, IG_API_URL.POSITIONS_OTC.value)
data = {
"direction": trade_direction.value,
"epic": epic_id,
"limitLevel": limit,
"orderType": self._config.get_ig_order_type(),
"size": self._config.get_ig_order_size(),
"expiry": self._config.get_ig_order_expiry(),
"guaranteedStop": self._config.get_ig_use_g_stop(),
"currencyCode": self._config.get_ig_order_currency(),
"forceOpen": self._config.get_ig_order_force_open(),
"stopLevel": stop,
}
r = requests.post(
url, data=json.dumps(data), headers=self.authenticated_headers
)
if r.status_code != 200:
return False
d = json.loads(r.text)
deal_ref = d["dealReference"]
if self.confirm_order(deal_ref):
logging.info(
"Order {} for {} confirmed with limit={} and stop={}".format(
trade_direction.value, epic_id, limit, stop
)
)
return True
else:
logging.warning(
"Trade {} of {} has failed!".format(trade_direction.value, epic_id)
)
return False
[docs] def confirm_order(self, dealRef: str) -> bool:
"""
Confirm an order from a dealing reference
- **dealRef**: dealing reference to confirm
- Returns **False** if an error occurs otherwise True
"""
url = "{}/{}/{}".format(self.api_base_url, IG_API_URL.CONFIRMS.value, dealRef)
d = self._http_get(url)
if d is not None:
if d["reason"] != "SUCCESS":
return False
else:
return True
return False
[docs] def close_position(self, position: Position) -> bool:
"""
Close the given market position
- **position**: position json object obtained from IG API
- Returns **False** if an error occurs otherwise True
"""
if self._config.get_ig_paper_trading():
logging.info("Paper trade: close {} position".format(position.epic))
return True
# To close we need the opposite direction
direction = TradeDirection.NONE
if position.direction is TradeDirection.BUY:
direction = TradeDirection.SELL
elif position.direction is TradeDirection.SELL:
direction = TradeDirection.BUY
else:
logging.error("Wrong position direction!")
return False
url = "{}/{}".format(self.api_base_url, IG_API_URL.POSITIONS_OTC.value)
data = {
"dealId": position.deal_id,
"epic": None,
"expiry": None,
"direction": direction.name,
"size": "1",
"level": None,
"orderType": "MARKET",
"timeInForce": None,
"quoteId": None,
}
del_headers = dict(self.authenticated_headers)
del_headers["_method"] = "DELETE"
r = requests.post(url, data=json.dumps(data), headers=del_headers)
if r.status_code != 200:
return False
d = json.loads(r.text)
deal_ref = d["dealReference"]
if self.confirm_order(deal_ref):
logging.info("Position for {} closed".format(position.epic))
return True
else:
logging.error("Could not close position for {}".format(position.epic))
return False
[docs] def close_all_positions(self) -> bool:
"""
Try to close all the account open positions.
- Returns **False** if an error occurs otherwise True
"""
result = True
try:
positions = self.get_open_positions()
if positions is not None:
for p in positions:
try:
if not self.close_position(p):
result = False
except Exception:
logging.error(
"Error closing position for {}".format(p.market_id)
)
result = False
else:
logging.error("Unable to retrieve open positions!")
result = False
except Exception:
logging.error("Error during close all positions")
result = False
return result
[docs] def get_account_used_perc(self) -> Optional[float]:
"""
Fetch the percentage of available balance is currently used
- Returns the percentage of account used over total available amount
"""
balance, deposit = self.get_account_balances()
if balance is None or deposit is None:
return None
return Utils.percentage(deposit, balance)
[docs] def navigate_market_node(self, node_id: str) -> Dict[str, Any]:
"""
Navigate the market node id
- Returns the json representing the market node
"""
url = "{}/{}/{}".format(self.api_base_url, IG_API_URL.MARKET_NAV.value, node_id)
return self._http_get(url)
def _get_watchlist(self, id: str) -> Dict[str, Any]:
"""
Get the watchlist info
- **id**: id of the watchlist. If empty id is provided, the
function returns the list of all the watchlist in the account
"""
url = "{}/{}/{}".format(self.api_base_url, IG_API_URL.WATCHLISTS.value, id)
return self._http_get(url)
[docs] def get_markets_from_watchlist(self, name: str) -> List[Market]:
"""
Get the list of markets included in the watchlist
- **name**: name of the watchlist
"""
markets = []
# Request with empty name returns list of all the watchlists
all_watchlists = self._get_watchlist("")
for w in all_watchlists["watchlists"]:
if "name" in w and w["name"] == name:
data = self._get_watchlist(w["id"])
if "markets" in data:
for m in data["markets"]:
markets.append(self.get_market_info(m["epic"]))
break
return markets
def _http_get(self, url: str) -> Dict[str, Any]:
"""
Perform an HTTP GET request to the url.
Return the json object returned from the API if 200 is received
Return None if an error is received from the API
"""
response = requests.get(url, headers=self.authenticated_headers)
if response.status_code != 200:
logging.error("HTTP request returned {}".format(response.status_code))
raise RuntimeError("HTTP request returned {}".format(response.status_code))
data = json.loads(response.text)
if "errorCode" in data:
logging.error(data["errorCode"])
raise RuntimeError(data["errorCode"])
return data
def get_macd(
self, market: Market, interval: Interval, data_range: int
) -> MarketMACD:
data = self._macd_dataframe(market, interval)
# TODO Put date instead of index numbers
return MarketMACD(
market,
data.index,
data["MACD"].values,
data["Signal"].values,
data["Hist"].values,
)
def _macd_dataframe(self, market: Market, interval: Interval) -> pandas.DataFrame:
prices = self.get_prices(market, Interval.DAY, 26)
if prices is None:
return None
return Utils.macd_df_from_list(
prices.dataframe[MarketHistory.CLOSE_COLUMN].values
)