Skip to content

Instantly share code, notes, and snippets.

@sumitsk20
Created June 18, 2024 21:45
Show Gist options
  • Save sumitsk20/4a7e9cf667852590e44f6275c06caaa0 to your computer and use it in GitHub Desktop.
Save sumitsk20/4a7e9cf667852590e44f6275c06caaa0 to your computer and use it in GitHub Desktop.
Python code that mimic the zerodha websocket behaviour and on_tick() functionality of KiteTicker, convert the tick to candles of different timeframe dynamically and very efficiently using in memory queue, after complete formation of one candle appending to a HDF Store which is efficient for financial data r/w
from collections import defaultdict, deque
import pandas as pd
import datetime
import queue
from threading import Thread
import time
import random
# Data structure to store tick data for each instrument
candles = defaultdict(lambda: defaultdict(lambda: list))
tick_data = defaultdict(lambda: deque(maxlen=2500)) # Limiting to last 1000 ticks per instrument
TickEventQ: queue.Queue = queue.Queue(maxsize=60000)
CandleEventQ: queue.Queue = queue.Queue(maxsize=60000)
token_dict = {256265:{'Symbol':'NIFTY 50'},260105:{'Symbol':'NIFTY BANK'}}
hdf5_store_path = 'candles_store.h5'
hdf5_store_path_exist = False
timeframes = ["1Min", "5Min", "15Min"]
class Event(object):
"""
Event is base class providing an interface for all subsequent
(inherited) events, that will trigger further events in the
trading infrastructure.
"""
pass
class TickEvent(Event):
"""
Handles the event of receiving a new market ticks
"""
def __init__(self,ticks):
"""
Initialises the MarketEvent.
"""
self.type = 'TICK'
self.data = ticks
class CandleEvent(Event):
"""
Handles the event of receiving a 1min candle
"""
def __init__(self, symbol,candle, timeframe):
self.type = 'CANDLE'
self.timeframe: str = timeframe
self.symbol: str = symbol
self.data: dict = candle
def print_self(self):
print ("CANDLE:",self.data)
def build_candle(last_candle, instrument):
return {
"token": str(instrument),
"Time": last_candle[instrument].name,
"open": last_candle[instrument]['last_price']['open'],
"high": last_candle[instrument]['last_price']['high'],
"low": last_candle[instrument]['last_price']['low'],
"close": last_candle[instrument]['last_price']['close'],
"volume": last_candle[instrument]['volume']['volume'],
"oi": last_candle[instrument]['oi']['oi'],
'atp': last_candle[instrument]['average_price']['average_price']
}
# Function to process tick data and generate candlesticks
def tick_processor():
last_candle = defaultdict(dict)
while True:
try:
event = TickEventQ.get(False)
except queue.Empty:
continue
else:
for tick in event.data:
instrument = tick["instrument_token"]
ltt = pd.Timestamp(tick["timestamp"])
# Append tick data to the list for the instrument
tick_data[instrument].append({
'timestamp': ltt,
'last_price': tick["last_price"],
'volume': tick["volume"] if tick["tradable"] else 0,
'oi': tick["oi"] if tick["tradable"] else 0,
'average_price': tick["average_price"] if tick["tradable"] else 0,
'tradable': tick["tradable"]
})
# Convert list to DataFrame for processing
df = pd.DataFrame(tick_data[instrument])
df.set_index('timestamp', inplace=True)
for timeframe in timeframes:
candles[instrument][timeframe] = resample_df_into_candle(df, timeframe)
update_last_completed_candle(last_candle, instrument, timeframe)
def update_last_completed_candle(last_candle, instrument, timeframe):
if not candles[instrument][timeframe].empty:
if len(candles[instrument][timeframe]) >= 2:
candle = None
if len(last_candle[instrument]):
if candles[instrument][timeframe].iloc[-2].name != last_candle[instrument].name:
last_candle[instrument] = candles[instrument][timeframe].iloc[-2].copy()
candle = build_candle(last_candle, instrument)
else:
last_candle[instrument] = candles[instrument][timeframe].iloc[-2].copy()
candle = build_candle(last_candle, instrument)
if candle:
candleevent = CandleEvent(instrument, candle, timeframe)
CandleEventQ.put(candleevent)
def resample_df_into_candle(df:pd.DataFrame, timeframe):
return df.resample(timeframe).agg({
'last_price': 'ohlc',
'volume': 'max',
'oi': 'max',
'average_price': 'avg'
}).dropna()
def candle_processor():
try:
with pd.HDFStore(hdf5_store_path, mode='a') as store:
while True:
try:
event: CandleEvent = CandleEventQ.get(False)
except queue.Empty:
continue
else:
event.data.update(token_dict[event.symbol])
df = pd.DataFrame(event.data, index=[event.data["Time"]])
store_key = f'inst_{event.data["token"]}_{event.timeframe}'
if store_key in store:
store.append(store_key, df, format='table', data_columns=True)
else:
store.put(store_key, df, format='table', data_columns=True)
except Exception as e:
print("failed to process candle with exception: ", e, e.with_traceback())
# Function to simulate receiving ticks
# Here this is mimicing on_tick()
def simulate_ticks():
ticks= []
for inst in token_dict.keys():
ticks.append(generate_tick_data(inst))
tick = TickEvent(ticks)
TickEventQ.put(tick)
# Function to continuously process ticks
# Here this is mimicing zerodha
def tick_generator():
while True:
simulate_ticks() # Simulate receiving ticks
time.sleep(1) # Simulate tick frequency
def generate_tick_data(instrument_token):
timestamp = datetime.datetime.now()
data = {
'instrument_token': instrument_token,
'mode': 'full',
'volume': random.randint(10000, 20000),
'last_price': round(random.uniform(4000, 4100), 2),
'average_price': round(random.uniform(4000, 4100), 2),
'last_quantity': random.randint(1, 100),
'buy_quantity': random.randint(2000, 5000),
'sell_quantity': random.randint(2000, 5000),
'change': round(random.uniform(-1, 1), 10),
'last_trade_time': timestamp - datetime.timedelta(seconds=random.randint(1, 60)),
'timestamp': timestamp,
'oi': random.randint(20000, 30000),
'oi_day_low': random.randint(0, 5000),
'oi_day_high': random.randint(0, 5000),
'ohlc': {
'high': round(random.uniform(4050, 4100), 2),
'close': round(random.uniform(4000, 4050), 2),
'open': round(random.uniform(4000, 4050), 2),
'low': round(random.uniform(4000, 4050), 2)
},
'tradable': random.choice([True, False]),
'depth': {
'sell': [
{'price': round(random.uniform(4000, 4050), 2), 'orders': random.randint(1000, 5000), 'quantity': random.randint(10, 50)},
{'price': round(random.uniform(4050, 4100), 2), 'orders': random.randint(1000, 5000), 'quantity': random.randint(10, 50)}
],
'buy': [
{'price': round(random.uniform(3950, 4000), 2), 'orders': random.randint(1000, 5000), 'quantity': random.randint(10, 50)},
{'price': round(random.uniform(3900, 3950), 2), 'orders': random.randint(1000, 5000), 'quantity': random.randint(10, 50)}
]
}
}
return data
def candle_reader():
last_read_index = defaultdict(None)
for token in token_dict.keys():
for timeframe in timeframes:
last_read_index.setdefault(f"inst_{token}_{timeframe}", None)
try:
with pd.HDFStore(hdf5_store_path, mode='a') as hdf5_store:
while True:
# Check if the dataset exists in the HDF5 file
for store_key in last_read_index.keys():
# If this is the first read operation, read the last row of the data
if store_key in hdf5_store:
try:
if last_read_index[store_key] is None:
# Read the last row of the data
data = hdf5_store[store_key].tail(1)
if not data.empty:
last_read_index[store_key] = data.index[-1]
print("Initial read, last index updated to:", last_read_index[store_key])
else:
# Read data after the last read index
data = hdf5_store.select(store_key, where=f'index > "{last_read_index[store_key]}"')
if not data.empty:
last_read_index[store_key] = data.index[-1]
print("New data read, last index updated to:", last_read_index[store_key])
else:
continue
# Process the DataFrame
df = pd.DataFrame(data)
print(f"\n\nNew candle for key {store_key}:\n", df)
except Exception as e:
print(f"Failed to read or process data for {store_key} with exception:", e)
except Exception as e:
print("failed to read candle with exception: ", e, e.with_traceback())
# Main function
def main():
# Start tick genrator thread
tick_thread = Thread(target=tick_generator)
tick_thread.start()
# Start tick processor thread
tick_processor_trhead = Thread(target=tick_processor)
tick_processor_trhead.start()
# Start candle processor thread
candle_processor_trhead = Thread(target=candle_processor)
candle_processor_trhead.start()
# Start candle reader thread
candle_reader_trhead = Thread(target=candle_reader)
candle_reader_trhead.start()
if __name__ == "__main__":
main()
@sumitsk20
Copy link
Author

You need to install pandas and pytable

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment