-
-
Save oldmonkABA/286c4f3080610b2d5ff48480d13579bf to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*- | |
import json | |
import os | |
import logmatic | |
import logging | |
from logging.config import dictConfig | |
import requests | |
from http import HTTPStatus | |
import re | |
from kiteconnect import KiteConnect | |
import os | |
import datetime | |
__author__ = "oldmonkABA"

# TODO: These can be passed in as command line arguments. Make this change later!
# Name of the JSON file holding the logging configuration; init_logging()
# looks it up in the directory that contains this script.
LOG_FILE_NAME = "logging.json"
# Name of the JSON file holding the Kite connection settings;
# load_kite_config() looks it up in the directory that contains this script.
CONNECTION_INFO_FILE_NAME = "connection_info.json"
# Logger for this module
LOGGER = logging.getLogger(__name__)
def init_logging():
    """
    Configure the logging framework from the JSON file named by LOG_FILE_NAME.

    The file is looked up in the directory that contains this script and its
    parsed content is handed to ``logging.config.dictConfig``.

    :return: Nothing
    :rtype: None
    :raises FileNotFoundError: If the logging configuration file does not exist
    """
    config_path = os.path.join(os.path.dirname(__file__), LOG_FILE_NAME)
    if os.path.exists(config_path):
        with open(config_path, mode="r") as handle:
            settings = json.load(handle)
            dictConfig(config=settings)
        LOGGER.info("Logging framework initialized!")
        return
    raise FileNotFoundError("Logging configuration file not found!")
def load_kite_config():
    """
    Read the Kite connection settings from the JSON file named by
    CONNECTION_INFO_FILE_NAME, located next to this script.

    :return: The parsed content of the connection information file
    :rtype: dict
    :raises FileNotFoundError: If the connection information file does not exist
    """
    # Resolve the file relative to the directory that contains this script
    script_dir = os.path.dirname(__file__)
    settings_path = os.path.join(script_dir, CONNECTION_INFO_FILE_NAME)
    if os.path.exists(settings_path):
        with open(settings_path, mode="r") as handle:
            kite_settings = json.load(handle)
        LOGGER.info("Kite connection information loaded successfully from file!")
        return kite_settings
    raise FileNotFoundError("Connection information file not found!")
def need_to_generate_token(file):
    """
    Decide whether a fresh access token has to be generated.

    The credentials file ``file`` is looked up in the directory that contains
    this script. The stored token counts as fresh only when its ``login_time``
    is later than 07:00 (local time) of the current day. A stale file is
    deleted. A file that cannot be parsed (invalid JSON, missing keys, or a
    ``login_time`` in an unexpected format) is also deleted and treated as
    stale, so a truncated file cannot block every later run.

    :param file: Name of the JSON file holding the stored credentials
    :type file: str
    :return: ``(flag, login_time)`` where ``flag`` is True when a new token
        must be generated and ``login_time`` is the parsed login time, or
        ``''`` when no usable login time is available
    :rtype: tuple
    """
    here = os.path.dirname(__file__)
    fp = os.path.join(here, file)

    if not os.path.isfile(fp):
        LOGGER.info(file+" does not exist in pwd")
        return True, ''

    LOGGER.info(fp+" present in pwd")
    try:
        with open(fp, 'r') as f:
            data = json.load(f)
        user_name = data["user_name"]
        login_time = datetime.datetime.strptime(data["login_time"], "%Y-%m-%d %H:%M:%S")
    except (ValueError, KeyError, TypeError) as err:
        # json.JSONDecodeError and strptime format errors are ValueErrors;
        # KeyError/TypeError cover a file with an unexpected structure.
        LOGGER.warning("%s could not be read (%r); a new token will be generated", file, err)
        os.remove(fp)
        LOGGER.info(file+" has been deleted.")
        return True, ''

    LOGGER.info("Previous login time for %s is %s", user_name, data["login_time"])

    today = datetime.datetime.now()
    cut_off_time = datetime.datetime(today.year, today.month, today.day, 7, 0, 0)
    if login_time > cut_off_time:
        LOGGER.info("Acces token is fresh")
        return False, login_time

    os.remove(fp)
    LOGGER.info(file+" has been deleted.")
    return True, login_time
def kite_prelogin(config, http_session):
    """
    Request the Kite login page for the configured API key.

    The URL is built by appending ``config["KITE_API_KEY"]`` to
    ``config["LOGIN_REFERER"]`` and fetched with a GET on ``http_session``.

    :param config: The python dictionary containing kite configuration information
    :type config: dict
    :param http_session: A http session
    :type http_session: :class:`requests.Session`
    :return: The ``url`` attribute of the response
    :rtype: str
    """
    login_page = config["LOGIN_REFERER"] + config["KITE_API_KEY"]
    return http_session.get(url=login_page).url
def login_kite(config, http_session):
    """
    Perform the primary (user id + password) login.

    Posts ``config["USER"]`` and ``config["PASSWORD"]`` to
    ``config["LOGIN_URL"]`` and returns the decoded JSON payload. A
    ``message`` key in the payload is renamed to ``err_message`` because the
    payload is passed to the logger as ``extra`` and ``message`` is a
    reserved log-record attribute.

    The raw response is intentionally not echoed to stdout.

    :param config: The python dictionary containing kite configuration information
    :type config: dict
    :param http_session: A http session
    :type http_session: :class:`requests.Session`
    :return: The response payload as a python dictionary
    :rtype: dict
    :raises ConnectionError: If the response status is not 200 or the body is
        not valid JSON
    """
    url = config["LOGIN_URL"]
    data = {
        "user_id": config["USER"],
        "password": config["PASSWORD"],
    }
    response = http_session.post(url=url, data=data)

    # Deserialize the response content; an HTML/empty error page is reported
    # as a login failure instead of leaking a JSON decoding error.
    try:
        resp_dict = json.loads(response.content)
    except ValueError as err:
        raise ConnectionError("Login failed!") from err

    if "message" in resp_dict:
        # "message" would clash with the log record attribute of the same name
        resp_dict["err_message"] = resp_dict.pop("message")

    if response.status_code != HTTPStatus.OK:
        LOGGER.error("Login failure", extra=resp_dict)
        raise ConnectionError("Login failed!")
    return resp_dict
def kite_twofa(login_resp, config, http_session):
    """
    Submit the second authentication factor.

    Posts the user id, the ``request_id`` taken from the primary login
    response and the configured PIN (converted to a string) to
    ``config["TWOFA_URL"]``. A ``message`` key in the decoded payload is
    renamed to ``err_message`` before the payload is used as logging ``extra``.

    :param login_resp: The response payload from the primary user login
    :type login_resp: dict
    :param config: The python dictionary containing kite configuration information
    :type config: dict
    :param http_session: A http session
    :type http_session: :class:`requests.Session`
    :return: The response payload as a python dictionary
    :rtype: dict
    :raises ConnectionError: If the response status is not 200
    """
    endpoint = config["TWOFA_URL"]
    payload = {
        "user_id": config["USER"],
        "request_id": login_resp["data"]["request_id"],
        "twofa_value": str(config["PIN"]),
    }
    reply = http_session.post(url=endpoint, data=payload)

    # Decode the JSON body of the reply
    body = json.loads(reply.content)
    if "message" in body.keys():
        # The logging framework already uses "message", so move it to another key
        body["err_message"] = body["message"]
        del body["message"]

    if reply.status_code == HTTPStatus.OK:
        return body

    LOGGER.error("Two-factor authentication failure", extra=body)
    raise ConnectionError("Two-factor authentication failed!")
def kite_post_twofa(url, http_session, max_redirects=5):
    """
    Walk the redirect chain after two-factor authentication and extract the
    request token.

    ``&skip_session=true`` is appended to ``url`` and the result is requested
    without automatic redirect handling. Each ``Location`` header is
    inspected; the first one whose query string carries ``request_token``
    ends the walk. Locations without a token are followed manually, so a
    chain with an intermediate hop is handled as well as a single redirect.
    The URL that carries the token is never requested itself.

    :param url: The login URL returned by the pre-login step; it must already
        contain a query string
    :type url: str
    :param http_session: A http session
    :type http_session: :class:`requests.Session`
    :param max_redirects: Maximum number of requests made while walking the chain
    :type max_redirects: int
    :return: The request token
    :rtype: str
    :raises ConnectionError: If the chain ends (non-redirect response, missing
        ``Location`` header or ``max_redirects`` reached) before a request
        token is seen
    """
    # Local import: these names are not part of the file-level imports.
    from urllib.parse import parse_qs, urljoin, urlparse

    redirect_codes = (301, 302, 303, 307, 308)
    current_url = url + "&skip_session=true"

    for _ in range(max_redirects):
        response = http_session.get(url=current_url, allow_redirects=False)
        location = response.headers.get("Location")
        if response.status_code not in redirect_codes or not location:
            break

        # Parse the query string instead of pattern-matching it, so the
        # position of request_token among the parameters does not matter.
        tokens = parse_qs(urlparse(location).query).get("request_token")
        if tokens:
            return tokens[0]

        # No token yet: follow this hop ourselves (Location may be relative).
        current_url = urljoin(current_url, location)

    raise ConnectionError("Request token not found in the login redirect chain!")
def generate_access_token(file,config,request_token):
    """
    Exchange the request token for a Kite session and store it as JSON.

    The session data returned by ``KiteConnect.generate_session`` is
    serialized (values the JSON encoder cannot handle are converted with
    ``str``) and written to ``file`` in the directory that contains this
    script.

    :param file: Name of the JSON file to write the session data to
    :type file: str
    :param config: The python dictionary containing kite configuration information
    :type config: dict
    :param request_token: The request token obtained from the login flow
    :type request_token: str
    :return: Nothing
    :rtype: None
    """
    target_path = os.path.join(os.path.dirname(__file__), file)
    client = KiteConnect(api_key=config["KITE_API_KEY"])
    session_data = client.generate_session(
        request_token,
        api_secret=config["KITE_API_SECRET"],
    )
    # Serialize before opening the file so a failure leaves no partial file
    serialized = json.dumps(session_data, indent=4, sort_keys=True, default=str)
    with open(target_path, "w") as sink:
        sink.write(serialized)
    LOGGER.info("Automatic login for "+session_data["user_name"]+" is done. "+file+" has been written to disk")
if __name__ == "__main__":
    # Initialize logging framework
    init_logging()

    # Load the kite configuration information
    kite_config = load_kite_config()

    file = 'access_credentials.json'
    generate_token, login_time = need_to_generate_token(file)
    if generate_token:
        # The context manager closes the session (and its pooled connections)
        # even when one of the login steps raises.
        with requests.Session() as sess:
            # Attempt pre-login
            ref_url = kite_prelogin(config=kite_config, http_session=sess)

            # Attempt a login and get the response as a dictionary
            user_pass_login_resp = login_kite(config=kite_config, http_session=sess)
            LOGGER.info("Login successful!")

            # Attempt two-factor auth
            two_fa_resp = kite_twofa(login_resp=user_pass_login_resp, config=kite_config, http_session=sess)
            LOGGER.info("Two-factor authentication passed!", extra=two_fa_resp)

            request_token = kite_post_twofa(url=ref_url, http_session=sess)
            LOGGER.info("Generated request token = %s", str(request_token))

        # Exchanging the request token does not use the login session
        generate_access_token(file, kite_config, request_token)
    else:
        LOGGER.info("Access token is valid till next day 7 am from "+str(login_time))
{ | |
"KITE_API_KEY": "", | |
"USER": "", | |
"PASSWORD": "", | |
"PIN": "", | |
"KITE_API_SECRET":"", | |
"LOGIN_REFERER": "https://kite.trade/connect/login?v=3&api_key=", | |
"LOGIN_URL": "https://kite.zerodha.com/api/login", | |
"TWOFA_URL": "https://kite.zerodha.com/api/twofa" | |
} |
{ | |
"version": 1, | |
"disable_existing_loggers": false, | |
"formatters": { | |
"package_formatter": { | |
"format": "[%(asctime)s] - [%(levelname)s] - [%(name)s] : %(message)s" | |
}, | |
"json": { | |
"()": "logmatic.JsonFormatter" | |
} | |
}, | |
"handlers": { | |
"console": { | |
"class": "logging.StreamHandler", | |
"level": "DEBUG", | |
"formatter": "json" | |
} | |
}, | |
"loggers": { | |
}, | |
"root": { | |
"level": "INFO", | |
"handlers": [ | |
"console" | |
] | |
} | |
} |
certifi==2020.4.5.1 | |
chardet==3.0.4 | |
idna==2.9 | |
logmatic-python==0.1.7 | |
python-json-logger==0.1.11 | |
requests==2.23.0 | |
urllib3==1.25.8 |
What is the access_credentials.json file?
response = http_session.get(url=url, allow_redirects=False)
if response.status_code == 302:
reply = response.headers["Location"]
This code in kite_post_twofa will not work. There are two redirects for that URL, but because of allow_redirects=False the request stops at the first one, so you will not get the final location details in the response headers.
I am not getting any success with the automatic generation of the access token. I am getting a "list index out of range" error in the function kite_post_twofa, because the value of response.headers['Location'] is https://kite.zerodha.com/connect/finish?api_key=0xxxxxxxxxrxxxxx&sess_id=mVW5XlgjxH5qIP6jRSU0Dmi9xqrZLl9g (i.e. the URL does not contain the keyword request_token=).
Although I am getting the output {'status': 'success', 'data': {'profile': {}}} from the kite_twofa function, which means the script works fine up to the two-factor step, Kite terminates the session when I add &skip_session=true to ref_url. Any idea how to solve this issue? Thanks.
Use Jugaad trader....... Much simpler but you get only 1 active session.
I am not getting any success with the automatic generation of the access token. I am getting a "list index out of range" error in the function kite_post_twofa, because the value of response.headers['Location'] is https://kite.zerodha.com/connect/finish?api_key=0xxxxxxxxxrxxxxx&sess_id=mVW5XlgjxH5qIP6jRSU0Dmi9xqrZLl9g (i.e. the URL does not contain the keyword request_token=).
Although I am getting the output {'status': 'success', 'data': {'profile': {}}} from the kite_twofa function, which means the script works fine up to the two-factor step, Kite terminates the session when I add &skip_session=true to ref_url. Any idea how to solve this issue? Thanks.
def kite_post_twofa(url, http_session):
    """
    Perform action after kite-two-factor authentication

    Variant that lets the session follow redirects and recovers the request
    token from the text of the exception raised by the request.

    :param url: The URL to request; "&skip_session=true" is appended to it
    :type url: str
    :param http_session: A http session
    :type http_session: :class:`requests.Session`
    :return: The request token, or an empty string when the request raises
        no exception
    :rtype: str
    """
    url = url + "&skip_session=true"
    request_token = ''
    try:
        # NOTE(review): presumably the last redirect target cannot be reached,
        # so this call is expected to raise -- confirm against the redirect
        # URL registered for the API key.
        response = http_session.get(url=url, allow_redirects=True)
    except Exception as e:
        # Take the text between "request_token=" and the next whitespace from
        # the exception message. If the message has no such text, re.search
        # returns None and the .group call raises AttributeError.
        pat = re.compile(r'request_token=(.*?)\s+')
        request_token = re.search(pat, str(e)).group(1).strip()
    # Still the initial empty string when the request completed without raising
    return request_token
This modification worked for me.
I just now created a simple version that works with external 2FA TOTP: https://github.com/rajivgpta/kite-api-autologin . Working as of 23 feb 2023.
I just now created a simple version that works with external 2FA TOTP: https://github.com/rajivgpta/kite-api-autologin . Working as of 23 feb 2023.
Hi, is this still working for you?
I am receiving below error while using it.
request_token = parse_qs(urlparse(response).query)['request_token'][0]
KeyError: 'request_token'
I noticed my url is as below, not sure why request_token is not part of it.
'https://kite.zerodha.com/connect/login?sess_id=*******&api_key=*****7&skip_session=true'
@ankit179 use jugaad python library search on github.com it is good for cloud. No API access required.