Last active
November 10, 2023 15:39
-
-
Save MooneDrJune/7b4066b2a605fcb490a783f7b5304964 to your computer and use it in GitHub Desktop.
get_ce_pe_based_on_ltp
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
""" | |
:description: A Blazingly Fast Custom Class To Fetch Instruments Token, Symbols, Expiries, LTP Etc. from KiteConnect. | |
:license: MIT. | |
:author: Dr June Moone | |
:created: On Monday September 26, 2022 11:17:53 GMT+05:30 | |
""" | |
__author__ = "Dr June Moone" | |
__webpage__ = "https://github.com/MooneDrJune" | |
__license__ = "MIT" | |
# Install pypolars via `pip install -U polars[all]` | |
import sys | |
import time | |
import logging | |
import calendar | |
import operator as op | |
import datetime as dt | |
from time import sleep | |
from datetime import datetime as dtdt | |
from typing import Any, List, Dict, Optional, Union | |
try: | |
from typing import Literal | |
except ImportError: | |
from typing_extensions import Literal | |
try: | |
import requests | |
import polars as pl | |
import numpy as np | |
from kiteconnect import KiteConnect | |
except (ImportError, ModuleNotFoundError): | |
__import__("os").system( | |
f"{sys.executable}" | |
+ " -m pip install -U" | |
+ " requests numpy kiteconnect polars[all]" # noqa: E501 | |
) | |
finally: | |
import requests | |
import polars as pl | |
import numpy as np | |
from kiteconnect import KiteConnect | |
# Weeks start on Monday (0) for every calendar.monthcalendar() call below.
calendar.setfirstweekday(0)
# Root logger is configured at DEBUG level as an import-time side effect.
logging.basicConfig(level=logging.DEBUG)

# Dispatch table: textual operator label -> callable from the `operator`
# module. Labels are looked up by name at run time (e.g. "<=" -> op.le).
# NOTE(review): "in" maps to op.contains(a, b), which evaluates `b in a`,
# i.e. the container is the FIRST argument.
operations = {
    label: getattr(op, function_name)
    for label, function_name in (
        ("+", "add"),
        ("concat", "concat"),
        ("in", "contains"),
        ("/", "truediv"),
        ("//", "floordiv"),
        ("&", "and_"),
        ("^", "xor"),
        ("~", "invert"),
        ("|", "or_"),
        ("**", "pow"),
        ("is", "is_"),
        ("isnot", "is_not"),
        ("obj[k]=v", "setitem"),
        ("del_obj[k]", "delitem"),
        ("obj[k]", "getitem"),
        ("<<", "lshift"),
        ("%", "mod"),
        ("*", "mul"),
        ("@", "matmul"),
        ("-ve", "neg"),
        ("not", "not_"),
        ("+ve", "pos"),
        (">>", "rshift"),
        ("seq[i:j]=v", "setitem"),
        ("del_seq[i:j]", "delitem"),
        ("seq[i:j]", "getitem"),
        ("-", "sub"),
        ("assert", "truth"),
        ("<", "lt"),
        ("<=", "le"),
        ("==", "eq"),
        ("!=", "ne"),
        (">=", "ge"),
        (">", "gt"),
    )
}
class MyCustomClass:
    """Look up Kite instruments, expiries and last traded prices with polars.

    On construction the full instruments dump is downloaded to a local CSV
    file; the static lookup helpers then scan that file lazily.

    NOTE(review): the polars calls use the keyword names this file was
    written against (``file=``, ``dtypes=``, ``parse_dates=``, ``fmt=``,
    ``.apply``, ``reverse=``, ``sep=``). Confirm the installed polars version
    still accepts them.
    """

    ROUTES = {
        "api": "https://api.kite.trade",
        "instruments": "/instruments",
    }
    # Local copy of the instruments dump that every lookup method scans.
    INSTRUMENTS_CSV = "instruments.csv"
    # Seconds to wait for the instruments download before giving up.
    REQUEST_TIMEOUT = 30

    def __init__(self, kite: "KiteConnect") -> None:
        """Store the Kite client and refresh the local instruments CSV.

        :param kite: client used later for quote requests.
        """
        self.kite = kite
        self.session = requests.session()
        MyCustomClass.get_instruments_csv()

    @staticmethod
    def get_instruments_csv() -> None:
        """Download the instruments dump and write it to ``INSTRUMENTS_CSV``.

        :raises requests.RequestException: on timeout, connection failure or
            a non-2xx response (so an error page is never saved as CSV).
        """
        url = MyCustomClass.ROUTES["api"] + MyCustomClass.ROUTES["instruments"]
        response = requests.get(url, timeout=MyCustomClass.REQUEST_TIMEOUT)
        response.raise_for_status()
        # newline="" disables "\n" -> os.linesep translation; the readers
        # below parse the file with eol_char="\n".
        with open(
            MyCustomClass.INSTRUMENTS_CSV, "w", encoding="utf-8", newline=""
        ) as csv_file:
            csv_file.write(response.text)

    @staticmethod
    def _scan_instruments() -> "pl.LazyFrame":
        """Return a lazy scan of the instruments CSV with a fixed schema."""
        return pl.scan_csv(
            file=MyCustomClass.INSTRUMENTS_CSV,
            has_header=True,
            dtypes={
                "instrument_token": pl.Int64,
                "exchange_token": pl.Int64,
                "tradingsymbol": pl.Utf8,
                "name": pl.Utf8,
                "last_price": pl.Float64,
                "expiry": pl.Utf8,
                "strike": pl.Float64,
                "tick_size": pl.Float64,
                "lot_size": pl.Int64,
                "instrument_type": pl.Utf8,
                "segment": pl.Utf8,
                "exchange": pl.Utf8,
            },
            encoding="utf8",
            infer_schema_length=5,
            parse_dates=True,
            eol_char="\n",
        )

    @staticmethod
    def _week_of_month(year: int, month: int, day: int) -> int:
        """Return the 1-based calendar row (1..6) that contains ``day``.

        :raises IndexError: if ``day`` is not a day of that month.
        """
        if day >= 1:  # 0 is the padding value used by monthcalendar()
            weeks = calendar.monthcalendar(year, month)
            for week_no, week in enumerate(weeks, start=1):
                if day in week:
                    return week_no
        raise IndexError(f"day {day} is not in {year}-{month:02d}")

    @staticmethod
    def _classify_expiry(row) -> str:
        """Classify one expiry as monthly ("M") or weekly ("W").

        ``row`` carries ``expiry`` and ``expiry_s`` (the following expiry, or
        ``None`` for the last one). An expiry is monthly when the following
        expiry lies in another month; the last known expiry is monthly when
        it falls in the 4th or a later calendar row of its month.
        """
        expiry, following = row["expiry"], row["expiry_s"]
        if following is not None:
            # Compare year as well, so expiries exactly a year apart are not
            # mistaken for same-month neighbours.
            same_month = (expiry.year, expiry.month) == (
                following.year,
                following.month,
            )
            return "W" if same_month else "M"
        week_no = MyCustomClass._week_of_month(
            expiry.year, expiry.month, expiry.day
        )
        return "M" if week_no >= 4 else "W"

    @staticmethod
    def get_expiries(
        exchange: str = "NFO",
        symbol: str = "BANKNIFTY",
        segment: str = "OPTIONS",
        dt_fmt: bool = False,
    ) -> "pl.DataFrame":
        """Return the sorted expiries of ``symbol`` with their type.

        :param exchange: exchange code; upper-cased before matching.
        :param symbol: value matched against the ``name`` column.
        :param segment: "OPTIONS" or "FUTURES" (case-insensitive); any other
            value matches the bare exchange segment.
        :param dt_fmt: accepted for compatibility; not used.
        :returns: frame with ``expiry`` (date) and ``expiry_type`` ("M"/"W").
        """
        if symbol is not None:
            symbol = symbol.upper()
        if exchange is not None:
            exchange = exchange.upper()
        segment_kind = segment.upper()
        if segment_kind == "OPTIONS":
            segment = f"{exchange}-OPT"
        elif segment_kind == "FUTURES":
            segment = f"{exchange}-FUT"
        else:
            segment = exchange
        expiry_dates = pl.col("expiry").str.strptime(pl.Date, fmt="%Y-%m-%d")
        return (
            MyCustomClass._scan_instruments()
            .select(["name", "segment", "exchange", "expiry"])
            .filter(
                (pl.col("name") == symbol)
                & (pl.col("segment") == segment)
                & (pl.col("exchange") == exchange)
            )
            .select(["expiry"])
            .unique()
            .select([expiry_dates.sort()])
            # Pair each expiry with the one that follows it.
            .select(["expiry", pl.col("expiry").shift(-1).alias("expiry_s")])
            .select(
                [
                    "expiry",
                    pl.struct(["expiry", "expiry_s"])
                    .apply(MyCustomClass._classify_expiry)
                    .alias("expiry_type"),
                ]
            )
            .collect()
        )

    @staticmethod
    def get_all_expiries_of_type(
        expiries: Optional["pl.DataFrame"] = None,
        weekly: bool = False,
        monthly: bool = False,
    ) -> "pl.DataFrame":
        """Filter ``expiries`` down to weekly or monthly rows.

        :param expiries: frame from :meth:`get_expiries`; fetched with the
            defaults when omitted.
        :param weekly: keep only "W" rows (takes precedence over monthly).
        :param monthly: keep only "M" rows.
        :returns: the filtered frame, or all rows when neither flag is set.
        """
        if expiries is None:
            expiries = MyCustomClass.get_expiries()
        if weekly:
            return expiries.filter(pl.col("expiry_type") == "W")
        if monthly:
            return expiries.filter(pl.col("expiry_type") == "M")
        return expiries

    @staticmethod
    def get_only_near_next_far_expiries(
        expiries: Optional["pl.DataFrame"] = None,
        weekly: bool = False,
        monthly: bool = False,
    ) -> Optional[Dict[str, Optional[dt.date]]]:
        """Return the first three weekly or monthly expiries by name.

        :returns: dict keyed ``Near…``/``Next…``/``Far…`` + ``WeeklyExpiry``
            or ``MonthlyExpiry``; missing slots are ``None``. Returns
            ``None`` when neither flag is set.
        """
        if expiries is None:
            expiries = MyCustomClass.get_expiries()
        if weekly:
            type_code, label = "W", "Weekly"
        elif monthly:
            type_code, label = "M", "Monthly"
        else:
            return None
        upcoming = (
            expiries.filter(pl.col("expiry_type") == type_code)
            .get_column("expiry")
            .to_list()[:3]
        )
        # Pad so fewer than three known expiries does not raise IndexError.
        upcoming += [None] * (3 - len(upcoming))
        return {
            f"Near{label}Expiry": upcoming[0],
            f"Next{label}Expiry": upcoming[1],
            f"Far{label}Expiry": upcoming[2],
        }

    @staticmethod
    def get_week_of_month(year: int, month: int, day: int) -> tuple:
        """Return ``(week_number, week_name)`` for a date within its month.

        Weeks are the rows of ``calendar.monthcalendar``; a month can span
        six rows, so the sixth is named as well.

        :raises IndexError: if ``day`` is not a day of that month.
        """
        _words = {
            1: "FirstWeek",
            2: "SecondWeek",
            3: "ThirdWeek",
            4: "FourthWeek",
            5: "FifthWeek",
            6: "SixthWeek",
        }
        _week_of_month = MyCustomClass._week_of_month(year, month, day)
        return (_week_of_month, _words[_week_of_month])

    @staticmethod
    def get_week_no(dt_object) -> str:
        """Return the ISO 8601 week number of ``dt_object`` as a string."""
        return dt_object.strftime("%V")

    @staticmethod
    def is_expiry_today(
        expiries: Optional["pl.DataFrame"] = None,
    ) -> Dict[str, Union[bool, str, dt.date]]:
        """Report whether the first listed expiry falls on today's date.

        :param expiries: frame from :meth:`get_expiries`; fetched with the
            defaults when omitted. Its first row is used.
        :returns: dict with ``IsExpiryToday``, ``ExpiryDate``, ``ExpiryType``.
        """
        if expiries is None:
            expiries = MyCustomClass.get_expiries()
        (ExpiryDate, ExpiryType) = expiries.row(0)
        return {
            "IsExpiryToday": dt.date.today() == ExpiryDate,
            "ExpiryDate": ExpiryDate,
            "ExpiryType": "Weekly" if ExpiryType == "W" else "Monthly",
        }

    @staticmethod
    def isNowInTimePeriod(startTime, endTime, nowTime) -> bool:
        """Return True if ``nowTime`` lies in ``[startTime, endTime]``.

        When ``startTime`` is not earlier than ``endTime`` the window is
        treated as wrapping past midnight.
        """
        if startTime < endTime:
            return startTime <= nowTime <= endTime
        # Over midnight:
        return nowTime >= startTime or nowTime <= endTime

    @staticmethod
    def isMarketTime(testing=False) -> bool:
        """Return True on Monday-Friday between 03:45 and 10:00 UTC.

        That window is 09:15-15:30 IST, so the clock is read in UTC
        explicitly instead of relying on the machine's local timezone.

        :param testing: when truthy, always report the market as open.
        """
        if testing:
            return True
        # Read the clock once so time and weekday come from the same instant.
        now_utc = dtdt.now(dt.timezone.utc)
        in_session = MyCustomClass.isNowInTimePeriod(
            dt.time(3, 45), dt.time(10, 0), now_utc.time()
        )
        # weekday(): Monday is 0, Saturday 5, Sunday 6 (locale-independent).
        return in_session and now_utc.weekday() < 5

    @staticmethod
    def get_instrument_tokens_and_symbols(
        exchange: Optional[str] = "NFO",
        symbol: Optional[str] = "BANKNIFTY",
        expiry: Union[str, dtdt] = None,
        options: Optional[bool] = False,
        option_type: Literal["call", "put"] = None,
        futures: Optional[bool] = False,
        indices: Optional[bool] = False,
    ) -> "pl.DataFrame":
        """Return the instrument rows matching the requested product type.

        :param exchange: exchange code; upper-cased before matching.
        :param symbol: value matched against the ``name`` column.
        :param expiry: "YYYY-MM-DD" string, or a ``date``/``datetime`` which
            is formatted that way; used for options and futures only.
        :param options: include options.
        :param option_type: "call" or "put" (case-insensitive) to narrow an
            options-only lookup; ``None`` keeps both.
        :param futures: include futures (with ``options``: all derivatives).
        :param indices: when neither options nor futures is requested, match
            the "INDICES" segment instead of the exchange's own segment.
        :raises ValueError: if ``option_type`` is neither "call" nor "put".
        """
        if symbol is not None:
            symbol = symbol.upper()
        # datetime is a subclass of date, so this also covers the plain
        # dates that get_expiries() returns.
        if isinstance(expiry, dt.date):
            expiry = expiry.strftime("%Y-%m-%d")
        if exchange is not None:
            exchange = exchange.upper()

        name_matches = pl.col("name") == symbol
        exchange_matches = pl.col("exchange") == exchange
        expiry_matches = pl.col("expiry") == expiry
        instrument_type = pl.col("instrument_type")
        segment = pl.col("segment")

        if options and futures:
            wanted = (
                name_matches
                & expiry_matches
                & (instrument_type != "EQ")
                & (segment != exchange)
                & exchange_matches
            )
        elif options:
            wanted = (
                name_matches
                & expiry_matches
                & (segment == f"{exchange}-OPT")
                & exchange_matches
            )
            if option_type is not None:
                type_codes = {"call": "CE", "put": "PE"}
                try:
                    type_code = type_codes[option_type.lower()]
                except (AttributeError, KeyError):
                    raise ValueError(
                        f"option_type must be 'call' or 'put', "
                        f"got {option_type!r}"
                    ) from None
                wanted = wanted & (instrument_type == type_code)
        elif futures:
            wanted = (
                name_matches
                & expiry_matches
                & (instrument_type == "FUT")
                & (segment == f"{exchange}-FUT")
                & exchange_matches
            )
        else:
            cash_segment = "INDICES" if indices else exchange
            wanted = (
                name_matches
                & (instrument_type == "EQ")
                & (segment == cash_segment)
                & exchange_matches
            )
        return MyCustomClass._scan_instruments().filter(wanted).collect()

    def get_instruments_ltp(
        self, instruments: "pl.DataFrame"
    ) -> "pl.DataFrame":  # noqa E501
        """Fetch quotes for ``instruments`` and return them as a frame.

        :param instruments: frame with ``exchange`` and ``tradingsymbol``
            columns.
        :returns: one row per quoted key, with ``exchange`` and
            ``instrument`` split from the key plus the quote's own fields.
        """
        # Quote keys are "EXCHANGE:TRADINGSYMBOL"; the client adds the "i"
        # query parameter itself, so no "i=" prefix is prepended here.
        quote_keys = (
            instruments.select(
                [
                    pl.concat_str(
                        [pl.col("exchange"), pl.col("tradingsymbol")],
                        sep=":",
                    ).alias("exchange_tradingsymbol")
                ]
            )
            .get_column("exchange_tradingsymbol")
            .to_list()
        )
        quotes = self.kite.quote(quote_keys)
        return pl.from_dicts(
            [
                {
                    "exchange": key.split(":")[0],
                    "instrument": key.split(":")[-1],
                    **quote,
                }
                for key, quote in quotes.items()
            ]
        )

    def get_ce_pe_name_if_ltp_condition_matches(
        self,
        ltp_df: "pl.DataFrame" = None,
        only_ce: bool = False,
        only_pe: bool = False,
        only_first_ce_pe: bool = False,
        reverse: bool = False,
        condition_price: float = 0.0,
        operator: str = None,
    ) -> "pl.DataFrame":
        """Return quote rows whose ``last_price`` satisfies a comparison.

        :param ltp_df: frame with ``instrument`` and ``last_price`` columns.
        :param only_ce: keep only instruments ending in "CE" (wins over the
            other two flags).
        :param only_pe: keep only instruments ending in "PE".
        :param only_first_ce_pe: keep the first two rows after sorting (not
            necessarily one CE and one PE).
        :param reverse: sort by ``last_price`` descending instead of
            ascending.
        :param condition_price: right-hand operand of the comparison.
        :param operator: key into the module-level ``operations`` table.
        :raises KeyError: if ``operator`` is not a key of ``operations``.
        """
        if operator not in operations:
            raise KeyError(f"unsupported operator: {operator!r}")
        price_matches = operations[operator](
            pl.col("last_price"), condition_price
        )
        # The pattern is a regex; "$" anchors the suffix so symbols that
        # merely contain the letters (e.g. "RELIANCE...PE") are not taken
        # for calls.
        if only_ce:
            wanted = pl.col("instrument").str.contains("CE$") & price_matches
        elif only_pe:
            wanted = pl.col("instrument").str.contains("PE$") & price_matches
        else:
            wanted = price_matches
        matched = ltp_df.filter(wanted).sort("last_price", reverse=reverse)
        if only_first_ce_pe and not (only_ce or only_pe):
            return matched.slice(0, 2)
        return matched
if __name__ == "__main__":
    # Demo: poll the last traded price of every option of the nearest
    # BANKNIFTY expiry and print those trading at or below 100.
    # The credentials below are placeholders and must be replaced.
    kite = KiteConnect(api_key="your_api_key")
    data = kite.generate_session(
        request_token="request_token_here", api_secret="your_secret"
    )  # noqa E501
    kite.set_access_token(access_token=data["access_token"])
    z = MyCustomClass(kite)
    exchange = "NFO"
    symbol = "BANKNIFTY"
    expiries = z.get_expiries(exchange, symbol, segment="OPTIONS")
    upcoming_expiry = expiries.select(pl.col("expiry").first()).row(0)[0]
    # The instruments lookup compares against a "YYYY-MM-DD" string column,
    # so a date value is formatted before it is passed on.
    if isinstance(upcoming_expiry, dt.date):
        upcoming_expiry = upcoming_expiry.strftime("%Y-%m-%d")
    instruments = z.get_instrument_tokens_and_symbols(
        exchange, symbol, expiry=upcoming_expiry, options=True
    )
    try:
        while True:
            # perf_counter() is monotonic, so the elapsed time cannot be
            # skewed by wall-clock adjustments.
            start_timer = time.perf_counter()
            ce_pe_whose_ltp_below_100 = (
                z.get_ce_pe_name_if_ltp_condition_matches(
                    ltp_df=z.get_instruments_ltp(instruments),
                    only_first_ce_pe=False,
                    reverse=True,
                    condition_price=100.0,
                    operator="<=",
                )
            )
            print(ce_pe_whose_ltp_below_100)
            print(
                f">>> In Total It Took {(time.perf_counter() - start_timer)} seconds,"
                + " for fetching and displaying\n"
                + " data for ce_pe_where_ltp_below_100"
            )
            sleep(1)  # Sleep for 1 Second, Increase It as per your wish
    except KeyboardInterrupt:
        # Ctrl+C is the only way out of the polling loop; exit quietly.
        pass
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment