Skip to content

Instantly share code, notes, and snippets.

@MooneDrJune
Last active September 18, 2024 14:13
Show Gist options
  • Save MooneDrJune/7e41ad89a310fc6d122bb1e012953549 to your computer and use it in GitHub Desktop.
Save MooneDrJune/7e41ad89a310fc6d122bb1e012953549 to your computer and use it in GitHub Desktop.
KiteConnect Extras Async KiteConnect Ticker
from __future__ import annotations
from asyncio.exceptions import CancelledError
from typing import (
Any,
Callable,
Dict,
List,
Literal,
Optional,
NoReturn,
Tuple,
)
import os, sys, six, struct, contextlib, json, queue, asyncio, logging, websockets, websockets.client, signal, urllib.parse
from time import sleep
from pathlib import Path
from threading import Thread
from datetime import datetime
from websockets.typing import Data
from websockets.exceptions import (
WebSocketException,
ConnectionClosed,
ConnectionClosedOK,
ConnectionClosedError,
)
from websockets.client import WebSocketClientProtocol
from concurrent.futures import (
ProcessPoolExecutor,
ThreadPoolExecutor,
)
from multiprocessing import cpu_count
from logging.handlers import RotatingFileHandler
from datetime import datetime as dtdt
@staticmethod
def is_windows() -> bool:
return (
os.name == "nt" and sys.platform == "win32" and platform.system() == "Windows"
)
if not is_windows() and sys.version_info >= (3, 8):
try:
import uvloop
except (ImportError, ModuleNotFoundError):
os.system(f"{sys.executable} -m pip install uvloop")
import uvloop
from signal import SIGABRT, SIGINT, SIGTERM, SIGHUP
SIGNALS = (SIGABRT, SIGINT, SIGTERM, SIGHUP)
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
else:
from signal import SIGABRT, SIGINT, SIGTERM
SIGNALS = (SIGABRT, SIGINT, SIGTERM)
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
from kiteconnect.__version__ import __version__, __title__
__all__ = ["KiteExtTicker"]
LOGGING_FORMAT: str = "[%(levelname)s]|[%(asctime)s]|[%(name)s::%(module)s::%(funcName)s::%(lineno)d]|=> %(message)s"
class AttrDict(dict):
__slots__ = ()
def __getattr__(self, attr):
try:
return self[attr]
except KeyError:
raise AttributeError(attr) from None
def __setattr__(self, attr, value):
self[attr] = value
def __delattr__(self, attr):
try:
del self[attr]
except KeyError:
raise AttributeError(attr) from None
def __dir__(self):
return list(self) + dir(type(self))
class Callback:
def __init__(
self,
callback: Callable,
loop: Optional[asyncio.AbstractEventLoop] = None,
pool: Optional[Literal["thread", "process"]] = None,
max_workers: int = cpu_count(),
) -> None:
self.callback = callback
self.is_async = asyncio.iscoroutinefunction(callback)
self.loop = loop if loop is not None else asyncio.get_event_loop()
self.pool = (
ThreadPoolExecutor(max_workers=max_workers)
if isinstance(pool, str) and pool == "thread"
else ProcessPoolExecutor(max_workers=max_workers)
if isinstance(pool, str) and pool == "process"
else None
)
def __shutdown_pool(self) -> None:
if self.pool is not None:
self.pool.shutdown(wait=False, cancel_futures=True)
def __del__(self) -> None:
self.__shutdown_pool()
def __delete__(self) -> None:
self.__shutdown_pool()
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
self.__shutdown_pool()
def __aexit__(self) -> None:
self.__shutdown_pool()
async def __call__(self, obj):
if self.callback is None:
return
elif self.is_async:
await self.callback(obj)
else:
await self.loop.run_in_executor(self.pool, self.callback, obj)
class LoggerAdapter(logging.LoggerAdapter):
"""Add connection ID and client IP address to websockets logs."""
def process(self, msg, kwargs):
try:
websocket = kwargs["extra"]["websocket"]
except KeyError:
return msg, kwargs
kwargs["extra"]["event_data"] = {
"connection_id": str(websocket.id),
"remote_addr": websocket.request_headers.get("X-Forwarded-For"),
}
return msg, kwargs
@staticmethod
def get_logger(name, filename, level=logging.WARNING) -> logging.Logger:
logger = logging.getLogger(name)
logger.setLevel(level)
stream = logging.StreamHandler()
stream.setFormatter(logging.Formatter(LOGGING_FORMAT))
logger.addHandler(stream)
fh = RotatingFileHandler(filename, maxBytes=100 * 1024 * 1024, backupCount=25)
fh.setFormatter(logging.Formatter(LOGGING_FORMAT))
logger.addHandler(fh)
logger.propagate = False
return logger
@staticmethod
def get_now_date_time_with_microseconds_string() -> str:
return dtdt.now().strftime("%d_%b_%Y_%H_%M_%S_%f")
log = get_logger(
__name__,
Path(os.getcwd()).joinpath(
f"logs/kiteext_ticker_{get_now_date_time_with_microseconds_string()}.log"
),
)
class KiteExtTicker:
PING_INTERVAL: float = 2.5
KEEPALIVE_INTERVAL: int = 5
MAXDELAY: int = 5
MAXRETRIES: int = 10
# Default connection timeout
CONNECT_TIMEOUT: int = 30
# Default Reconnect max delay.
RECONNECT_MAX_DELAY: int = 60
# Default reconnect attempts
RECONNECT_MAX_TRIES: int = 50
CLOSE_TIMEOUT: int = 30
# Default root API endpoint. It's possible to
# override this by passing the `root` parameter during initialisation.
KITE_API_WS_ROOT_URI: str = "wss://ws.kite.trade"
KITE_WEB_WS_ROOT_URI: str = "wss://ws.zerodha.com"
KITE_WEB_WS20_ROOT_URI: str = "wss://ws20.zerodha.com"
KITE_MOBILE_WS_ROOT_URI: str = "wss://ws-mobile.zerodha.com"
KITE_WEB_API_KEY: str = "kitefront"
KITE_ANDROID_API_KEY: str = "kiteandroid"
KITE_IOS_API_KEY: str = "kiteios"
# Available streaming modes.
MODE_FULL: str = "full"
MODE_QUOTE: str = "quote"
MODE_LTP: str = "ltp"
SUBSCRIBE: str = "SUBSCRIBE"
UNSUBSCRIBE: str = "UNSUBSCRIBE"
SETMODE: str = "SETMODE"
TICK_UPDATE: str = "TICK_UPDATE"
ORDER_UPDATE: str = "ORDER_UPDATE"
ERROR_UPDATE: str = "ERROR_UPDATE"
MESSAGE_UPDATE: str = "MESSAGE_UPDATE"
ACCESS_TOKEN_UPDATE: str = "ACCESS_TOKEN_UPDATE"
SYNTH_FUT_ATM_UPDATE: str = "SYNTH_FUT_ATM_UPDATE"
EXIT_CALL: str = "EXIT_CALL"
ASK_SUBSCRIPTION: str = "ASK_SUBSCRIPTION"
# Flag to set if its first connect
IS_FIRST_CONNECT = True
# Available actions.
MESSAGE_CODE: int = 11
MESSAGE_SUBSCRIBE: str = "subscribe"
MESSAGE_UNSUBSCRIBE: str = "unsubscribe"
MESSAGE_SETMODE: str = "mode"
# Minimum delay which should be set between retries. User can't set less than this
MINIMUM_RECONNECT_MAX_DELAY: int = 5
# Maximum number or retries user can set
MAXIMUM_RECONNECT_MAX_TRIES: int = 300
CUSTOM_HEADERS: Dict[str, str] = {
"X-Kite-Version": "3", # For version 3
}
PING_PAYLOAD: str = ""
ON_TICKS: str = "ON_TICKS"
ON_EXTENDED_TICKS: str = "ON_EXTENDED_TICKS"
ON_ORDER_UPDATES: str = "ON_ORDER_UPDATES"
EXCHANGE_MAP: Dict[str, int] = {
"nse": 1,
"nfo": 2,
"cds": 3,
"bse": 4,
"bfo": 5,
"bcd": 6,
"mcx": 7,
"mcxsx": 8,
"indices": 9,
# bsecds is replaced with it's official segment name bcd
# so,bsecds key will be depreciated in next version
"bsecds": 6,
}
BROWSER_USER_AGENT: str = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 Edg/120.0.2210.91"
@staticmethod
def _user_agent() -> str:
return (__title__ + "-python/").capitalize() + __version__
@staticmethod
def default_callbacks() -> Dict[str, Optional[Callable]]:
return dict.fromkeys(
(
KiteExtTicker.ON_TICKS.lower(),
KiteExtTicker.ON_ORDER_UPDATES.lower(),
)
)
@staticmethod
def parse_text_message(
payload: Data,
) -> None | Tuple[str, str] | Tuple[str, Dict[str, Any]]:
"""Parse text message."""
# Decode unicode data
if not six.PY2 and type(payload) == bytes:
payload = payload.decode("utf-8")
try:
data = json.loads(payload)
except ValueError:
return
# Order update callback
if data.get("type") is not None and data["type"] == "order":
if data.get("data") is not None:
return KiteExtTicker.ORDER_UPDATE, data["data"]
return KiteExtTicker.ORDER_UPDATE, data
# Custom error with websocket error code 0
if data.get("type") is not None and data["type"] == "error":
if data.get("data") is not None:
return KiteExtTicker.ERROR_UPDATE, data["data"]
return KiteExtTicker.ERROR_UPDATE, data
if data.get("type") is not None and data["type"] == "message":
if data.get("data") is not None:
return KiteExtTicker.MESSAGE_UPDATE, data["data"]
return KiteExtTicker.MESSAGE_UPDATE, data
if data.get("type") is not None and data["type"] == "instruments_meta":
if data.get("data") is not None:
return KiteExtTicker.MESSAGE_UPDATE, data["data"]
return KiteExtTicker.MESSAGE_UPDATE, data
@staticmethod
def unpack_int(bin: bytes, start: int, end: int, byte_format: str = "I") -> int:
"""Unpack binary data as unsgined interger."""
return struct.unpack(">" + byte_format, bin[start:end])[0]
@staticmethod
def split_packets(bin: bytes) -> None | List[bytes]:
"""Split the data to individual packets of ticks."""
# Ignore heartbeat data. (len(bin) < 2)
if len(bin) >= 2:
j = 2
packets = []
number_of_packets = KiteExtTicker.unpack_int(bin, 0, 2, byte_format="H")
for i in range(number_of_packets):
packet_length = KiteExtTicker.unpack_int(bin, j, j + 2, byte_format="H")
packets.append(bin[j + 2 : j + 2 + packet_length])
j = j + 2 + packet_length
return packets
@staticmethod
def parse_binary_packet(packet: bytes) -> Dict[str, Any]:
"""Parse binary data ticks structure."""
tick_data = {}
instrument_token = KiteExtTicker.unpack_int(packet, 0, 4)
# Retrive segment constant from instrument_token
segment = instrument_token & 0xFF
# Add price divisor based on segment
if segment == KiteExtTicker.EXCHANGE_MAP["cds"]:
divisor = 10000000.0
elif segment == KiteExtTicker.EXCHANGE_MAP["bcd"]:
divisor = 10000.0
else:
divisor = 100.0
# All indices are not tradable
tradable = not (segment == KiteExtTicker.EXCHANGE_MAP["indices"])
# LTP packets
if len(packet) == 8:
tick_data.update(
{
"tradable": tradable,
"mode": KiteExtTicker.MODE_LTP,
"instrument_token": instrument_token,
"last_price": KiteExtTicker.unpack_int(packet, 4, 8) / divisor,
}
)
# Indices quote and full mode
if len(packet) == 28 or len(packet) == 32:
mode = (
KiteExtTicker.MODE_QUOTE
if len(packet) == 28
else KiteExtTicker.MODE_FULL
)
tick_data.update(
{
"tradable": tradable,
"mode": mode,
"instrument_token": instrument_token,
"last_price": KiteExtTicker.unpack_int(packet, 4, 8) / divisor,
"ohlc": {
"high": KiteExtTicker.unpack_int(packet, 8, 12) / divisor,
"low": KiteExtTicker.unpack_int(packet, 12, 16) / divisor,
"open": KiteExtTicker.unpack_int(packet, 16, 20) / divisor,
"close": KiteExtTicker.unpack_int(packet, 20, 24) / divisor,
},
}
)
# Compute the change price using close price and last price
tick_data["change"] = 0
if tick_data["ohlc"]["close"] != 0:
tick_data["change"] = (
(tick_data["last_price"] - tick_data["ohlc"]["close"])
* 100
/ tick_data["ohlc"]["close"]
)
# Full mode with timestamp
if len(packet) == 32:
try:
timestamp = datetime.fromtimestamp(
KiteExtTicker.unpack_int(packet, 28, 32)
)
except Exception:
timestamp = None
tick_data["exchange_timestamp"] = timestamp
# Quote and full mode
if len(packet) == 44 or len(packet) == 184:
mode = (
KiteExtTicker.MODE_QUOTE
if len(packet) == 44
else KiteExtTicker.MODE_FULL
)
tick_data.update(
{
"tradable": tradable,
"mode": mode,
"instrument_token": instrument_token,
"last_price": KiteExtTicker.unpack_int(packet, 4, 8) / divisor,
"last_traded_quantity": KiteExtTicker.unpack_int(packet, 8, 12),
"average_traded_price": KiteExtTicker.unpack_int(packet, 12, 16)
/ divisor,
"volume_traded": KiteExtTicker.unpack_int(packet, 16, 20),
"total_buy_quantity": KiteExtTicker.unpack_int(packet, 20, 24),
"total_sell_quantity": KiteExtTicker.unpack_int(packet, 24, 28),
"ohlc": {
"open": KiteExtTicker.unpack_int(packet, 28, 32) / divisor,
"high": KiteExtTicker.unpack_int(packet, 32, 36) / divisor,
"low": KiteExtTicker.unpack_int(packet, 36, 40) / divisor,
"close": KiteExtTicker.unpack_int(packet, 40, 44) / divisor,
},
}
)
# Compute the change price using close price and last price
tick_data["change"] = 0
if tick_data["ohlc"]["close"] != 0:
tick_data["change"] = (
(tick_data["last_price"] - tick_data["ohlc"]["close"])
* 100
/ tick_data["ohlc"]["close"]
)
# Parse full mode
if len(packet) == 184:
try:
last_trade_time = datetime.fromtimestamp(
KiteExtTicker.unpack_int(packet, 44, 48)
)
except Exception:
last_trade_time = None
try:
timestamp = datetime.fromtimestamp(
KiteExtTicker.unpack_int(packet, 60, 64)
)
except Exception:
timestamp = None
tick_data["last_trade_time"] = last_trade_time
tick_data["oi"] = KiteExtTicker.unpack_int(packet, 48, 52)
tick_data["oi_day_high"] = KiteExtTicker.unpack_int(packet, 52, 56)
tick_data["oi_day_low"] = KiteExtTicker.unpack_int(packet, 56, 60)
tick_data["exchange_timestamp"] = timestamp
# Market depth entries.
depth = {"buy": [], "sell": []}
# Compile the market depth lists.
for i, p in enumerate(range(64, len(packet), 12)):
depth["sell" if i >= 5 else "buy"].append(
{
"quantity": KiteExtTicker.unpack_int(packet, p, p + 4),
"price": KiteExtTicker.unpack_int(packet, p + 4, p + 8)
/ divisor,
"orders": KiteExtTicker.unpack_int(
packet, p + 8, p + 10, byte_format="H"
),
}
)
tick_data["depth"] = depth
# Parse full mode with extended (20-depth) ticker
if len(packet) == 492:
tick_data.update(
{
"tradable": tradable,
"mode": KiteExtTicker.MODE_FULL,
"instrument_token": instrument_token,
"last_price": KiteExtTicker.unpack_int(packet, 4, 8) / divisor,
}
)
try:
timestamp = datetime.fromtimestamp(
KiteExtTicker.unpack_int(packet, 8, 12)
)
except Exception:
timestamp = None
tick_data["exchange_timestamp"] = timestamp
# Market 20 depth entries.
extended_depth = {"buy": [], "sell": []}
# Compile the market depth lists.
for i, p in enumerate(range(12, len(packet), 12)):
extended_depth["sell" if i >= 20 else "buy"].append(
{
"quantity": KiteExtTicker.unpack_int(packet, p, p + 4),
"price": KiteExtTicker.unpack_int(packet, p + 4, p + 8)
/ divisor,
"orders": KiteExtTicker.unpack_int(packet, p + 8, p + 12),
}
)
tick_data["20_depth"] = extended_depth
return tick_data
@staticmethod
def parse_binary(bin: bytes) -> None | List[Dict[str, Any]]:
"""Parse binary data to a (list of) ticks structure."""
# Ignore heartbeat data.
if len(bin) < 2:
return
else:
# split data to individual ticks packet
packets = KiteExtTicker.split_packets(bin)
if packets is not None:
return [KiteExtTicker.parse_binary_packet(packet) for packet in packets]
@staticmethod
def _start_background_loop(loop: asyncio.AbstractEventLoop) -> Optional[NoReturn]:
asyncio.set_event_loop(loop)
try:
loop.run_forever()
except (KeyboardInterrupt, SystemExit):
loop.run_until_complete(loop.shutdown_asyncgens())
if loop.is_running():
loop.stop()
if not loop.is_closed():
loop.close()
def __aenter__(self) -> "KiteExtTicker":
return self
def __enter__(self) -> "KiteExtTicker":
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
self._graceful_exit()
def __del__(self) -> None:
self._graceful_exit()
def __delete__(self) -> None:
self._graceful_exit()
def create_ticker_feed_url(self) -> None:
self.ticker_feed_url = (
(
"{root}?api_key={api_key}"
"&user_id={user_id}"
"&access_token={access_token}".format(
root=self._root,
api_key=self.api_key,
user_id=self.user_id,
access_token=urllib.parse.quote(self.access_token),
)
)
if self.api_key
in [
self.KITE_WEB_API_KEY,
self.KITE_ANDROID_API_KEY,
self.KITE_IOS_API_KEY,
]
else (
"{root}?api_key={api_key}" "&access_token={access_token}".format(
root=self._root,
api_key=self.api_key,
access_token=self.access_token,
)
)
)
log.info("Feed Url Created as: %s", self.ticker_feed_url)
def refresh_token_from_file(self) -> None:
token_data = None
with open(self.token_path, "r") as file:
token_data = json.load(file)
if token_data is not None:
self.access_token = token_data["access_token"]
self.create_ticker_feed_url()
def __init__(
self,
user_id: str = "",
api_key: str = KITE_WEB_API_KEY,
access_token: str = "",
root: Optional[str] = None,
debug: bool = False,
debug_verbose: bool = False,
log_tick_updates: bool = False,
log_order_updates: bool = True,
callbacks: Optional[Dict[str, Callback | Callable]] = None,
reconnect: bool = True,
reconnect_max_tries: int = RECONNECT_MAX_TRIES,
reconnect_max_delay: int = RECONNECT_MAX_DELAY,
connect_timeout: int = CONNECT_TIMEOUT,
with_extended_ticker: bool = False,
token_path: Path = Path(os.getcwd()).joinpath("access_token.json"),
) -> None:
if user_id == "":
raise ValueError(
"`user_id` value can not be empty, Please Try again with correct value of `user_id`."
)
self.user_id = user_id
self.api_key = api_key
if isinstance(access_token, str) and access_token == "":
token_data = None
with open(token_path, "r") as file:
token_data = json.load(file)
if token_data is not None:
self.access_token = token_data["access_token"]
if self.access_token is not None and isinstance(self.access_token, str):
pass
else:
raise ValueError(
"`access_token` value can not be empty, Please Try again with correct value of `access_token` or ensure the token file specified in the token path conatins valid `access_token`."
)
else:
self.access_token = access_token
self.debug = debug
self.debug_verbose = debug_verbose
self.log_tick_updates = log_tick_updates
self.log_order_updates = log_order_updates
self.token_path = token_path
self.log_level = (
logging.INFO
if self.debug
else logging.DEBUG
if self.debug_verbose
else logging.WARNING
)
log.setLevel(self.log_level)
logging.getLogger("websockets").addHandler(logging.NullHandler())
if self.debug:
logging.basicConfig(format=LOGGING_FORMAT, level=self.log_level)
logging.getLogger("websockets").propagate = False
logging.getLogger("websockets").setLevel(logging.WARNING)
if self.debug_verbose:
log.propagate = self.debug_verbose
logging.basicConfig(format=LOGGING_FORMAT, level=self.log_level)
logging.getLogger("websockets").propagate = self.debug_verbose
logging.getLogger("websockets").setLevel(self.log_level)
self.callbacks = callbacks
self.on_tick = None
self.on_extended_tick = None
self.on_order_update = None
self.ticker_feed = None
self.ticker_feed_running = False
self.called_for_exit = False
self.should_run = True
self._root = root or self.KITE_API_WS_ROOT_URI
self.create_ticker_feed_url()
self.tickdata = {}
self.subscribed_tokens = {}
self.order_updates_data = {}
self._ticker_feed_first_run = True
self._fresh_access_token_event = False
self.stop_stream_queue = queue.Queue()
self.__initialize_loop()
if self.callbacks is not None and isinstance(self.callbacks, dict):
for k, v in self.callbacks.items():
if k.lower() == KiteExtTicker.ON_TICKS.lower():
self.on_tick = (
Callback(v, loop=self._loop) if isinstance(v, Callable) else v
)
if k.lower() == KiteExtTicker.ON_ORDER_UPDATES.lower():
self.on_order_update = (
Callback(v, loop=self._loop) if isinstance(v, Callable) else v
)
self.run()
def _graceful_exit(self) -> None:
with contextlib.suppress(RuntimeError, RuntimeWarning):
self.stop()
asyncio.run_coroutine_threadsafe(
self._loop.shutdown_asyncgens(), self._loop
).result(15.0)
if self._loop.is_running():
self._loop.stop()
if not self._loop.is_closed():
self._loop.close()
def _handle_stop_signals(self, *args, **kwargs):
try:
self._graceful_exit()
except Exception as err:
log.error(str(err))
else:
exit()
def __initialize_loop(self) -> None:
if is_windows():
self._loop = asyncio.new_event_loop()
with contextlib.suppress(ValueError):
for sig in SIGNALS:
signal.signal(sig, self._handle_stop_signals)
else:
self._loop = uvloop.new_event_loop()
with contextlib.suppress(ValueError):
for sig in SIGNALS:
self._loop.add_signal_handler(sig, self._handle_stop_signals) # noqa E501
self._event_thread = Thread(
target=self._start_background_loop,
args=(self._loop,),
name=f"{self.__class__.__name__}_event_thread",
daemon=True,
)
self._event_thread.start()
log.info("KiteExtTicker Event Loop has been initialized.")
async def get_ws_client(self) -> WebSocketClientProtocol:
uri = self.ticker_feed_url
return await websockets.client.connect(
uri,
logger=LoggerAdapter(
get_logger("websockets.client", "websocket.log", level=self.log_level),
None,
)
if self.debug or self.debug_verbose
else None,
user_agent_header=self._user_agent(),
extra_headers=self.CUSTOM_HEADERS,
open_timeout=self.CONNECT_TIMEOUT,
ping_interval=self.PING_INTERVAL,
ping_timeout=self.KEEPALIVE_INTERVAL,
close_timeout=self.CLOSE_TIMEOUT,
)
async def __on_open(self) -> None:
if self.ticker_feed is not None and self.ticker_feed_running:
instruments = {
"NIFTY 50": 256265,
"NIFTY BANK": 260105,
"NIFTY FIN SERVICE": 257801,
"NIFTY MID SELECT": 288009,
"BANKEX": 274441,
"SENSEX": 265,
}
log.info("Sending On Open Packet To Price Feed.")
await self.__subscribe(instruments)
log.info("On Open Packet Sent To Price Feed Successfully")
def run(self) -> None:
try:
if self._loop.is_running():
self.run_future = asyncio.run_coroutine_threadsafe(
self.run_forever(), self._loop
)
except KeyboardInterrupt:
log.info("Keyboard Interrupt Detected, Bye...")
def stop(self) -> None:
if self._loop.is_running():
asyncio.run_coroutine_threadsafe(
self.stop_ws(),
self._loop,
).result(7.5)
if self.run_future.running():
self.run_future.cancel()
while not self.run_future.done():
sleep(0.025)
async def stop_ws(self) -> None:
self.should_run = False
if self.stop_stream_queue.empty():
self.stop_stream_queue.put_nowait({"should_stop": True})
await asyncio.sleep(1.0)
await self.close()
async def close(self, with_exceptions: bool = False) -> None:
log_msg = "Facing An Error, " if with_exceptions else ""
log_msg += "Ticker Feed Websocket Disconnected With Endpoint: %s"
if self.ticker_feed is not None:
log.info("Initiating Websocket Closing Procedures.")
if self.ticker_feed_running and self.ticker_feed.open:
unsubscribe_frame = json.dumps(
{
"a": KiteExtTicker.MESSAGE_UNSUBSCRIBE,
"v": list(self.subscribed_tokens.keys()),
}
)
await self.ticker_feed.send(unsubscribe_frame)
await asyncio.sleep(0.25)
if not self.ticker_feed.closed:
log.info("Calling websocket.close() task.")
try:
async with asyncio.timeout(2.5):
await self.ticker_feed.close()
# await self.ticker_feed.wait_closed()
except TimeoutError:
log.critical(
"Websocket Closing Task Timed Out After Waiting For 2.5 Seconds"
)
else:
log.info("Calling websocket.close() completed succesfully.")
self.ticker_feed_running, self.ticker_feed = False, None
log.info(log_msg, self.ticker_feed_url)
async def connect(self) -> None:
log.info("Connecting To Websocket Endpoint: %s", self.ticker_feed_url)
self.ticker_feed = await self.get_ws_client()
await asyncio.sleep(0.01)
if self.ticker_feed is not None and isinstance(
self.ticker_feed, WebSocketClientProtocol
):
self.ticker_feed_running = True
log.info(
"Websocket Endpoint: %s Connected Successfully.", self.ticker_feed_url
)
await asyncio.sleep(0.1)
if self._ticker_feed_first_run:
self._ticker_feed_first_run = False
await self.__on_open()
else:
if len(self.subscribed_tokens) > 0:
await self.__resubscribe()
await asyncio.sleep(0.25)
async def run_forever(self) -> None:
await asyncio.sleep(0.1)
reconnect_sleep_time: float = 0.5
while True:
if (
not self.stop_stream_queue.empty()
or not self.should_run
or self.called_for_exit
):
# self.stop_stream_queue.get(timeout=1.0)
log.info("Ticker Feeds Stopped Successfully.")
return
if not self.ticker_feed_running:
log.info("Initializing Or Restarting Ticker Feed Websocket Stream.")
await self.connect()
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(self.is_ticker_feed_runnig())
tg.create_task(self.consume())
tg.create_task(self.ping())
except* WebSocketException as wse:
log.warning(
"Ticker Feeds Websocket Has Met With An Exception, During Its Communication With Zerodha Kite Servers, Exception Was: %s",
wse,
)
log.exception(wse, exc_info=True)
await self.close(with_exceptions=True)
await asyncio.sleep(reconnect_sleep_time)
reconnect_sleep_time += 0.05
if not self._fresh_access_token_event:
self.refresh_token_from_file()
else:
self._fresh_access_token_event = False
except* Exception as exc:
log.critical(
"An Exception Has Occured While Consuming The Stream From Ticker Feed. Please Check For Correction In The Callbacks If Any., Exception Was: %s",
exc,
)
log.exception(exc, exc_info=True)
await asyncio.sleep(0.001)
async def is_ticker_feed_runnig(self) -> None:
await asyncio.sleep(0.1)
while True:
if (
not self.stop_stream_queue.empty()
or not self.should_run
or self.called_for_exit
):
# self.stop_stream_queue.get(timeout=1.0)
log.info(
"Ticker Feeds Process `is_ticker_feed_runnig` Task Stopped Successfully."
)
return
try:
if self.ticker_feed is not None:
await self.ticker_feed.ensure_open()
except ConnectionClosed:
self.ticker_feed_running = False
raise WebSocketException(
"Websocket Connection Was Found To Be Closed, hence Can't Ping Pong."
)
await asyncio.sleep(0.25)
async def ping(self) -> None:
await asyncio.sleep(0.3)
while True:
if (
not self.stop_stream_queue.empty()
or not self.should_run
or self.called_for_exit
):
# self.stop_stream_queue.get(timeout=1.0)
log.info("Ticker Feeds PINPONG Task Stopped Successfully.")
return
if not self.ticker_feed_running:
log.info(
"Ticker Feeds Not Running, As A Result, PINPONG Task Stopped Successfully."
)
return
try:
async with asyncio.timeout(KiteExtTicker.PING_INTERVAL):
await asyncio.sleep(KiteExtTicker.PING_INTERVAL + 1.0)
except TimeoutError:
if self.ticker_feed is not None and self.ticker_feed.open:
try:
async with asyncio.timeout(KiteExtTicker.KEEPALIVE_INTERVAL):
try:
# await self.ticker_feed.ping(data=self.PING_PAYLOAD)
pong_waiter = await self.ticker_feed.ping(
data=self.PING_PAYLOAD
)
log.info("Ticker Feed Pinged.")
# A future that will be completed when the corresponding pong is received. You can ignore it if you don’t intend to wait. The result of the future is the latency of the connection in seconds.
# Here we do not want to wait for the corresponding pong packet so commenting the below line.
latency = await pong_waiter
log.info(
"Ticker Feed Pong Received With Latency: %s",
str(latency),
)
except ConnectionClosed:
self.ticker_feed_running = False
raise WebSocketException(
"Websocket Connection Was Found To Be Closed, hence Can't Ping Pong."
)
except (RuntimeError, RuntimeWarning):
continue
except CancelledError:
continue
except Exception:
raise WebSocketException(
"While Sending PING and waiting for PONG from `ping` function Exception Occured"
)
except TimeoutError:
self.ticker_feed_running = False
raise WebSocketException(
"Ticker Feed Websocket Connection Stale As Pong is not received within the keep alive time interval of 5 seconds."
)
except (RuntimeError, RuntimeWarning):
continue
except CancelledError:
continue
except Exception as err:
log.exception(
"While Sending PING and waiting for PONG from `ping` function, Exception Occured: %s",
err,
)
raise WebSocketException(
"While Sending PING and waiting for PONG from `ping` function Exception Occured"
)
await asyncio.sleep(0.001)
async def consume(self) -> None:
await asyncio.sleep(0.2)
while True:
data = None
if (
not self.stop_stream_queue.empty()
or not self.should_run
or self.called_for_exit
):
# self.stop_stream_queue.get(timeout=1.0)
log.info("Ticker Feeds Consume Task Stopped Successfully.")
return
if not self.ticker_feed_running:
log.info(
"Ticker Feeds Not Running, As A Result, Consume Task Stopped Successfully."
)
return
try:
async with asyncio.timeout(1.0):
if self.ticker_feed is not None:
try:
data = await self.ticker_feed.recv()
except (
ConnectionClosed,
ConnectionClosedOK,
ConnectionClosedError,
):
self.ticker_feed_running = False
raise WebSocketException(
"Websocket Connection Was Found To Be Closed, hence Can't Receive And Consume Any Data."
)
except (RuntimeError, RuntimeWarning):
continue
except TimeoutError:
continue
except (RuntimeError, RuntimeWarning):
continue
else:
try:
if data is not None:
await self.dispatch(data)
except TimeoutError:
continue
except Exception as exc:
log.critical(
"While Dispatching from `consume` function, Exception Occured: %s",
)
log.exception(exc, exc_info=True)
await asyncio.sleep(0.001)
async def cast(
self,
to: str,
data: Data | Dict[str, Any] | List[AttrDict] | List[Dict[str, Any]],
):
match to.upper():
case KiteExtTicker.ON_TICKS:
log.info("Casting on `on_ticks` callback.")
if self.on_tick is not None and isinstance(self.on_tick, Callback):
try:
await self.on_tick(data)
except Exception as exc:
log.critical(
"While Executing `on_ticks` Callback Function, Exception Occured: %s",
)
log.exception(exc, exc_info=True)
case KiteExtTicker.ON_ORDER_UPDATES:
log.info("Casting on `on_order_updates` callback.")
if self.on_order_update is not None and isinstance(
self.on_order_update, Callback
):
try:
await self.on_order_update(data)
except Exception as exc:
log.critical(
"While Executing `on_order_updates` Callback Function, Exception Occured: %s",
)
log.exception(exc, exc_info=True)
async def dispatch(
self,
data: Data,
) -> None:
if isinstance(data, bytes) and len(data) > 4:
try:
await self.process_price_frame(data)
except Exception as exc:
log.critical("An Exception Has Occured While Dispatching Tick Datas.")
log.exception(exc, exc_info=True)
else:
try:
await self.process_order_or_error_frame(data)
except Exception as exc:
log.critical(
"An Exception Has Occured While Dispatching Order Datas / Error Datas / Custom Message Datas."
)
log.exception(exc, exc_info=True)
async def process_order_or_error_frame(self, frame: Data) -> None:
_parsed_text_data = self.parse_text_message(frame)
if _parsed_text_data is not None and isinstance(_parsed_text_data, tuple):
_data_type, data = _parsed_text_data
if _data_type == KiteExtTicker.ORDER_UPDATE and isinstance(data, dict):
if self.on_order_update is None:
try:
await self.cast(KiteExtTicker.ON_ORDER_UPDATES, data)
except Exception as exc:
log.critical(
"An Exception Has Occured While Casting Order Updates Data."
)
log.exception(exc, exc_info=True)
if self.log_order_updates:
log.info("Order Update: %s", data)
if _data_type in {KiteExtTicker.ERROR_UPDATE, KiteExtTicker.MESSAGE_UPDATE}:
log.info(
"Error Message Receieved On Ticker Feed: %s"
if _data_type == KiteExtTicker.ERROR_UPDATE
else "Custom Message Receieved On Ticker Feed: %s",
data,
)
async def process_price_frame(self, frame: Data) -> None:
tick_datas = None
if isinstance(frame, bytes):
tick_datas = self.parse_binary(frame)
if tick_datas is not None:
if self.on_tick is not None:
await self.cast(KiteExtTicker.ON_TICKS, tick_datas)
if self.log_tick_updates:
log.info("Tick Update: %s", tick_datas)
async def __setmode(
self,
instruments: Dict[str, int],
mode: str = MODE_FULL,
) -> None:
log.info(
"Instruments: %s, Modes To Be Set As: %s", list(instruments.keys()), mode
)
subscription_mode_frame = json.dumps(
{"a": self.MESSAGE_SETMODE, "v": [mode, list(instruments.values())]}
)
await self._setmode(subscription_mode_frame)
[
self.subscribed_tokens[token].__setitem__("mode", mode)
for token in instruments.values()
]
[
self.tickdata[tradingsymbol].__setitem__("mode", mode)
for tradingsymbol, token in instruments.items()
if self.tickdata[tradingsymbol]["instrument_token"] == token
]
log.info("Instruments Mode Has Been Set To: %s Successfully.", mode)
async def _setmode(
self,
subscription_mode_frame: str | bytes,
) -> None:
for i in range(1, 6):
if not self.ticker_feed_running:
sleep_duration = i + i * 0.1
await asyncio.sleep(sleep_duration)
log.error(
"Ticker Feed Websocket Is Not Running, Sleeping For %s Second, Then Will Retry Setting Modes Of Subscribing Instruments. Retry No.: %s",
str(sleep_duration),
str(i),
)
if self.ticker_feed is not None and self.ticker_feed_running:
await self.ticker_feed.send(subscription_mode_frame)
log.info("Instruments Subscription Mode Set Packets Sent Successfully.")
else:
log.error(
"Ticker Feed Websocket Is Not Running, Hence Can't Set Mode Of Subscribed Instruments."
)
def setmode(
self,
instruments: Dict[str, int],
mode: str = MODE_FULL,
) -> None:
log.info(
"Instruments: %s, Modes To Be Set As: %s", list(instruments.keys()), mode
)
subscription_mode_frame = json.dumps(
{"a": self.MESSAGE_SETMODE, "v": [mode, list(instruments.values())]}
)
future = asyncio.run_coroutine_threadsafe(
self._setmode(subscription_mode_frame), self._loop
)
try:
future.result(timeout=5.0)
except TimeoutError:
log.error(
"The instruments setmode tasks took too long, cancelling the task..."
)
future.cancel()
except Exception as exc:
log.exception(
"The instruments setmode tasks has raised an exception: %s",
exc,
)
else:
[
self.subscribed_tokens[token].__setitem__("mode", mode)
for token in instruments.values()
]
[
self.tickdata[tradingsymbol].__setitem__("mode", mode)
for tradingsymbol, token in instruments.items()
if self.tickdata[tradingsymbol]["instrument_token"] == token
]
log.info("Instruments Mode Has Been Set To: %s Successfully.", mode)
async def __subscribe(
self,
instruments: Dict[str, int],
mode: str = MODE_FULL,
) -> None:
log.info(
"Instruments To Be Subscribed: %s, With Mode: %s",
list(instruments.keys()),
mode,
)
subscription_frame = json.dumps(
{"a": KiteExtTicker.MESSAGE_SUBSCRIBE, "v": list(instruments.values())}
)
subscription_mode_frame = json.dumps(
{
"a": KiteExtTicker.MESSAGE_SETMODE,
"v": [mode, list(instruments.values())],
}
)
await self._subscribe(subscription_frame, subscription_mode_frame)
self.subscribed_tokens.update(
{
token: {
"tradingsymbol": tradingsymbol,
"mode": mode,
"tick_data": None,
"extended_tick_data": None,
}
for tradingsymbol, token in instruments.items()
}
)
self.tickdata.update(
{
tradingsymbol: {
"tradingsymbol": tradingsymbol,
"mode": mode,
"instrument_token": token,
}
for tradingsymbol, token in instruments.items()
}
)
log.info("Instruments Subscribed Successfully In %s Mode.", mode)
async def _subscribe(
self,
subscription_frame: str | bytes,
subscription_mode_frame: str | bytes,
) -> None:
for i in range(1, 6):
if not self.ticker_feed_running:
sleep_duration = i + i * 0.1
await asyncio.sleep(sleep_duration)
log.error(
"Ticker Feed Websocket Is Not Running, Sleeping For %s Second, Then Will Retry Subscribing Instruments. Retry No.: %s",
str(sleep_duration),
str(i),
)
if self.ticker_feed is not None and self.ticker_feed_running:
await self.ticker_feed.send(subscription_frame)
await self.ticker_feed.send(subscription_mode_frame)
log.info("Instruments Subscription Packets Sent Successfully.")
else:
log.error("Ticker Feed Websocket Is Not Running, Hence Can't Subscribe.")
def subscribe(
self,
instruments: Dict[str, int],
mode: str = MODE_FULL,
) -> None:
log.info(
"Instruments To Be Subscribed: %s, With Mode: %s",
list(instruments.keys()),
mode,
)
subscription_frame = json.dumps(
{"a": KiteExtTicker.MESSAGE_SUBSCRIBE, "v": list(instruments.values())}
)
subscription_mode_frame = json.dumps(
{
"a": KiteExtTicker.MESSAGE_SETMODE,
"v": [mode, list(instruments.values())],
}
)
future = asyncio.run_coroutine_threadsafe(
self._subscribe(subscription_frame, subscription_mode_frame), self._loop
)
try:
future.result(timeout=5.0)
except TimeoutError:
log.error(
"The instruments subscription tasks took too long, cancelling the task..."
)
future.cancel()
except Exception as exc:
log.exception(
"The instruments subscription tasks has raised an exception: %s",
exc,
)
else:
self.subscribed_tokens.update(
{
token: {
"tradingsymbol": tradingsymbol,
"mode": mode,
"tick_data": None,
"extended_tick_data": None,
}
for tradingsymbol, token in instruments.items()
}
)
self.tickdata.update(
{
tradingsymbol: {
"tradingsymbol": tradingsymbol,
"mode": mode,
"instrument_token": token,
}
for tradingsymbol, token in instruments.items()
}
)
log.info("Instruments Subscribed Successfully In %s Mode.", mode)
async def __unsubscribe(self, instruments: Dict[str, int]) -> None:
log.info("Instruments To Be Unsubscribed: %s", list(instruments.keys()))
unsubscription_frame = json.dumps(
{
"a": KiteExtTicker.MESSAGE_UNSUBSCRIBE,
"v": list(instruments.values()),
}
)
await self._unsubscribe(unsubscription_frame)
[self.subscribed_tokens.pop(token) for token in instruments.values()]
[
self.tickdata.pop(tradingsymbol)
for tradingsymbol, token in instruments.items()
if self.tickdata[tradingsymbol]["instrument_token"] == token
]
log.info("Instruments Unsubscribed Successfully.")
async def _unsubscribe(
self,
unsubscription_frame: str | bytes,
) -> None:
for i in range(1, 6):
if not self.ticker_feed_running:
sleep_duration = i + i * 0.1
await asyncio.sleep(sleep_duration)
log.error(
"Ticker Feed Websocket Is Not Running, Sleeping For %s Second, Then Will Retry Unsubscribing Instruments. Retry No.: %s",
str(sleep_duration),
str(i),
)
if self.ticker_feed is not None and self.ticker_feed_running:
await self.ticker_feed.send(unsubscription_frame)
log.info("Instruments Unsubscription Packets Sent Successfully.")
return
else:
log.error("Ticker Feed Websocket Is Not Running, Hence Can't Unsubscribe.")
def unsubscribe(
self,
instruments: Dict[str, int],
) -> None:
log.info("Instruments To Be Unsubscribed: %s", list(instruments.keys()))
unsubscription_frame = json.dumps(
{
"a": KiteExtTicker.MESSAGE_UNSUBSCRIBE,
"v": list(instruments.values()),
}
)
future = asyncio.run_coroutine_threadsafe(
self._unsubscribe(unsubscription_frame), self._loop
)
try:
future.result(timeout=5.0)
except TimeoutError:
log.error(
"The instruments unsubscription tasks took too long, cancelling the task..."
)
future.cancel()
except Exception as exc:
log.exception(
"The instruments unsubscription tasks has raised an exception: %s",
exc,
)
else:
[self.subscribed_tokens.pop(token) for token in instruments.values()]
[
self.tickdata.pop(tradingsymbol)
for tradingsymbol, token in instruments.items()
if self.tickdata[tradingsymbol]["instrument_token"] == token
]
log.info("Instruments Unsubscribed Successfully.")
async def __resubscribe(self) -> None:
log.info(
"Instruments To Be Resubscribed: %s",
[data["tradingsymbol"] for data in self.subscribed_tokens.values()],
)
await self._resubscribe()
log.info("Instruments Resubscribed Successfully.")
async def _resubscribe(self) -> None:
_ltp_mode_tokens = [
token
for token, data in self.subscribed_tokens.items()
if data["mode"] == KiteExtTicker.MODE_LTP
]
_quote_mode_tokens = [
token
for token, data in self.subscribed_tokens.items()
if data["mode"] == KiteExtTicker.MODE_QUOTE
]
_full_mode_tokens = [
token
for token, data in self.subscribed_tokens.items()
if data["mode"] == KiteExtTicker.MODE_FULL
]
for i in range(1, 6):
if not self.ticker_feed_running:
sleep_duration = i + i * 0.1
await asyncio.sleep(sleep_duration)
log.error(
"Ticker Feed Websocket Is Not Running, Sleeping For %s Second, Then Will Retry Resubscribing Instruments. Retry No.: %s",
str(sleep_duration),
str(i),
)
if self.ticker_feed is not None and self.ticker_feed_running:
if len(_ltp_mode_tokens) > 0:
await self.ticker_feed.send(
json.dumps(
{
"a": KiteExtTicker.MESSAGE_SUBSCRIBE,
"v": _ltp_mode_tokens,
}
)
)
await self.ticker_feed.send(
json.dumps(
{
"a": KiteExtTicker.MESSAGE_SETMODE,
"v": [KiteExtTicker.MODE_LTP, _ltp_mode_tokens],
}
)
)
log.info(
"Instruments: %s, Resubscription Packets With Mode: %s Sent Successfully.",
[
self.subscribed_tokens[token]["tradingsymbol"]
for token in _ltp_mode_tokens
],
KiteExtTicker.MODE_LTP,
)
if len(_quote_mode_tokens) > 0:
await self.ticker_feed.send(
json.dumps(
{
"a": KiteExtTicker.MESSAGE_SUBSCRIBE,
"v": _quote_mode_tokens,
}
)
)
await self.ticker_feed.send(
json.dumps(
{
"a": KiteExtTicker.MESSAGE_SETMODE,
"v": [KiteExtTicker.MODE_QUOTE, _quote_mode_tokens],
}
)
)
log.info(
"Instruments: %s, Resubscription Packets With Mode: %s Sent Successfully.",
[
self.subscribed_tokens[token]["tradingsymbol"]
for token in _quote_mode_tokens
],
KiteExtTicker.MODE_QUOTE,
)
if len(_full_mode_tokens) > 0:
await self.ticker_feed.send(
json.dumps(
{
"a": KiteExtTicker.MESSAGE_SUBSCRIBE,
"v": _full_mode_tokens,
}
)
)
await self.ticker_feed.send(
json.dumps(
{
"a": KiteExtTicker.MESSAGE_SETMODE,
"v": [KiteExtTicker.MODE_FULL, _full_mode_tokens],
}
)
)
log.info(
"Instruments: %s, Resubscription Packets With Mode: %s Sent Successfully.",
[
self.subscribed_tokens[token]["tradingsymbol"]
for token in _full_mode_tokens
],
KiteExtTicker.MODE_FULL,
)
return
else:
log.error("Ticker Feed Websocket Is Not Running, Hence Can't Resubscribe.")
def resubscribe(self) -> None:
log.info(
"Instruments To Be Resubscribed: %s",
[data["tradingsymbol"] for data in self.subscribed_tokens.values()],
)
future = asyncio.run_coroutine_threadsafe(self._resubscribe(), self._loop)
try:
future.result(timeout=15.0)
except TimeoutError:
log.error(
"The instruments resubscription tasks took too long, cancelling the task..."
)
future.cancel()
except Exception as exc:
log.exception(
"The instruments resubscription tasks has raised an exception: %s",
exc,
)
else:
log.info("Instruments Resubscribed Successfully.")
import signal
from time import sleep
from async_kiteext_ticker import KiteExtTicker
# you can pass your `api_key` in KiteExtTicker if you have one.
# In that case `user_id is not needed.
user_id = "Paste_Your_User_Id_Here"
access_token = "Paste_Your_Access_token_Or_Enctoken_Here"
instruments = {
"BANKNIFTY2422144400PE": 11719682,
"BANKNIFTY2422144500CE": 11719938,
"BANKNIFTY2422144500PE": 11722242,
"BANKNIFTY2422144600CE": 11722498,
"BANKNIFTY2422144600PE": 11724290,
"BANKNIFTY2422144700CE": 11724546,
"BANKNIFTY2422144700PE": 11726338,
"BANKNIFTY2422144800CE": 11726594,
"BANKNIFTY2422144800PE": 11726850,
"BANKNIFTY2422144900CE": 11727106,
"BANKNIFTY2422144900PE": 11727362,
"BANKNIFTY2422145000CE": 11727618,
"BANKNIFTY2422145000PE": 11727874,
}
def on_ticks(data):
print("Ticks Data => ", data)
def on_order_updates(data):
print("Order Update Data => ", data)
ticker = KiteExtTicker(
user_id=user_id,
access_token=access_token,
callbacks={
"on_ticks": on_ticks,
"on_order_updates": on_order_updates,
},
# log_tick_updates=True,
# log_order_updates=True,
# debug=True, # Uncomment This For Debugging
# debug_verbose=True, # Uncomment This For Verbose Debugging
)
if __name__ == "__main__":
signal.signal(signal.SIGINT, ticker._handle_stop_signals)
signal.signal(signal.SIGTERM, ticker._handle_stop_signals)
sleep(0.1)
ticker.subscribe(instruments=instruments, mode=ticker.MODE_FULL)
while True:
try:
print("Dr. June Moone Says Hi! From Main Thread.")
except KeyboardInterrupt:
break
else:
sleep(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment