Skip to content

Instantly share code, notes, and snippets.

@MooneDrJune
Last active November 10, 2023 15:39
Show Gist options
  • Save MooneDrJune/7b4066b2a605fcb490a783f7b5304964 to your computer and use it in GitHub Desktop.
Save MooneDrJune/7b4066b2a605fcb490a783f7b5304964 to your computer and use it in GitHub Desktop.
get_ce_pe_based_on_ltp
# -*- coding: utf-8 -*-
"""
:description: A Blazingly Fast Custom Class To Fetch Instruments Token, Symbols, Expiries, LTP Etc. from KiteConnect.
:license: MIT.
:author: Dr June Moone
:created: On Monday September 26, 2022 11:17:53 GMT+05:30
"""
__author__ = "Dr June Moone"
__webpage__ = "https://github.com/MooneDrJune"
__license__ = "MIT"
# Install pypolars via `pip install -U polars[all]`
import sys
import time
import logging
import calendar
import operator as op
import datetime as dt
from time import sleep
from datetime import datetime as dtdt
from typing import Any, List, Dict, Optional, Union
try:
from typing import Literal
except ImportError:
from typing_extensions import Literal
try:
import requests
import polars as pl
import numpy as np
from kiteconnect import KiteConnect
except (ImportError, ModuleNotFoundError):
__import__("os").system(
f"{sys.executable}"
+ " -m pip install -U"
+ " requests numpy kiteconnect polars[all]" # noqa: E501
)
finally:
import requests
import polars as pl
import numpy as np
from kiteconnect import KiteConnect
# Weeks start on Monday for every calendar.monthcalendar() call in this module.
calendar.setfirstweekday(calendar.MONDAY)
logging.basicConfig(level=logging.DEBUG)

# Lookup table from a short textual operator token to the matching function
# in the standard `operator` module. Built from ordered pairs so the
# insertion order of the keys is explicit.
operations = dict(
    (
        ("+", op.add),
        ("concat", op.concat),
        ("in", op.contains),
        ("/", op.truediv),
        ("//", op.floordiv),
        ("&", op.and_),
        ("^", op.xor),
        ("~", op.invert),
        ("|", op.or_),
        ("**", op.pow),
        ("is", op.is_),
        ("isnot", op.is_not),
        ("obj[k]=v", op.setitem),
        ("del_obj[k]", op.delitem),
        ("obj[k]", op.getitem),
        ("<<", op.lshift),
        ("%", op.mod),
        ("*", op.mul),
        ("@", op.matmul),
        ("-ve", op.neg),
        ("not", op.not_),
        ("+ve", op.pos),
        (">>", op.rshift),
        ("seq[i:j]=v", op.setitem),
        ("del_seq[i:j]", op.delitem),
        ("seq[i:j]", op.getitem),
        ("-", op.sub),
        ("assert", op.truth),
        ("<", op.lt),
        ("<=", op.le),
        ("==", op.eq),
        ("!=", op.ne),
        (">=", op.ge),
        (">", op.gt),
    )
)
class MyCustomClass:
    """Helpers around the Kite Connect instruments dump and quote endpoint.

    The class downloads the instruments CSV to ``instruments.csv`` in the
    current working directory (done once per instantiation) and offers
    lookups on top of that file: expiry dates, instrument rows for a symbol,
    last traded prices and simple price-condition filters.

    NOTE(review): the polars keyword names used here (``file``, ``dtypes``,
    ``parse_dates``, ``eol_char``, ``fmt``, ``reverse``, ``sep``, ``apply``)
    are kept exactly as the original code wrote them - confirm they match the
    polars release that is actually installed.
    """

    ROUTES = {
        "api": "https://api.kite.trade",
        "instruments": "/instruments",
    }

    def __init__(self, kite: "KiteConnect") -> None:
        """Store the authenticated client and refresh ``instruments.csv``."""
        self.kite = kite
        self.session = requests.session()
        MyCustomClass.get_instruments_csv()

    # ------------------------------------------------------------------ #
    # Instruments dump
    # ------------------------------------------------------------------ #
    @staticmethod
    def get_instruments_csv() -> None:
        """Download the instruments dump and write it to ``instruments.csv``.

        Raises:
            requests.HTTPError: if the server answers with an error status,
                so an error page is never stored as if it were CSV data.
        """
        url = MyCustomClass.ROUTES["api"] + MyCustomClass.ROUTES["instruments"]
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        # Written as UTF-8 because the file is read back with encoding="utf8".
        with open("instruments.csv", "w", encoding="utf-8") as csv_file:
            csv_file.write(response.text)

    @staticmethod
    def _scan_instruments() -> "pl.LazyFrame":
        """Return a lazy scan of ``instruments.csv`` with an explicit schema."""
        return pl.scan_csv(
            file="instruments.csv",
            has_header=True,
            dtypes={
                "instrument_token": pl.Int64,
                "exchange_token": pl.Int64,
                "tradingsymbol": pl.Utf8,
                "name": pl.Utf8,
                "last_price": pl.Float64,
                "expiry": pl.Utf8,
                "strike": pl.Float64,
                "tick_size": pl.Float64,
                "lot_size": pl.Int64,
                "instrument_type": pl.Utf8,
                "segment": pl.Utf8,
                "exchange": pl.Utf8,
            },
            encoding="utf8",
            infer_schema_length=5,
            parse_dates=True,
            eol_char="\n",
        )

    # ------------------------------------------------------------------ #
    # Expiries
    # ------------------------------------------------------------------ #
    @staticmethod
    def _week_index(year: int, month: int, day: int) -> int:
        """Return the 1-based row of ``day`` in ``calendar.monthcalendar``.

        The row layout follows the calendar module's current first-weekday
        setting. Raises ``IndexError`` when ``day`` is not in the month grid.
        """
        month_grid = np.array(calendar.monthcalendar(year, month))
        return int(np.where(month_grid == day)[0][0]) + 1

    @staticmethod
    def _classify_expiry(expiry, next_expiry) -> str:
        """Label one expiry date as monthly (``"M"``) or weekly (``"W"``).

        Args:
            expiry: the expiry date being classified.
            next_expiry: the following expiry in ascending order, or ``None``
                when ``expiry`` is the last one listed.

        An expiry is monthly when the next expiry falls in a different
        calendar month (year and month are both compared, so the same month
        one year later still counts as different). The last listed expiry is
        monthly when it lies in the 4th or 5th calendar week of its month.
        """
        if next_expiry is not None:
            same_month = (expiry.year, expiry.month) == (
                next_expiry.year,
                next_expiry.month,
            )
            return "W" if same_month else "M"
        week = MyCustomClass._week_index(expiry.year, expiry.month, expiry.day)
        return "M" if week in (4, 5) else "W"

    @staticmethod
    def get_expiries(
        exchange: str = "NFO",
        symbol: str = "BANKNIFTY",
        segment: str = "OPTIONS",
        dt_fmt: bool = False,
    ) -> "pl.DataFrame":
        """Return the sorted expiries of ``symbol`` with their type.

        Args:
            exchange: exchange code; upper-cased before matching.
            symbol: value matched against the ``name`` column; upper-cased.
            segment: ``"OPTIONS"`` or ``"FUTURES"`` (case-insensitive) select
                ``<exchange>-OPT`` / ``<exchange>-FUT``; any other value
                (including ``None``) matches the bare exchange code.
            dt_fmt: currently unused; kept for backward compatibility.

        Returns:
            A frame with ``expiry`` (``pl.Date``, ascending) and
            ``expiry_type`` (``"M"`` or ``"W"``).
        """
        if symbol is not None:
            symbol = symbol.upper()
        if exchange is not None:
            exchange = exchange.upper()
        segment_kind = (segment or "").upper()
        if segment_kind == "OPTIONS":
            segment = f"{exchange}-OPT"
        elif segment_kind == "FUTURES":
            segment = f"{exchange}-FUT"
        else:
            segment = exchange
        df = (
            MyCustomClass._scan_instruments()
            .select(["name", "segment", "exchange", "expiry"])
            .filter(
                (pl.col("name") == symbol)
                & (pl.col("segment") == segment)
                & (pl.col("exchange") == exchange)
            )
            .select(["expiry"])
            .unique()
            .select([pl.col("expiry").str.strptime(pl.Date, fmt="%Y-%m-%d").sort()])
            # Pair every expiry with the one after it (null on the last row).
            .select(["expiry", pl.col("expiry").shift(-1).alias("expiry_s")])
            .select(
                [
                    "expiry",
                    pl.struct(["expiry", "expiry_s"])
                    .apply(
                        lambda row: MyCustomClass._classify_expiry(
                            row["expiry"], row["expiry_s"]
                        )
                    )
                    .alias("expiry_type"),
                ]
            )
        )
        return df.collect()

    @staticmethod
    def get_all_expiries_of_type(
        expiries: "pl.DataFrame" = None,
        weekly: bool = False,
        monthly: bool = False,
    ) -> "Optional[pl.DataFrame]":
        """Filter ``expiries`` down to weekly or monthly rows.

        ``expiries`` defaults to ``get_expiries()``. ``weekly`` wins when both
        flags are set; ``None`` is returned when neither flag is set.
        """
        if expiries is None:
            expiries = MyCustomClass.get_expiries()
        if weekly:
            return expiries.filter(pl.col("expiry_type") == "W")
        if monthly:
            return expiries.filter(pl.col("expiry_type") == "M")
        return None

    @staticmethod
    def _near_next_far(dates: list, label: str) -> Dict[str, Any]:
        """Map the first three ``dates`` to Near/Next/Far keys for ``label``.

        Missing positions are filled with ``None`` instead of raising.
        """
        first_three = list(dates[:3])
        first_three += [None] * (3 - len(first_three))
        return {
            f"Near{label}Expiry": first_three[0],
            f"Next{label}Expiry": first_three[1],
            f"Far{label}Expiry": first_three[2],
        }

    @staticmethod
    def get_only_near_next_far_expiries(
        expiries: "pl.DataFrame" = None,
        weekly: bool = False,
        monthly: bool = False,
    ) -> Optional[Dict[str, Any]]:
        """Return the first three weekly or monthly expiries as a dict.

        Keys are ``Near/Next/Far`` + ``Weekly``/``Monthly`` + ``Expiry``.
        ``weekly`` wins when both flags are set; ``None`` is returned when
        neither is set. Positions with no expiry available hold ``None``.
        """
        if expiries is None:
            expiries = MyCustomClass.get_expiries()
        if weekly:
            code, label = "W", "Weekly"
        elif monthly:
            code, label = "M", "Monthly"
        else:
            return None
        dates = (
            expiries.filter(pl.col("expiry_type") == code)
            .get_column("expiry")
            .to_list()
        )
        return MyCustomClass._near_next_far(dates, label)

    @staticmethod
    def get_week_of_month(year: int, month: int, day: int) -> tuple:
        """Return ``(week_number, week_name)`` of a date within its month.

        The week number is the 1-based row of ``day`` in
        ``calendar.monthcalendar(year, month)``; a month can span six rows.
        """
        _words = {
            1: "FirstWeek",
            2: "SecondWeek",
            3: "ThirdWeek",
            4: "FourthWeek",
            5: "FifthWeek",
            6: "SixthWeek",
        }
        _week_of_month = MyCustomClass._week_index(year, month, day)
        return (_week_of_month, _words[_week_of_month])

    @staticmethod
    def get_week_no(dt_object) -> str:
        """Return ``dt_object`` formatted with ``%V`` (ISO 8601 week number)."""
        return dt_object.strftime("%V")

    @staticmethod
    def is_expiry_today(
        expiries: "pl.DataFrame" = None,
    ) -> Dict[str, Any]:
        """Report whether the first row of ``expiries`` is today's date.

        ``expiries`` defaults to ``get_expiries()``. The result holds
        ``IsExpiryToday`` (bool), ``ExpiryDate`` and ``ExpiryType``
        (``"Weekly"`` or ``"Monthly"``).
        """
        if expiries is None:
            expiries = MyCustomClass.get_expiries()
        (ExpiryDate, ExpiryType) = expiries.row(0)
        IsExpiryToday = dt.date.today() == ExpiryDate
        return {
            "IsExpiryToday": IsExpiryToday,
            "ExpiryDate": ExpiryDate,
            "ExpiryType": "Weekly" if ExpiryType == "W" else "Monthly",
        }

    # ------------------------------------------------------------------ #
    # Time helpers
    # ------------------------------------------------------------------ #
    @staticmethod
    def isNowInTimePeriod(startTime, endTime, nowTime) -> bool:
        """Return True when ``nowTime`` lies in ``[startTime, endTime]``.

        When ``startTime`` is not earlier than ``endTime`` the period is
        treated as wrapping over midnight.
        """
        if startTime < endTime:
            return startTime <= nowTime <= endTime
        # Over midnight:
        return nowTime >= startTime or nowTime <= endTime

    @staticmethod
    def isMarketTime(testing=False, now=None) -> bool:
        """Return True on a weekday between 03:45 and 10:00 UTC.

        Args:
            testing: when true, always return True.
            now: optional datetime to evaluate instead of the current time.
                Aware values are converted to UTC; naive values are taken as
                already being UTC.

        The window equals 09:15-15:30 at UTC+05:30 - presumably the exchange's
        trading session; confirm. The clock is read once, in UTC, so the
        result does not depend on the machine's local timezone or locale.
        """
        if testing:
            return True
        current = now if now is not None else dtdt.now(dt.timezone.utc)
        if current.tzinfo is not None:
            current = current.astimezone(dt.timezone.utc)
        in_window = MyCustomClass.isNowInTimePeriod(
            dt.time(3, 45), dt.time(10, 0), current.time()
        )
        # weekday(): Monday == 0 ... Sunday == 6.
        return in_window and current.weekday() < 5

    # ------------------------------------------------------------------ #
    # Instrument lookup
    # ------------------------------------------------------------------ #
    @staticmethod
    def _normalise_expiry(expiry):
        """Return ``expiry`` as a ``YYYY-MM-DD`` string when it is a date.

        ``datetime`` is a subclass of ``date``, so both are covered; strings
        and ``None`` pass through unchanged.
        """
        if isinstance(expiry, dt.date):
            return expiry.strftime("%Y-%m-%d")
        return expiry

    @staticmethod
    def _instrument_criteria(
        exchange, symbol, expiry, options, option_type, futures, indices
    ) -> List[tuple]:
        """Build the ``(column, comparator, value)`` filters for a lookup.

        Raises:
            ValueError: if ``option_type`` is given but is not one of
                ``call``/``ce``/``put``/``pe`` (case-insensitive).
        """
        criteria = [("name", op.eq, symbol)]
        if options and futures:
            criteria += [
                ("expiry", op.eq, expiry),
                ("instrument_type", op.ne, "EQ"),
                ("segment", op.ne, exchange),
            ]
        elif options:
            criteria.append(("expiry", op.eq, expiry))
            if option_type is not None:
                side = {"call": "CE", "ce": "CE", "put": "PE", "pe": "PE"}.get(
                    option_type.lower()
                )
                if side is None:
                    raise ValueError(
                        f"option_type must be 'call' or 'put', got {option_type!r}"
                    )
                criteria.append(("instrument_type", op.eq, side))
            criteria.append(("segment", op.eq, f"{exchange}-OPT"))
        elif futures:
            criteria += [
                ("expiry", op.eq, expiry),
                ("instrument_type", op.eq, "FUT"),
                ("segment", op.eq, f"{exchange}-FUT"),
            ]
        else:
            criteria += [
                ("instrument_type", op.eq, "EQ"),
                ("segment", op.eq, "INDICES" if indices else exchange),
            ]
        criteria.append(("exchange", op.eq, exchange))
        return criteria

    @staticmethod
    def get_instrument_tokens_and_symbols(
        exchange: Optional[str] = "NFO",
        symbol: Optional[str] = "BANKNIFTY",
        expiry: Union[str, dtdt] = None,
        options: Optional[bool] = False,
        option_type: Literal["call", "put"] = None,
        futures: Optional[bool] = False,
        indices: Optional[bool] = False,
    ) -> "pl.DataFrame":
        """Return the instrument rows matching the requested selection.

        Args:
            exchange: exchange code; upper-cased before matching.
            symbol: value matched against the ``name`` column; upper-cased.
            expiry: ``YYYY-MM-DD`` string, ``date`` or ``datetime``; only used
                for options and/or futures.
            options: select option contracts (``<exchange>-OPT``).
            option_type: with ``options`` only, restrict to calls (``CE``) or
                puts (``PE``); ``None`` keeps both.
            futures: select futures (``<exchange>-FUT``). Combined with
                ``options`` it selects every non-``EQ`` row whose segment
                differs from the bare exchange code.
            indices: without options/futures, select ``EQ`` rows of segment
                ``INDICES`` instead of segment ``<exchange>``.

        Raises:
            ValueError: for an unrecognised ``option_type``.
        """
        if symbol is not None:
            symbol = symbol.upper()
        if exchange is not None:
            exchange = exchange.upper()
        expiry = MyCustomClass._normalise_expiry(expiry)
        criteria = MyCustomClass._instrument_criteria(
            exchange, symbol, expiry, options, option_type, futures, indices
        )
        predicate = None
        for column, compare, value in criteria:
            clause = compare(pl.col(column), value)
            predicate = clause if predicate is None else predicate & clause
        return MyCustomClass._scan_instruments().filter(predicate).collect()

    # ------------------------------------------------------------------ #
    # Quotes
    # ------------------------------------------------------------------ #
    def get_instruments_ltp(
        self, instruments: "pl.DataFrame"
    ) -> "pl.DataFrame":  # noqa E501
        """Fetch quotes for ``instruments`` and return them as a frame.

        ``instruments`` must have ``exchange`` and ``tradingsymbol`` columns;
        they are joined as ``EXCHANGE:TRADINGSYMBOL`` and passed to
        ``self.kite.quote``. Each result row carries ``exchange`` and
        ``instrument`` (split from the response key) plus the quote fields.
        """
        # No "i=" prefix: the client is expected to add the query parameter
        # name itself - see the Kite Connect quote documentation.
        identifiers = (
            instruments.select(
                [
                    pl.concat_str(
                        [pl.col("exchange"), pl.col("tradingsymbol")],
                        sep=":",
                    ).alias("exchange_tradingsymbol")
                ]
            )
            .get_column("exchange_tradingsymbol")
            .to_list()
        )
        quotes = self.kite.quote(identifiers)
        return pl.from_dicts(
            [
                {
                    "exchange": key.split(":")[0],
                    "instrument": key.split(":")[-1],
                    **value,
                }
                for key, value in quotes.items()
            ]
        )

    def get_ce_pe_name_if_ltp_condition_matches(
        self,
        ltp_df: "pl.DataFrame" = None,
        only_ce: bool = False,
        only_pe: bool = False,
        only_first_ce_pe: bool = False,
        reverse: bool = False,
        condition_price: float = 0.0,
        operator: str = None,
    ) -> "pl.DataFrame":
        """Filter ``ltp_df`` by ``last_price <operator> condition_price``.

        Args:
            ltp_df: frame with ``instrument`` and ``last_price`` columns.
            only_ce: keep only instruments whose name ends in ``CE``.
            only_pe: keep only instruments whose name ends in ``PE``
                (ignored when ``only_ce`` is set).
            only_first_ce_pe: when neither ``only_ce`` nor ``only_pe`` is
                set, keep just the first two rows after sorting.
            reverse: sort ``last_price`` descending instead of ascending.
            condition_price: right-hand side of the comparison.
            operator: a key of the module-level ``operations`` table.

        Raises:
            KeyError: if ``operator`` is not a key of ``operations``.
        """
        if operator not in operations:
            raise KeyError(f"unknown operator {operator!r}")
        mask = operations[operator](pl.col("last_price"), condition_price)
        # Anchor at the end of the name: an unanchored "CE" would also match
        # symbols that merely contain those letters.
        if only_ce:
            mask = (pl.col("instrument").str.contains("CE$")) & (mask)
        elif only_pe:
            mask = (pl.col("instrument").str.contains("PE$")) & (mask)
        matched = ltp_df.filter(mask).sort("last_price", reverse=reverse)
        if only_first_ce_pe and not (only_ce or only_pe):
            matched = matched.slice(0, 2)
        return matched
if __name__ == "__main__":
    # Demo: poll quotes for the nearest BANKNIFTY option expiry once per
    # second and print every contract whose last price is <= 100.
    kite = KiteConnect(api_key="your_api_key")
    data = kite.generate_session(
        request_token="request_token_here", api_secret="your_secret"
    )  # noqa E501
    kite.set_access_token(access_token=data["access_token"])
    z = MyCustomClass(kite)
    exchange = "NFO"
    symbol = "BANKNIFTY"
    expiries = z.get_expiries(exchange, symbol, segment="OPTIONS")
    upcoming_expiry = expiries.select(pl.col("expiry").first()).row(0)[0]
    # get_expiries parses `expiry` into a date column, while the instrument
    # lookup compares against the raw text column of the CSV, so hand the
    # expiry over as a "YYYY-MM-DD" string.
    if hasattr(upcoming_expiry, "strftime"):
        upcoming_expiry = upcoming_expiry.strftime("%Y-%m-%d")
    instruments = z.get_instrument_tokens_and_symbols(
        exchange, symbol, expiry=upcoming_expiry, options=True
    )
    try:
        while True:
            start_timer = time.time()
            ce_pe_whose_ltp_below_100 = z.get_ce_pe_name_if_ltp_condition_matches(
                ltp_df=z.get_instruments_ltp(instruments),
                only_first_ce_pe=False,
                reverse=True,
                condition_price=100.0,
                operator="<=",
            )
            print(ce_pe_whose_ltp_below_100)
            print(
                f">>> In Total It Took {(time.time() - start_timer)} seconds,"
                + " for fetching and displaying\n"
                + " data for ce_pe_where_ltp_below_100"
            )
            sleep(1)  # Sleep for 1 Second, Increase It as per your wish
    except KeyboardInterrupt:
        # Ctrl-C is the only way out of the polling loop; leave without a
        # traceback.
        logging.info("Interrupted by user; stopping the polling loop.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment