Created
March 24, 2017 11:53
-
-
Save robcarver17/e1c4db39ed4675bab2b9e0f2b6ba4467 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Gist example of IB wrapper ...
#
# Download API from http://interactivebrokers.github.io/#
#
# Install python API code /IBJts/source/pythonclient $ python3 setup.py install
#
# Note: The test cases, and the documentation refer to a python package called IBApi,
# but the actual package is called ibapi. Go figure.
#
# Get the latest version of the gateway:
# https://www.interactivebrokers.com/en/?f=%2Fen%2Fcontrol%2Fsystemstandalone-ibGateway.php%3Fos%3Dunix
# (for unix: windows and mac users please find your own version)
#
# Run the gateway
#
# user: edemo
# pwd: demo123
#
# Now I'll try and replicate the historical data example
from ibapi.wrapper import EWrapper | |
from ibapi.client import EClient | |
from threading import Thread | |
import queue | |
import time | |
"""
Next section is 'scaffolding'
"""
# Labels used to tag the different kinds of account data that all arrive on
# one shared queue, so that they can be separated out again afterwards.
ACCOUNT_UPDATE_FLAG, ACCOUNT_VALUE_FLAG, ACCOUNT_TIME_FLAG = (
    "update",
    "value",
    "time",
)
class identifed_as(object):
    """
    Tags a piece of data with a label saying what kind of data it is.

    :param label: the tag the data is filed under
    :param data: the payload being tagged
    """

    def __init__(self, label, data):
        self.label, self.data = label, data

    def __repr__(self):
        # Only the label is shown; the payload is deliberately left out.
        template = "Identified as %s"
        return template % self.label
class list_of_identified_items(list):
    """
    A list of elements, each of class identifed_as (or duck equivalent, i.e.
    anything exposing ``label`` and ``data`` attributes).

    Used to separate out accounting data.
    """

    def seperate_into_dict(self):
        """
        Group the ``data`` of every element by its ``label``.

        The list is walked once, so the cost is linear in the number of
        elements. Keys appear in order of first occurrence and the data for
        each label keeps its original list order.

        :return: dict, keys are labels, each value is the list of data items
                 whose element carried that label
        """
        dict_data = {}
        for element in self:
            dict_data.setdefault(element.label, []).append(element.data)

        return dict_data
## Sentinel objects: compared by identity, never by value.
## FINISHED is also placed *into* a queue by the producer to mark the end of the data.
FINISHED = object()
STARTED = object()
TIME_OUT = object()


class finishableQueue(object):
    """
    Wraps a queue so that it can be drained until the producer signals the end.

    ``status`` is STARTED on creation and becomes FINISHED or TIME_OUT after a
    call to ``get``, depending on why draining stopped.
    """

    def __init__(self, queue_to_finish):
        self._queue = queue_to_finish
        self.status = STARTED

    def get(self, timeout):
        """
        Drain the wrapped queue until a FINISHED marker is seen or a wait times out.

        Note the timeout is applied to each individual wait on the queue, not
        to the call as a whole.

        :param timeout: how long to wait for each element before giving up
        :return: list of queue elements received before stopping (the FINISHED
                 marker itself is not included)
        """
        collected = []

        while True:
            try:
                item = self._queue.get(timeout=timeout)
            except queue.Empty:
                ## Nothing arrived in time: a FINISHED marker is unlikely to
                ## turn up soon, so give up and return what we have
                self.status = TIME_OUT
                break

            if item is FINISHED:
                self.status = FINISHED
                break

            ## ordinary data: keep it and go round for more
            collected.append(item)

        return collected

    def timed_out(self):
        """
        :return: bool, True if the last ``get`` stopped because a wait timed out
        """
        return self.status is TIME_OUT
## cache used for accounting data
class simpleCache(object):
    """
    Cache is stored in _cache in nested dict, outer key is accountName, inner key is cache label

    The time each (accountName, cache label) entry was last written is kept in a
    parallel nested dict, and an entry older than ``max_staleness_seconds`` is
    treated as stale. Refreshing is delegated to ``update_data``, which must be
    supplied by a subclass or assigned on the instance.
    """

    def __init__(self, max_staleness_seconds):
        """
        :param max_staleness_seconds: age in seconds beyond which a cached entry is stale
        """
        self._cache = dict()
        self._cache_updated_local_time = dict()

        self._max_staleness_seconds = max_staleness_seconds

    def __repr__(self):
        # NOTE: the keys joined here are the outer keys of _cache (account names).
        return "Cache with labels" + ",".join(self._cache.keys())

    def update_data(self, accountName):
        """
        Refresh the cache for an account; a placeholder that must be replaced.

        :param accountName: str
        :raises NotImplementedError: always, until overridden
        """
        # NotImplementedError is still an Exception, so existing handlers keep working
        raise NotImplementedError("You need to set this method in an inherited class")

    def _get_last_updated_time(self, accountName, cache_label):
        """
        :return: time.time() value recorded when the entry was last written,
                 or None if it never has been
        """
        return self._cache_updated_local_time.get(accountName, {}).get(cache_label)

    def _set_time_of_updated_cache(self, accountName, cache_label):
        # make sure we know when the cache was updated
        times_this_account = self._cache_updated_local_time.setdefault(accountName, {})
        times_this_account[cache_label] = time.time()

    def _is_data_stale(self, accountName, cache_label, ):
        """
        Check to see if the cached data has been updated recently for a given account and label, or if it's stale

        :return: bool
        """
        last_update = self._get_last_updated_time(accountName, cache_label)
        if last_update is None:
            ## we haven't got any data, so by construction our data is stale
            return True

        time_since_updated = time.time() - last_update

        return time_since_updated > self._max_staleness_seconds

    def _check_cache_empty(self, accountName, cache_label):
        """
        :param accountName: str
        :param cache_label: str
        :return: bool, True if there is no entry for this account and label
        """
        return cache_label not in self._cache.get(accountName, {})

    def _return_cache_values(self, accountName, cache_label):
        """
        :param accountName: str
        :param cache_label: str
        :return: None or cache contents
        """
        return self._cache.get(accountName, {}).get(cache_label)

    def _create_cache_element(self, accountName, cache_label):
        """
        Make sure an entry exists for this account and label, without
        overwriting one that is already there.
        """
        self._cache.setdefault(accountName, {}).setdefault(cache_label, None)

    def get_updated_cache(self, accountName, cache_label):
        """
        Checks for stale cache, updates if needed, returns up to date value

        :param accountName: str
        :param cache_label: str
        :return: updated part of cache (None if the update did not supply this label)
        """
        if self._is_data_stale(accountName, cache_label) or self._check_cache_empty(accountName, cache_label):
            self.update_data(accountName)

        return self._return_cache_values(accountName, cache_label)

    def update_cache(self, accountName, dict_with_data):
        """
        Store every item of dict_with_data and record the time it was stored.

        :param accountName: str
        :param dict_with_data: dict, which has keynames with cache labels
        :return: nothing
        """
        for cache_label, data in dict_with_data.items():
            self._create_cache_element(accountName, cache_label)
            self._cache[accountName][cache_label] = data
            self._set_time_of_updated_cache(accountName, cache_label)
"""
Now into the main bit of the code: Wrapper and Client objects
"""
class TestWrapper(EWrapper):
    """
    The wrapper deals with the action coming back from the IB gateway or TWS instance

    We override methods in EWrapper that will get called when this action happens, like currentTime

    Extra methods are added as we need to store the results in this object.
    Results are handed over through queues: one for errors, one for positions,
    and one per account name for accounting data.
    """

    def __init__(self):
        ## use a dict as could have different accountids
        self._my_accounts = {}

        ## We set these up as we could get things coming along before we run an init
        self._my_positions = queue.Queue()
        self._my_errors = queue.Queue()

    ## error handling code
    def get_error(self, timeout=5):
        """
        :param timeout: seconds to wait for an error message once one is known to be queued
        :return: the next queued error message, or None if there is none
        """
        if self.is_error():
            try:
                return self._my_errors.get(timeout=timeout)
            except queue.Empty:
                return None

        return None

    def is_error(self):
        """
        :return: bool, True if at least one error message is waiting
        """
        return not self._my_errors.empty()

    def error(self, id, errorCode, errorString):
        ## Overriden method
        errormsg = "IB error id %d errorcode %d string %s" % (id, errorCode, errorString)
        self._my_errors.put(errormsg)

    ## get positions code
    def init_positions(self):
        """
        Start a fresh positions queue, discarding whatever the old one held.

        :return: the new queue
        """
        positions_queue = self._my_positions = queue.Queue()

        return positions_queue

    def position(self, account, contract, position,
                 avgCost):
        ## Overriden method
        ## uses a simple tuple, but you could do other, fancier, things here
        position_object = (account, contract, position,
                           avgCost)

        self._my_positions.put(position_object)

    def positionEnd(self):
        ## overriden method
        self._my_positions.put(FINISHED)

    ## get accounting data
    def init_accounts(self, accountName):
        """
        Start a fresh accounting queue for an account, replacing any existing one.

        :param accountName: str
        :return: the new queue
        """
        accounting_queue = self._my_accounts[accountName] = queue.Queue()

        return accounting_queue

    def _accounting_queue_for(self, accountName):
        """
        Return the accounting queue for an account, creating it on demand.

        A callback can name an account that init_accounts has not been run for
        yet; creating the queue here stops that raising KeyError inside the callback.
        """
        try:
            return self._my_accounts[accountName]
        except KeyError:
            return self.init_accounts(accountName)

    def updateAccountValue(self, key:str, val:str, currency:str,
                           accountName:str):
        ## Overriden method
        ## use this to seperate out different account data
        data = identifed_as(ACCOUNT_VALUE_FLAG, (key, val, currency))
        self._accounting_queue_for(accountName).put(data)

    def updatePortfolio(self, contract, position:float,
                        marketPrice:float, marketValue:float,
                        averageCost:float, unrealizedPNL:float,
                        realizedPNL:float, accountName:str):
        ## Overriden method
        ## use this to seperate out different account data
        data = identifed_as(ACCOUNT_UPDATE_FLAG, (contract, position, marketPrice, marketValue, averageCost,
                                                  unrealizedPNL, realizedPNL))
        self._accounting_queue_for(accountName).put(data)

    def updateAccountTime(self, timeStamp:str):
        ## Overriden method
        ## use this to seperate out different account data
        data = identifed_as(ACCOUNT_TIME_FLAG, timeStamp)

        ## This callback is not given an account name, so the time stamp cannot be
        ## routed to one queue; it is delivered to every accounting queue we hold.
        ## Iterate over a snapshot in case init_accounts adds a queue meanwhile.
        for accounting_queue in list(self._my_accounts.values()):
            accounting_queue.put(data)

    def accountDownloadEnd(self, accountName:str):
        ## Overriden method
        self._accounting_queue_for(accountName).put(FINISHED)
class TestClient(EClient):
    """
    The client method

    We don't override native methods, but instead call them from our own wrappers

    Everything the wrapper provides (queues, error messages) is reached through
    ``self.wrapper``, so the client does not have to be the same object as the wrapper.
    """

    ## longest we wait on each read of a results queue before giving up
    MAX_WAIT_SECONDS = 10

    def __init__(self, wrapper):
        ## Set up with a wrapper inside
        EClient.__init__(self, wrapper)

        ## We use these to store accounting data
        self._account_cache = simpleCache(max_staleness_seconds=5 * 60)
        ## override function
        self._account_cache.update_data = self._update_accounting_data

    def _print_wrapper_errors(self):
        """
        Print every error message currently waiting in the wrapper.
        """
        while self.wrapper.is_error():
            print(self.wrapper.get_error())

    def get_current_positions(self):
        """
        Current positions held

        :return: list of the position objects the wrapper queued
        """

        ## Make a place to store the data we're going to return
        positions_queue = finishableQueue(self.wrapper.init_positions())

        ## ask for the data
        self.reqPositions()

        ## poll until we get a termination or die of boredom
        positions_list = positions_queue.get(timeout=self.MAX_WAIT_SECONDS)

        ## reqPositions is a subscription; cancel it so later position changes
        ## don't pile up in a queue that nobody is reading
        self.cancelPositions()

        self._print_wrapper_errors()

        if positions_queue.timed_out():
            print("Exceeded maximum wait for wrapper to confirm finished whilst getting positions")

        return positions_list

    def _update_accounting_data(self, accountName):
        """
        Update the accounting data in the cache

        :param accountName: account we want to get data for
        :return: nothing
        """

        ## Make a place to store the data we're going to return
        accounting_queue = finishableQueue(self.wrapper.init_accounts(accountName))

        ## ask for the data
        self.reqAccountUpdates(True, accountName)

        ## poll until we get a termination or die of boredom
        accounting_list = accounting_queue.get(timeout=self.MAX_WAIT_SECONDS)

        ## we only wanted this one download; stop the subscription so further
        ## updates don't pile up in a queue that nobody is reading
        self.reqAccountUpdates(False, accountName)

        self._print_wrapper_errors()

        if accounting_queue.timed_out():
            print("Exceeded maximum wait for wrapper to confirm finished whilst getting accounting data")

        # seperate things out, because this is one big queue of data with different things in it
        accounting_list = list_of_identified_items(accounting_list)
        seperated_accounting_data = accounting_list.seperate_into_dict()

        ## update the cache with different elements
        self._account_cache.update_cache(accountName, seperated_accounting_data)

        ## return nothing, information is accessed via get_... methods

    def get_accounting_time_from_server(self, accountName):
        """
        Get the accounting time from IB server

        :return: accounting time as served up by IB
        """

        # All these functions follow the same pattern: check if stale or missing, if not return cache, else update values
        return self._account_cache.get_updated_cache(accountName, ACCOUNT_TIME_FLAG)

    def get_accounting_values(self, accountName):
        """
        Get the accounting values from IB server

        :return: accounting values as served up by IB
        """

        # All these functions follow the same pattern: check if stale or missing, if not return cache, else update values
        return self._account_cache.get_updated_cache(accountName, ACCOUNT_VALUE_FLAG)

    def get_accounting_updates(self, accountName):
        """
        Get the accounting updates from IB server

        :return: accounting updates as served up by IB
        """

        # All these functions follow the same pattern: check if stale or missing, if not return cache, else update values
        return self._account_cache.get_updated_cache(accountName, ACCOUNT_UPDATE_FLAG)
class TestApp(TestWrapper, TestClient):
    """
    Wrapper and client combined in one object, which acts as its own wrapper.

    Creating an instance connects straight away and starts ``run`` in a
    separate thread; the thread is kept on ``_thread``.

    :param ipaddress: address passed to ``connect``
    :param portid: port passed to ``connect``
    :param clientid: client id passed to ``connect``
    """

    def __init__(self, ipaddress, portid, clientid):
        TestWrapper.__init__(self)
        TestClient.__init__(self, wrapper=self)

        self.connect(ipaddress, portid, clientid)

        ## run the message loop in the background
        worker = Thread(target=self.run)
        worker.start()

        self._thread = worker
if __name__ == '__main__':
    ## Demo: connect, print current positions, then the accounting data for
    ## the account the first position belongs to.
    app = TestApp("127.0.0.1", 4001, 1)

    ## always disconnect, even if something below fails; otherwise the
    ## background thread started by TestApp keeps the process alive
    try:
        ## lets get positions
        positions_list = app.get_current_positions()
        print(positions_list)

        if positions_list:
            ## get the account name from the position
            ## normally you would know your account name
            accountName = positions_list[0][0]

            ## and accounting information
            accounting_values = app.get_accounting_values(accountName)
            print(accounting_values)

            ## these values are cached
            ## if we ask again in more than 5 minutes it will update everything
            accounting_updates = app.get_accounting_updates(accountName)
            print(accounting_updates)
        else:
            ## e.g. an account holding no positions: there is nothing to take
            ## an account name from, so skip the accounting part
            print("No positions returned, so no account name is available to look up accounting data")
    finally:
        app.disconnect()
When I execute this code, I get an empty list for positions_list.
--- Update ---
Sorry. I figured out the error. I was testing on a paper trading account which have no positions. Hence the positions_list is empty.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
There is a bug at line 348 (timeStamp should replace accountName).