Created
June 18, 2024 21:45
-
-
Save sumitsk20/4a7e9cf667852590e44f6275c06caaa0 to your computer and use it in GitHub Desktop.
Python code that mimic the zerodha websocket behaviour and on_tick() functionality of KiteTicker, convert the tick to candles of different timeframe dynamically and very efficiently using in memory queue, after complete formation of one candle appending to a HDF Store which is efficient for financial data r/w
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections import defaultdict, deque | |
import pandas as pd | |
import datetime | |
import queue | |
from threading import Thread | |
import time | |
import random | |
# Data structure to store tick data for each instrument | |
candles = defaultdict(lambda: defaultdict(lambda: list)) | |
tick_data = defaultdict(lambda: deque(maxlen=2500)) # Limiting to last 1000 ticks per instrument | |
TickEventQ: queue.Queue = queue.Queue(maxsize=60000) | |
CandleEventQ: queue.Queue = queue.Queue(maxsize=60000) | |
token_dict = {256265:{'Symbol':'NIFTY 50'},260105:{'Symbol':'NIFTY BANK'}} | |
hdf5_store_path = 'candles_store.h5' | |
hdf5_store_path_exist = False | |
timeframes = ["1Min", "5Min", "15Min"] | |
class Event(object): | |
""" | |
Event is base class providing an interface for all subsequent | |
(inherited) events, that will trigger further events in the | |
trading infrastructure. | |
""" | |
pass | |
class TickEvent(Event): | |
""" | |
Handles the event of receiving a new market ticks | |
""" | |
def __init__(self,ticks): | |
""" | |
Initialises the MarketEvent. | |
""" | |
self.type = 'TICK' | |
self.data = ticks | |
class CandleEvent(Event): | |
""" | |
Handles the event of receiving a 1min candle | |
""" | |
def __init__(self, symbol,candle, timeframe): | |
self.type = 'CANDLE' | |
self.timeframe: str = timeframe | |
self.symbol: str = symbol | |
self.data: dict = candle | |
def print_self(self): | |
print ("CANDLE:",self.data) | |
def build_candle(last_candle, instrument): | |
return { | |
"token": str(instrument), | |
"Time": last_candle[instrument].name, | |
"open": last_candle[instrument]['last_price']['open'], | |
"high": last_candle[instrument]['last_price']['high'], | |
"low": last_candle[instrument]['last_price']['low'], | |
"close": last_candle[instrument]['last_price']['close'], | |
"volume": last_candle[instrument]['volume']['volume'], | |
"oi": last_candle[instrument]['oi']['oi'], | |
'atp': last_candle[instrument]['average_price']['average_price'] | |
} | |
# Function to process tick data and generate candlesticks | |
def tick_processor(): | |
last_candle = defaultdict(dict) | |
while True: | |
try: | |
event = TickEventQ.get(False) | |
except queue.Empty: | |
continue | |
else: | |
for tick in event.data: | |
instrument = tick["instrument_token"] | |
ltt = pd.Timestamp(tick["timestamp"]) | |
# Append tick data to the list for the instrument | |
tick_data[instrument].append({ | |
'timestamp': ltt, | |
'last_price': tick["last_price"], | |
'volume': tick["volume"] if tick["tradable"] else 0, | |
'oi': tick["oi"] if tick["tradable"] else 0, | |
'average_price': tick["average_price"] if tick["tradable"] else 0, | |
'tradable': tick["tradable"] | |
}) | |
# Convert list to DataFrame for processing | |
df = pd.DataFrame(tick_data[instrument]) | |
df.set_index('timestamp', inplace=True) | |
for timeframe in timeframes: | |
candles[instrument][timeframe] = resample_df_into_candle(df, timeframe) | |
update_last_completed_candle(last_candle, instrument, timeframe) | |
def update_last_completed_candle(last_candle, instrument, timeframe): | |
if not candles[instrument][timeframe].empty: | |
if len(candles[instrument][timeframe]) >= 2: | |
candle = None | |
if len(last_candle[instrument]): | |
if candles[instrument][timeframe].iloc[-2].name != last_candle[instrument].name: | |
last_candle[instrument] = candles[instrument][timeframe].iloc[-2].copy() | |
candle = build_candle(last_candle, instrument) | |
else: | |
last_candle[instrument] = candles[instrument][timeframe].iloc[-2].copy() | |
candle = build_candle(last_candle, instrument) | |
if candle: | |
candleevent = CandleEvent(instrument, candle, timeframe) | |
CandleEventQ.put(candleevent) | |
def resample_df_into_candle(df:pd.DataFrame, timeframe): | |
return df.resample(timeframe).agg({ | |
'last_price': 'ohlc', | |
'volume': 'max', | |
'oi': 'max', | |
'average_price': 'avg' | |
}).dropna() | |
def candle_processor(): | |
try: | |
with pd.HDFStore(hdf5_store_path, mode='a') as store: | |
while True: | |
try: | |
event: CandleEvent = CandleEventQ.get(False) | |
except queue.Empty: | |
continue | |
else: | |
event.data.update(token_dict[event.symbol]) | |
df = pd.DataFrame(event.data, index=[event.data["Time"]]) | |
store_key = f'inst_{event.data["token"]}_{event.timeframe}' | |
if store_key in store: | |
store.append(store_key, df, format='table', data_columns=True) | |
else: | |
store.put(store_key, df, format='table', data_columns=True) | |
except Exception as e: | |
print("failed to process candle with exception: ", e, e.with_traceback()) | |
# Function to simulate receiving ticks | |
# Here this is mimicing on_tick() | |
def simulate_ticks(): | |
ticks= [] | |
for inst in token_dict.keys(): | |
ticks.append(generate_tick_data(inst)) | |
tick = TickEvent(ticks) | |
TickEventQ.put(tick) | |
# Function to continuously process ticks | |
# Here this is mimicing zerodha | |
def tick_generator(): | |
while True: | |
simulate_ticks() # Simulate receiving ticks | |
time.sleep(1) # Simulate tick frequency | |
def generate_tick_data(instrument_token): | |
timestamp = datetime.datetime.now() | |
data = { | |
'instrument_token': instrument_token, | |
'mode': 'full', | |
'volume': random.randint(10000, 20000), | |
'last_price': round(random.uniform(4000, 4100), 2), | |
'average_price': round(random.uniform(4000, 4100), 2), | |
'last_quantity': random.randint(1, 100), | |
'buy_quantity': random.randint(2000, 5000), | |
'sell_quantity': random.randint(2000, 5000), | |
'change': round(random.uniform(-1, 1), 10), | |
'last_trade_time': timestamp - datetime.timedelta(seconds=random.randint(1, 60)), | |
'timestamp': timestamp, | |
'oi': random.randint(20000, 30000), | |
'oi_day_low': random.randint(0, 5000), | |
'oi_day_high': random.randint(0, 5000), | |
'ohlc': { | |
'high': round(random.uniform(4050, 4100), 2), | |
'close': round(random.uniform(4000, 4050), 2), | |
'open': round(random.uniform(4000, 4050), 2), | |
'low': round(random.uniform(4000, 4050), 2) | |
}, | |
'tradable': random.choice([True, False]), | |
'depth': { | |
'sell': [ | |
{'price': round(random.uniform(4000, 4050), 2), 'orders': random.randint(1000, 5000), 'quantity': random.randint(10, 50)}, | |
{'price': round(random.uniform(4050, 4100), 2), 'orders': random.randint(1000, 5000), 'quantity': random.randint(10, 50)} | |
], | |
'buy': [ | |
{'price': round(random.uniform(3950, 4000), 2), 'orders': random.randint(1000, 5000), 'quantity': random.randint(10, 50)}, | |
{'price': round(random.uniform(3900, 3950), 2), 'orders': random.randint(1000, 5000), 'quantity': random.randint(10, 50)} | |
] | |
} | |
} | |
return data | |
def candle_reader(): | |
last_read_index = defaultdict(None) | |
for token in token_dict.keys(): | |
for timeframe in timeframes: | |
last_read_index.setdefault(f"inst_{token}_{timeframe}", None) | |
try: | |
with pd.HDFStore(hdf5_store_path, mode='a') as hdf5_store: | |
while True: | |
# Check if the dataset exists in the HDF5 file | |
for store_key in last_read_index.keys(): | |
# If this is the first read operation, read the last row of the data | |
if store_key in hdf5_store: | |
try: | |
if last_read_index[store_key] is None: | |
# Read the last row of the data | |
data = hdf5_store[store_key].tail(1) | |
if not data.empty: | |
last_read_index[store_key] = data.index[-1] | |
print("Initial read, last index updated to:", last_read_index[store_key]) | |
else: | |
# Read data after the last read index | |
data = hdf5_store.select(store_key, where=f'index > "{last_read_index[store_key]}"') | |
if not data.empty: | |
last_read_index[store_key] = data.index[-1] | |
print("New data read, last index updated to:", last_read_index[store_key]) | |
else: | |
continue | |
# Process the DataFrame | |
df = pd.DataFrame(data) | |
print(f"\n\nNew candle for key {store_key}:\n", df) | |
except Exception as e: | |
print(f"Failed to read or process data for {store_key} with exception:", e) | |
except Exception as e: | |
print("failed to read candle with exception: ", e, e.with_traceback()) | |
# Main function | |
def main(): | |
# Start tick genrator thread | |
tick_thread = Thread(target=tick_generator) | |
tick_thread.start() | |
# Start tick processor thread | |
tick_processor_trhead = Thread(target=tick_processor) | |
tick_processor_trhead.start() | |
# Start candle processor thread | |
candle_processor_trhead = Thread(target=candle_processor) | |
candle_processor_trhead.start() | |
# Start candle reader thread | |
candle_reader_trhead = Thread(target=candle_reader) | |
candle_reader_trhead.start() | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
You need to install
pandas
andpytable