Created
April 6, 2020 19:18
-
-
Save oldmonkABA/286c4f3080610b2d5ff48480d13579bf to your computer and use it in GitHub Desktop.
Automatic login to KiteConnect using only the requests library. Credit is due to https://gist.github.com/GannyS/619e29dc999ffd014e80ec69ae190b93
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import json | |
import os | |
import logmatic | |
import logging | |
from logging.config import dictConfig | |
import requests | |
from http import HTTPStatus | |
import re | |
from kiteconnect import KiteConnect | |
import os | |
import datetime | |
# Gist author, exposed as module metadata.
__author__ = "oldmonkABA"
# TODO: These can be passed in as command line arguments. Make this change later!
# Name of file containing configuration information for the logging framework.
# It is resolved relative to this script's directory (see init_logging).
LOG_FILE_NAME = "logging.json"
# Name of file containing KITE connection information (API key/secret, user
# credentials and endpoint URLs). It is resolved relative to this script's
# directory (see load_kite_config).
CONNECTION_INFO_FILE_NAME = "connection_info.json"
# Logger for this module
LOGGER = logging.getLogger(__name__)
def init_logging():
    """
    Configure the logging framework from the JSON file next to this script.

    :return: Nothing
    :rtype: None
    :raises FileNotFoundError: If the logging configuration file is missing
    """
    config_path = os.path.join(os.path.dirname(__file__), LOG_FILE_NAME)
    if not os.path.exists(config_path):
        raise FileNotFoundError("Logging configuration file not found!")
    with open(config_path, mode="r") as config_file:
        settings = json.load(config_file)
        dictConfig(config=settings)
    LOGGER.info("Logging framework initialized!")
def load_kite_config():
    """
    Read the kite connection settings from the JSON file next to this script.

    :return: The parsed content of the connection information file
    :rtype: dict
    :raises FileNotFoundError: If the connection information file is missing
    """
    # The file is looked up in the directory that holds this script
    config_path = os.path.join(os.path.dirname(__file__), CONNECTION_INFO_FILE_NAME)
    if not os.path.exists(config_path):
        raise FileNotFoundError("Connection information file not found!")
    with open(config_path, mode="r") as config_file:
        kite_settings = json.load(config_file)
    LOGGER.info("Kite connection information loaded successfully from file!")
    return kite_settings
def need_to_generate_token(file):
    """
    Decide whether a fresh access token has to be generated.

    The credentials file is looked up in this script's directory. Its
    ``login_time`` is compared against 07:00 of the current day (as reported
    by ``datetime.datetime.now()``): a later login counts as fresh, anything
    else causes the file to be deleted. A file that cannot be parsed is
    treated as stale and deleted as well, so a corrupt file never blocks the
    next login.

    :param file: Name of the JSON credentials file, relative to this script's directory
    :type file: str
    :return: ``(flag, login_time)`` where ``flag`` is True when a new token
        must be generated and ``login_time`` is the previous login time, or
        ``''`` when none could be read
    :rtype: tuple
    """
    fp = os.path.join(os.path.dirname(__file__), file)
    if not os.path.isfile(fp):
        LOGGER.info(file+" does not exist in pwd")
        return True, ''

    LOGGER.info(fp+" present in pwd")
    try:
        with open(fp, 'r') as f:
            data = json.load(f)
        user_name = data["user_name"]
        login_time = datetime.datetime.strptime(data["login_time"], "%Y-%m-%d %H:%M:%S")
    except (ValueError, KeyError, TypeError) as err:
        # Invalid JSON, missing keys or an unparsable timestamp: the file is
        # useless, so drop it and ask for a new token instead of crashing.
        LOGGER.warning("Could not read %s (%s); a new token will be generated", file, err)
        os.remove(fp)
        LOGGER.info(file+" has been deleted.")
        return True, ''

    LOGGER.info("Previous login time for %s is %s", user_name, data["login_time"])
    cut_off_time = datetime.datetime.now().replace(hour=7, minute=0, second=0, microsecond=0)
    if login_time > cut_off_time:
        LOGGER.info("Acces token is fresh")
        return False, login_time

    # Logged in before today's cut-off: discard the stored credentials
    os.remove(fp)
    LOGGER.info(file+" has been deleted.")
    return True, login_time
def kite_prelogin(config, http_session):
    """
    Request the kite login page for the configured API key

    The page address is ``LOGIN_REFERER`` with ``KITE_API_KEY`` appended.

    :param config: The python dictionary containing kite configuration information
    :type config: dict
    :param http_session: A http session
    :type http_session: :class:`requests.Session`
    :return: The url reported by the response object
    :rtype: str
    """
    prelogin_response = http_session.get(url=config["LOGIN_REFERER"] + config["KITE_API_KEY"])
    return prelogin_response.url
def login_kite(config, http_session):
    """
    Perform a login with user id and password

    :param config: The python dictionary containing kite configuration information
        (reads ``LOGIN_URL``, ``USER`` and ``PASSWORD``)
    :type config: dict
    :param http_session: A http session
    :type http_session: :class:`requests.Session`
    :return: The response payload as a python dictionary; a ``message`` key,
        if present, is renamed to ``err_message``
    :rtype: dict
    :raises ConnectionError: If the response body is not valid JSON or the
        status code is not 200
    """
    payload = {"user_id": config["USER"], "password": config["PASSWORD"]}
    response = http_session.post(url=config["LOGIN_URL"], data=payload)
    # The raw response is intentionally not printed: it belongs to an
    # authentication exchange and should not end up on stdout.
    try:
        resp_dict = json.loads(response.content)
    except ValueError as err:
        # e.g. an HTML error page from a gateway; report it as a login failure
        raise ConnectionError(
            "Login failed! Response was not valid JSON (HTTP %s)" % response.status_code
        ) from err
    if "message" in resp_dict:
        # Since logging framework already expects message as a parameter change the key
        resp_dict["err_message"] = resp_dict.pop("message")
    if response.status_code != HTTPStatus.OK:
        LOGGER.error("Login failure", extra=resp_dict)
        raise ConnectionError("Login failed!")
    return resp_dict
def kite_twofa(login_resp, config, http_session):
    """
    Perform kite-two-factor authentication

    :param login_resp: The response payload from the primary user login
        (reads ``login_resp["data"]["request_id"]``)
    :type login_resp: dict
    :param config: The python dictionary containing kite configuration information
        (reads ``TWOFA_URL``, ``USER`` and ``PIN``)
    :type config: dict
    :param http_session: A http session
    :type http_session: :class:`requests.Session`
    :return: The response payload as a python dictionary; a ``message`` key,
        if present, is renamed to ``err_message``
    :rtype: dict
    :raises ConnectionError: If the response body is not valid JSON or the
        status code is not 200
    """
    payload = {
        "user_id": config["USER"],
        "request_id": login_resp["data"]["request_id"],
        # The PIN may be stored as a number in the config; it is sent as text
        "twofa_value": str(config["PIN"]),
    }
    response = http_session.post(url=config["TWOFA_URL"], data=payload)
    try:
        resp_dict = json.loads(response.content)
    except ValueError as err:
        # e.g. an HTML error page from a gateway; report it as a 2FA failure
        raise ConnectionError(
            "Two-factor authentication failed! Response was not valid JSON (HTTP %s)"
            % response.status_code
        ) from err
    if "message" in resp_dict:
        # Since logging framework already expects message as a parameter change the key
        resp_dict["err_message"] = resp_dict.pop("message")
    if response.status_code != HTTPStatus.OK:
        LOGGER.error("Two-factor authentication failure", extra=resp_dict)
        raise ConnectionError("Two-factor authentication failed!")
    return resp_dict
def kite_post_twofa(url, http_session):
    """
    Fetch the request token issued after kite-two-factor authentication

    ``url`` is requested again with ``&skip_session=true`` appended and with
    redirects disabled; the token is read from the ``Location`` header of the
    resulting 302 response, wherever it appears in the query string.

    :param url: The url returned by the pre-login step; it must already
        contain a query string because ``&skip_session=true`` is appended
    :type url: str
    :param http_session: A http session
    :type http_session: :class:`requests.Session`
    :return: The request token
    :rtype: str
    :raises ConnectionError: If the response is not a 302 redirect or its
        ``Location`` header carries no request token
    """
    response = http_session.get(url=url + "&skip_session=true", allow_redirects=False)
    if response.status_code != HTTPStatus.FOUND:
        raise ConnectionError(
            "Request token redirect not received (HTTP %s)" % response.status_code
        )
    location = response.headers.get("Location", "")
    # Match the parameter on its own so the result does not depend on which
    # other parameters precede or follow it in the redirect url.
    match = re.search(r"[?&]request_token=([^&#]+)", location)
    if match is None:
        raise ConnectionError("Redirect location does not contain a request token")
    return match.group(1)
def generate_access_token(file,config,request_token):
    """
    Exchange the request token for a session and store it as JSON

    :param file: Name of the output file, relative to this script's directory
    :type file: str
    :param config: The python dictionary containing kite configuration information
        (reads ``KITE_API_KEY`` and ``KITE_API_SECRET``)
    :type config: dict
    :param request_token: The token handed to ``KiteConnect.generate_session``
    :type request_token: str
    :return: Nothing
    :rtype: None
    """
    output_path = os.path.join(os.path.dirname(__file__), file)
    client = KiteConnect(api_key=config["KITE_API_KEY"])
    session = client.generate_session(request_token, api_secret=config["KITE_API_SECRET"])
    # Serialize before opening the file; default=str turns values that json
    # cannot encode natively into their string form
    serialized = json.dumps(session, indent=4, sort_keys=True, default=str)
    with open(output_path, "w") as credentials_file:
        credentials_file.write(serialized)
    LOGGER.info("Automatic login for "+session["user_name"]+" is done. "+file+" has been written to disk")
if __name__ == "__main__":
    # Initialize logging framework
    init_logging()
    # Load the kite configuration information
    kite_config = load_kite_config()
    file = 'access_credentials.json'
    generate_token, login_time = need_to_generate_token(file)
    if generate_token:
        # One session is shared by every login step; the context manager
        # closes it even when one of the steps raises.
        with requests.Session() as sess:
            # Attempt pre-login
            ref_url = kite_prelogin(config=kite_config, http_session=sess)
            # Attempt a login and get the response as a dictionary
            user_pass_login_resp = login_kite(config=kite_config, http_session=sess)
            LOGGER.info("Login successful!")
            # Attempt two-factor auth
            two_fa_resp = kite_twofa(login_resp=user_pass_login_resp, config=kite_config, http_session=sess)
            LOGGER.info("Two-factor authentication passed!", extra=two_fa_resp)
            request_token = kite_post_twofa(url=ref_url, http_session=sess)
        LOGGER.info("Generated request token = %s", str(request_token))
        generate_access_token(file, kite_config, request_token)
    else:
        LOGGER.info("Access token is valid till next day 7 am from "+str(login_time))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"KITE_API_KEY": "", | |
"USER": "", | |
"PASSWORD": "", | |
"PIN": "", | |
"KITE_API_SECRET":"", | |
"LOGIN_REFERER": "https://kite.trade/connect/login?v=3&api_key=", | |
"LOGIN_URL": "https://kite.zerodha.com/api/login", | |
"TWOFA_URL": "https://kite.zerodha.com/api/twofa" | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"version": 1, | |
"disable_existing_loggers": false, | |
"formatters": { | |
"package_formatter": { | |
"format": "[%(asctime)s] - [%(levelname)s] - [%(name)s] : %(message)s" | |
}, | |
"json": { | |
"()": "logmatic.JsonFormatter" | |
} | |
}, | |
"handlers": { | |
"console": { | |
"class": "logging.StreamHandler", | |
"level": "DEBUG", | |
"formatter": "json" | |
} | |
}, | |
"loggers": { | |
}, | |
"root": { | |
"level": "INFO", | |
"handlers": [ | |
"console" | |
] | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
certifi==2020.4.5.1 | |
chardet==3.0.4 | |
idna==2.9 | |
logmatic-python==0.1.7 | |
python-json-logger==0.1.11 | |
requests==2.23.0 | |
urllib3==1.25.8 |
I just now created a simple version that works with external 2FA TOTP: https://github.com/rajivgpta/kite-api-autologin . Working as of 23 feb 2023.
I just now created a simple version that works with external 2FA TOTP: https://github.com/rajivgpta/kite-api-autologin . Working as of 23 feb 2023.
Hi, is this still working for you?
I am receiving the error below while using it.
request_token = parse_qs(urlparse(response).query)['request_token'][0]
KeyError: 'request_token'
I noticed my URL is as shown below; I'm not sure why request_token is not part of it.
'https://kite.zerodha.com/connect/login?sess_id=*******&api_key=*****7&skip_session=true'
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
def kite_post_twofa(url, http_session):
    """
    Fetch the request token after kite-two-factor authentication, following redirects

    ``url`` is requested with ``&skip_session=true`` appended and redirects
    enabled. If the request raises, the token is searched for in the text of
    the exception; otherwise it is searched for in the url of the response.

    :param url: The url returned by the pre-login step; it must already
        contain a query string because ``&skip_session=true`` is appended
    :type url: str
    :param http_session: A http session
    :type http_session: :class:`requests.Session`
    :return: The request token
    :rtype: str
    :raises ConnectionError: If the request succeeds but the final url
        carries no request token
    """
    url = url + "&skip_session=true"
    # Stop at the next parameter, whitespace or quote so that only the token
    # itself is captured, wherever it sits in the query string
    token_pattern = re.compile(r"request_token=([^&#\s'\"]+)")
    try:
        response = http_session.get(url=url, allow_redirects=True)
    except Exception as e:
        # NOTE(review): presumably the last redirect points at the app's own
        # redirect url, which may be unreachable; the failing url (including
        # the token) then shows up in the exception text - confirm.
        match = token_pattern.search(str(e))
        if match is None:
            # Not the expected failure: let the real error surface
            raise
        return match.group(1)
    match = token_pattern.search(str(response.url))
    if match is None:
        raise ConnectionError("No request token found after following redirects")
    return match.group(1)
This modification worked for me.