Created
October 28, 2021 11:09
-
-
Save rolangom/ca1815d063fe95cbc2530523f18e56c1 to your computer and use it in GitHub Desktop.
rx_fetch_ib_data
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime as dt
import logging
import os
import sys
import time
from typing import List, Optional, Union, Dict, Tuple, Callable, Any

import numpy as np
import pandas as pd
import psycopg2
import rx
import rx.operators as rx_op
from ibapi import utils
from ibapi.client import EClient, Contract
from ibapi.common import TickerId, BarData, TagValueList
from ibapi.utils import iswrapper
from ibapi.wrapper import EWrapper
from rx.core.observable import Observable
from rx.subject import AsyncSubject, Subject, BehaviorSubject, ReplaySubject
def make_contract(symbol: str, sec_type: str = "STK", currency: str = 'USD', exchange: str = 'SMART',
                  primaryExchange: str = "ISLAND", localsymbol: str = None) -> Contract:
    """Build a ``Contract`` populated from the given fields.

    ``localSymbol`` is only assigned when ``localsymbol`` is truthy; every
    other argument is always copied onto the new contract.
    """
    fields = {
        "symbol": symbol,
        "secType": sec_type,
        "currency": currency,
        "exchange": exchange,
        "primaryExchange": primaryExchange,
    }
    if localsymbol:
        fields["localSymbol"] = localsymbol

    result = Contract()
    for attr_name, attr_value in fields.items():
        setattr(result, attr_name, attr_value)
    return result
class AlphaClientWrapper(EClient, EWrapper):
    """Combined IB API client and wrapper that exposes callbacks as Rx streams.

    Each historical-data request gets its own ``Subject``: bars delivered to
    ``historicalData`` are pushed to it as ``(contract, bar)`` tuples,
    ``historicalDataEnd`` completes it and ``error`` fails it. Connection
    state is published on the ``connected`` ``BehaviorSubject``.
    """

    def __init__(self):
        EClient.__init__(self, wrapper=self)
        EWrapper.__init__(self)
        self.request_id = 0
        self.started = False
        self.next_valid_order_id = None
        # request id -> contract the request was issued for
        self.requests: Dict[int, Contract] = {}
        # request id -> subject receiving (contract, bar) tuples for that request
        self._subjects: Dict[int, Subject] = {}
        # starts at False; connectAck pushes True, connectionClosed pushes False
        self.connected: BehaviorSubject = BehaviorSubject(False)

    def next_request_id(self, contract: Contract) -> int:
        """Allocate the next request id and remember which contract it belongs to."""
        self.request_id += 1
        self.requests[self.request_id] = contract
        return self.request_id

    def historicalDataRequest(self, contract: Contract, endDateTime: str,
                              durationStr: str, barSizeSetting: str, whatToShow: str = "TRADES",
                              useRTH: int = 0, formatDate: int = 1, keepUpToDate: bool = False) -> Observable:
        """Send a historical-data request and return the subject that will carry its bars.

        All arguments are forwarded unchanged to ``reqHistoricalData`` together
        with a freshly allocated request id and an empty chart-options list.
        The returned subject is registered before the request is sent.
        """
        print('hist from contract', contract)
        print('historicalDataRequest %s, %s, %s, %s, %s, %s, %s' % (
            endDateTime, durationStr, barSizeSetting, whatToShow, useRTH, formatDate, keepUpToDate))
        cid = self.next_request_id(contract)
        subject = Subject()
        self._subjects[cid] = subject
        self.reqHistoricalData(
            cid,  # tickerId, used to identify incoming data
            contract,
            endDateTime,
            durationStr,  # amount of time to go back
            barSizeSetting,  # bar size
            whatToShow,  # historical data type
            useRTH,  # useRTH (regular trading hours)
            formatDate,
            keepUpToDate,  # keep up to date after snapshot
            [],  # chart options
        )
        return subject

    @iswrapper
    def historicalData(self, reqId: int, bar: BarData) -> None:
        """Forward one bar to the subject registered for ``reqId``.

        Bars for an id with no registered contract or subject are dropped
        instead of raising ``KeyError`` inside the message loop.
        """
        # Lazy %-args: this runs once per bar, so skip formatting when INFO is off.
        logging.info('historicalData %s, %s', reqId, bar)
        contract = self.requests.get(reqId)
        subject = self._subjects.get(reqId)
        if contract is not None and subject is not None:
            subject.on_next((contract, bar))

    @iswrapper
    def historicalDataEnd(self, reqId: int, start: str, end: str) -> None:
        """Complete the subject registered for ``reqId``, if there is one."""
        super().historicalDataEnd(reqId, start, end)
        logging.info('historicalDataEnd %s, %s, %s' % (reqId, start, end))
        print('historicalDataEnd %s, %s, %s' % (reqId, start, end))
        subject = self._subjects.get(reqId)
        if subject is not None:
            subject.on_completed()

    def do_connect(self, host: str = "127.0.0.1", port: int = 4001, clientId: int = 0) -> rx.Observable:
        """Call ``connect`` and return the ``connected`` state subject."""
        self.connect(host, port, clientId)
        return self.connected

    @iswrapper
    def connectAck(self):
        """Publish ``True`` on ``connected``."""
        print('conected 0')
        logging.info("Connected")
        self.connected.on_next(True)

    @iswrapper
    def connectionClosed(self):
        """Publish ``False`` on ``connected`` and complete it."""
        logging.info("Disconnected")
        self.connected.on_next(False)
        self.connected.on_completed()

    @iswrapper
    def error(self, req_id: TickerId, error_code: int, error: str):
        """Log an error and fail the matching request subject.

        Negative ids are logged at debug level only. For other ids the error
        is logged and, when a subject is registered under that id, the subject
        is failed with an ``Exception`` describing it. An id with no
        registered subject is ignored rather than raising ``KeyError``.
        """
        super().error(req_id, error_code, error)
        print('error', error)
        if req_id < 0:
            logging.debug("Error. Id: %s Code %s Msg: %s", req_id, error_code, error)
            return
        logging.error("Error. Id: %s Code %s Msg: %s", req_id, error_code, error)
        subject = self._subjects.get(req_id)
        if subject is not None:
            subject.on_error(Exception(f"Error. Id: {req_id} Code {error_code} Msg: {error}"))

    def say_bye(self):
        """Print a farewell and disconnect."""
        print('bye!')
        self.disconnect()
def make_ib_contract_req(client: AlphaClientWrapper, endDateTime: str,
                         durationStr: str, barSizeSetting: str, whatToShow: str = "TRADES",
                         useRTH: int = 0, formatDate: int = 1, keepUpToDate: bool = False):
    """Return a mapper that turns a ``(connected, symbol)`` pair into a bar request.

    The returned function builds a contract for the symbol with
    ``make_contract`` and issues ``client.historicalDataRequest`` using the
    request settings captured here; it returns that call's observable.
    """

    def handle_conn_symbol(con_symbol: Tuple[bool, str]) -> Observable:
        connected_flag, ticker = con_symbol
        print('handle_conn_symbol', connected_flag, ticker)
        return client.historicalDataRequest(
            contract=make_contract(ticker),
            endDateTime=endDateTime,
            durationStr=durationStr,
            barSizeSetting=barSizeSetting,
            whatToShow=whatToShow,
            useRTH=useRTH,
            formatDate=formatDate,
            keepUpToDate=keepUpToDate,
        )

    return handle_conn_symbol
def make_ib_contract_req_plain(client: AlphaClientWrapper, symbol: str, endDateTime: str,
                               durationStr: str, barSizeSetting: str, whatToShow: str = "TRADES",
                               useRTH: int = 0, formatDate: int = 1, keepUpToDate: bool = False) -> Observable:
    """Request historical bars for ``symbol`` and return the resulting observable.

    Builds the contract with ``make_contract`` (default settings) and passes
    all request settings straight to ``client.historicalDataRequest``.
    """
    print('make_ib_contract_req_plain', symbol)
    request_settings = dict(
        endDateTime=endDateTime,
        durationStr=durationStr,
        barSizeSetting=barSizeSetting,
        whatToShow=whatToShow,
        useRTH=useRTH,
        formatDate=formatDate,
        keepUpToDate=keepUpToDate,
    )
    return client.historicalDataRequest(make_contract(symbol), **request_settings)
def prepare_insert_ticker(conn: psycopg2._psycopg.connection):
    """Return a function that (re)inserts one symbol into the ``tickers`` table.

    The returned function deletes any existing row for the symbol, inserts a
    fresh one and commits. On any exception it rolls back and re-raises; the
    cursor is closed in every case.
    """
    # Delete first so the table holds exactly one fresh row per symbol.
    statements = (
        'DELETE FROM tickers WHERE symbol = %s',
        'INSERT INTO tickers (symbol) VALUES (%s)',
    )

    def insert_symbol(symbol: str):
        cur = conn.cursor()
        try:
            print('insert_symbol', symbol)
            for statement in statements:
                cur.execute(statement, (symbol,))
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            cur.close()

    return insert_symbol
def _parse_bar_date(raw: str) -> dt.datetime:
    """Parse a bar date string (``YYYYMMDD`` or ``YYYYMMDD HH:MM:SS``) into a naive datetime.

    Only the first two whitespace-separated tokens are used, so a trailing
    token after the time (e.g. a time-zone name) no longer makes ``strptime``
    fail with "unconverted data remains". The extra token is discarded, not
    applied.
    """
    tokens = raw.split()
    if len(tokens) >= 2:
        return dt.datetime.strptime(f"{tokens[0]} {tokens[1]}", "%Y%m%d %H:%M:%S")
    return dt.datetime.strptime(raw, "%Y%m%d" if len(raw) == 8 else "%Y%m%d %H:%M:%S")


def handle_ib_resp(table_name: str, conn: psycopg2._psycopg.connection):
    """Return a function that stores one ``(contract, bar)`` tuple in ``table_name``.

    The returned function deletes any existing row for the bar's date and
    symbol, inserts the new row, commits, and returns ``(contract, date)``
    where ``date`` is the parsed bar date. On any exception it rolls back and
    re-raises; the cursor is always closed.

    NOTE(review): ``table_name`` is interpolated into the SQL text, so it must
    come from trusted code and never from external input.
    """
    # Delete first so re-runs replace existing bars instead of duplicating them.
    delete_sql = f'DELETE FROM {table_name} WHERE date = %s AND symbol = %s'
    insert_sql = (
        f'INSERT INTO {table_name} (date, symbol, exchange, open, high, low, close, barCount, volume, average) '
        f'VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)'
    )

    def handle_ib_response(con_data: Tuple[Contract, BarData]) -> Any:
        cursor = conn.cursor()
        try:
            print('handle_ib_response', con_data)
            con, data = con_data
            date = _parse_bar_date(data.date)
            cursor.execute(delete_sql, (date, con.symbol))
            cursor.execute(
                insert_sql,
                (date, con.symbol, con.exchange, data.open, data.high, data.low, data.close,
                 data.barCount, data.volume, data.average),
            )
            conn.commit()
            return con, date
        except Exception:
            conn.rollback()
            raise
        finally:
            cursor.close()

    return handle_ib_response
def handle_conn_status(is_connected) -> Observable:
    """Map a connection flag to an observable.

    A truthy flag yields a single ``True``; a falsy flag yields an observable
    that fails with ``Exception("Client's not connected")``.
    """
    if not is_connected:
        return rx.throw(Exception("Client's not connected"))
    return rx.just(True)
def read_symbols_file(observer: rx.core.Observer, scheduler=None) -> rx.core.Observer:
    """Emit every symbol listed in ``symbols.txt`` to ``observer``, then complete it.

    Each line is stripped of surrounding whitespace (including the trailing
    newline) and blank lines are skipped, so subscribers receive clean symbols.
    The file is fully read and closed before anything is emitted.
    ``scheduler`` is accepted but unused. Returns ``observer``.
    """
    with open('symbols.txt', 'r') as f:
        symbols = [symbol for symbol in (line.strip() for line in f) if symbol]
    for symbol in symbols:
        observer.on_next(symbol)
    observer.on_completed()
    return observer
def read_symbols_arr() -> List[str]:
    """Return the symbols listed in ``symbols.txt``, one per line.

    Surrounding whitespace is stripped and blank lines are skipped, so an
    empty or padded line never turns into an empty or padded symbol.
    """
    with open('symbols.txt', 'r') as f:
        return [symbol for symbol in (line.strip() for line in f) if symbol]
def run_v1():
    """First-generation entry point: fetch 10 years of daily bars for a fixed symbol list.

    Connects with client id 5, pairs the connection state with each symbol,
    requests the bars with at most two requests subscribed at a time, stores
    every bar in ``ib_bardata_1day`` and prints each stored ``(contract, date)``.
    Blocks in ``client.run()``; the DB connection is closed when that returns
    or raises.
    """

    def on_final_results(con_data: Tuple[Contract, dt.datetime]):
        con, data = con_data
        print('on_final_results', con.symbol, type(data))
        print(con, data)

    client = AlphaClientWrapper()
    connection = client.do_connect(clientId=5)
    symbols = rx.of('AAPL', 'TSLA', 'AMZN', 'MSFT', 'ABNB')
    conn = psycopg2.connect('dbname=alpha user=postgres')
    request_bars = make_ib_contract_req(client, endDateTime="", durationStr="10 Y", barSizeSetting="1 day")

    def deferred_request(conn_symbol: Tuple[bool, str]) -> Observable:
        # Defer so the request is only sent when merge() subscribes: the
        # request subject is hot, and bars emitted before subscription are lost.
        return rx.defer(lambda _scheduler: request_bars(conn_symbol))

    def start_daily_requests():
        connection.pipe(
            rx_op.flat_map(handle_conn_status),
            rx_op.combine_latest(symbols),
            # map (not flat_map): merge(max_concurrent=...) needs a stream of
            # observables to throttle, not the already-flattened bars.
            rx_op.map(deferred_request),
            rx_op.merge(max_concurrent=2),
            # map (not flat_map): the handler returns a plain tuple, which
            # flat_map would split into two separate emissions.
            rx_op.map(handle_ib_resp('ib_bardata_1day', conn)),
        ).subscribe(
            on_next=on_final_results,
            on_error=lambda s: print('err', s),
        )

    try:
        start_daily_requests()
        client.run()
    finally:
        conn.close()
def init_daily_reqs(client: AlphaClientWrapper, conn: psycopg2._psycopg.connection, symbols: List[str], duration: int):
    """Build the stream that fetches and stores daily bars for ``symbols``.

    Connects with client id 1. Once the connection reports ``True``, one
    deferred request per symbol (``duration`` days of "1 day" bars, empty end
    date) is created and at most eight are subscribed at a time. A request
    that errors is replaced by an empty stream so the others continue. Every
    bar is stored in ``ib_bardata_1day`` and the stream emits one
    ``(contract, date)`` tuple per stored bar.
    """

    def run_flow(_: Any):
        def build_ibreq(symbol: str):
            def run(_):
                return make_ib_contract_req_plain(client, symbol, endDateTime="", durationStr=f"{duration} D",
                                                  barSizeSetting="1 day") \
                    .pipe(rx_op.catch(rx.empty()))

            return run

        obs_list = [rx.defer(build_ibreq(symbol)) for symbol in symbols]
        return rx.from_iterable(obs_list).pipe(
            rx_op.merge(max_concurrent=8),
            # map (not flat_map): the handler returns a plain tuple, which
            # flat_map would split into two emissions. Matches run_xbar_process.
            rx_op.map(handle_ib_resp('ib_bardata_1day', conn)),
        )

    return client.do_connect(clientId=1).pipe(
        rx_op.flat_map(handle_conn_status),
        rx_op.flat_map(run_flow),
    )
def get_all_1daybar_bd_symbols(conn: psycopg2._psycopg.connection) -> List[str]:
    """Return the distinct symbols stored in ``ib_bardata_1day``, sorted by symbol."""
    print('get_all_1daybar_bd_symbols')
    query = "SELECT DISTINCT symbol FROM ib_bardata_1day order by 1"
    return pd.read_sql(query, conn)['symbol'].tolist()
def get_symbol_all_days(conn: psycopg2._psycopg.connection, symbol: str, dst_table: str, do_orderby1_asc=True) -> List[
        dt.datetime]:
    """Return the daily-bar dates of ``symbol`` that have no rows yet in ``dst_table``.

    Left-joins ``ib_bardata_1day`` to ``dst_table`` on symbol and on the
    day-truncated destination date, keeps the rows without a match, and sorts
    the dates ascending or descending according to ``do_orderby1_asc``.
    ``dst_table`` is interpolated into the SQL text; ``symbol`` is passed as a
    bound parameter.
    """
    print('get_symbol_all_days', symbol)
    if do_orderby1_asc:
        sort_dir1 = 'asc'
    else:
        sort_dir1 = 'desc'
    sql = f"""SELECT DISTINCT a.date
    FROM ib_bardata_1day a left join
    {dst_table} b on a.symbol = b.symbol and a.date = date_trunc('day', b.date)
    WHERE a.symbol = %(symbol)s
    and b.symbol is null
    order by 1 {sort_dir1}"""
    missing = pd.read_sql(sql, conn, params={'symbol': symbol})
    return missing['date'].tolist()
def fetch_symbol_xbar(client: AlphaClientWrapper, barSizeSetting) -> Callable[[str, dt.datetime], rx.Observable]:
    """Return a function that requests one day of ``barSizeSetting`` bars for a symbol and date.

    The request ends at 23:59:59 of the given date and covers "1 D". A request
    that errors is replaced by an empty stream.
    """

    def handle_symbol(symbol: str, date: dt.datetime) -> rx.Observable:
        print('handle_symbol', symbol, date)
        end_of_day = date.strftime("%Y%m%d 23:59:59")
        bars = make_ib_contract_req_plain(
            client,
            symbol,
            endDateTime=end_of_day,
            durationStr="1 D",
            barSizeSetting=barSizeSetting,
        )
        return bars.pipe(rx_op.catch(rx.empty()))

    return handle_symbol
def run_xbar_process(client: AlphaClientWrapper, conn: psycopg2._psycopg.connection, table_name: str,
                     barSizeSetting: str, do_orderby1_asc=True):
    """Build the stream that backfills ``table_name`` with ``barSizeSetting`` bars.

    The symbol list is read from the daily-bar table immediately. After
    connecting with client id 1, each symbol is processed lazily: the dates
    missing from ``table_name`` are looked up, one deferred request per date
    is created (at most eight subscribed at a time per symbol) and every bar
    is stored. At most four symbols are processed at a time.
    """
    symbols = get_all_1daybar_bd_symbols(conn)
    handle_symbol = fetch_symbol_xbar(client, barSizeSetting)

    def run_flow(_: Any):
        def bars_for_date(symbol: str, date: dt.datetime):
            # symbol/date are parameters here, so each lambda keeps its own pair.
            return rx.defer(lambda _scheduler: handle_symbol(symbol, date))

        def missing_bars_for_symbol(symbol: str):
            def factory(_scheduler):
                dates = get_symbol_all_days(conn, symbol, table_name, do_orderby1_asc)
                per_date = [bars_for_date(symbol, date) for date in dates]
                return rx.from_iterable(per_date).pipe(
                    rx_op.merge(max_concurrent=8),
                    rx_op.map(handle_ib_resp(table_name, conn)),
                )

            return rx.defer(factory)

        per_symbol = [missing_bars_for_symbol(symbol) for symbol in symbols]
        return rx.from_iterable(per_symbol).pipe(
            rx_op.merge(max_concurrent=4),
        )

    return client.do_connect(clientId=1).pipe(
        rx_op.flat_map(handle_conn_status),
        rx_op.flat_map(run_flow),
    )
def run_1hour_process(client: AlphaClientWrapper, conn: psycopg2._psycopg.connection):
    """Build the backfill stream for "1 hour" bars stored in ``ib_bardata_1hour``."""
    return run_xbar_process(
        client,
        conn,
        table_name="ib_bardata_1hour",
        barSizeSetting="1 hour",
    )
def run_5min_process(client: AlphaClientWrapper, conn: psycopg2._psycopg.connection):
    """Build the backfill stream for "5 mins" bars stored in ``ib_bardata_5min``.

    Missing dates are processed in descending order (``do_orderby1_asc=False``).
    """
    return run_xbar_process(
        client,
        conn,
        table_name="ib_bardata_5min",
        barSizeSetting="5 mins",
        do_orderby1_asc=False,
    )
def get_db_conn():
    """Open and return a new psycopg2 connection.

    The connection string is read from the ``ALPHA_DB_DSN`` environment
    variable when it is set; otherwise the built-in default is used so
    existing setups keep working.
    """
    # NOTE(security): the fallback embeds a password in source control.
    # Set ALPHA_DB_DSN in the environment and rotate this credential.
    default_dsn = 'dbname=deust user=postgres password=Rumpelstiltskin1485'
    return psycopg2.connect(os.environ.get('ALPHA_DB_DSN', default_dsn))
def main(mode: str, durationInDays: int):
    """Run one fetch session in the given mode and block until the client loop ends.

    Modes: ``'d'`` reads the symbols file and requests ``durationInDays`` days
    of daily bars; ``'h'`` backfills 1-hour bars; ``'5m'`` backfills 5-minute
    bars; anything else produces a stream that fails with "Invalid input".
    Stream errors are re-raised from ``on_error`` so the caller can retry.

    The DB connection is always closed on exit, and the client is disconnected
    if it is still connected, so a retry does not leak a connection or collide
    with a session still holding the same client id.
    """
    print('Inicio')
    client = AlphaClientWrapper()
    conn = get_db_conn()

    def on_success(res):
        print('on success', type(res), res)

    def on_error(err):
        print('on error', type(err), err)
        raise err

    def on_complete():
        print('on complete')

    def get_initial_process():
        if mode == 'd':
            all_symbols = read_symbols_arr()
            return init_daily_reqs(client, conn, all_symbols, durationInDays)
        elif mode == 'h':
            return run_1hour_process(client, conn)
        elif mode == '5m':
            return run_5min_process(client, conn)
        else:
            return rx.throw(Exception("Invalid input"))

    try:
        get_initial_process().subscribe(
            on_next=on_success,
            on_error=on_error,
            on_completed=on_complete,
        )
        print('Lets run')
        client.run()
    finally:
        # Only relevant when the failure happened before or outside
        # client.run(); otherwise the client is typically no longer connected.
        if client.isConnected():
            client.disconnect()
        conn.close()
# symbols.subscribe( | |
# on_next=lambda s: print(s) | |
# ) | |
# rx.combine_latest() | |
# client.connected.pipe( | |
# rx. | |
# ) | |
def get_input_mode() -> Tuple[str, int]:
    """Read ``(mode, duration)`` from the command line.

    ``mode`` is the first argument (default ``'d'``); ``duration`` is the
    second argument converted with ``int`` (default ``1``).
    """
    cli_args = sys.argv[1:]
    if len(cli_args) >= 1:
        mode = cli_args[0]
    else:
        mode = 'd'
    if len(cli_args) >= 2:
        duration = int(cli_args[1])
    else:
        duration = 1
    return (mode, duration)
retry_limit = 60


def run_main(retry_i=0):
    """Run ``main`` with the command-line settings, retrying after failures.

    After a failed attempt the error and the attempt number are printed. While
    the attempt number does not exceed ``retry_limit`` the function sleeps
    ``2 * attempt`` seconds and tries again; after that it returns silently.
    """
    attempt = retry_i
    while True:
        try:
            mode, duration = get_input_mode()
            main(mode, duration)
            return
        except Exception as err:
            print(err)
            print('Trying again', attempt)
        if attempt > retry_limit:
            return
        time.sleep(attempt * 2)
        attempt += 1
def run_insert_tickers():
    """Insert every symbol from the symbols file into the ``tickers`` table.

    The DB connection is closed when the loop finishes or fails.
    """
    all_symbols = read_symbols_arr()
    conn = get_db_conn()
    try:
        insert_ticker = prepare_insert_ticker(conn)
        for symbol in all_symbols:
            insert_ticker(symbol)
    finally:
        conn.close()
# Script entry point: run the retrying fetch loop when executed directly.
if __name__ == "__main__":
    run_main()
    # Alternative one-off task, disabled: populate the tickers table.
    # run_insert_tickers()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment