-
-
Save oldmonkABA/286c4f3080610b2d5ff48480d13579bf to your computer and use it in GitHub Desktop.
| # -*- coding: utf-8 -*- | |
| import json | |
| import os | |
| import logmatic | |
| import logging | |
| from logging.config import dictConfig | |
| import requests | |
| from http import HTTPStatus | |
| import re | |
| from kiteconnect import KiteConnect | |
| import os | |
| import datetime | |
| __author__ = "oldmonkABA" | |
| # TODO: These can be passed in as command line arguments. Make this change later! | |
| # Name of file containing configuration information for the logging framework | |
| LOG_FILE_NAME = "logging.json" | |
| # Name of file containing KITE connection information | |
| CONNECTION_INFO_FILE_NAME = "connection_info.json" | |
| # Logger for this module | |
| LOGGER = logging.getLogger(__name__) | |
| def init_logging(): | |
| """ | |
| Initialize the logging framework with the configuration | |
| :return: Nothing | |
| :rtype: None | |
| """ | |
| here = os.path.dirname(__file__) | |
| log_fp = os.path.join(here, LOG_FILE_NAME) | |
| if not os.path.exists(log_fp): | |
| raise FileNotFoundError("Logging configuration file not found!") | |
| with open(log_fp, mode="r") as log_config: | |
| dictConfig(config=json.load(log_config)) | |
| LOGGER.info("Logging framework initialized!") | |
| def load_kite_config(): | |
| """ | |
| Load the kite configuration information from the json file | |
| :return: A python dictionary | |
| :rtype: dict | |
| """ | |
| # Get the directory where this python script is being executed | |
| here = os.path.dirname(__file__) | |
| connect_fp = os.path.join(here, CONNECTION_INFO_FILE_NAME) | |
| if not os.path.exists(connect_fp): | |
| raise FileNotFoundError("Connection information file not found!") | |
| with open(connect_fp, mode="r") as connect_file: | |
| config = json.load(connect_file) | |
| LOGGER.info("Kite connection information loaded successfully from file!") | |
| return config | |
| def need_to_generate_token(file): | |
| flag = False | |
| here = os.path.dirname(__file__) | |
| fp = os.path.join(here, file) | |
| login_time='' | |
| if os.path.isfile(fp): | |
| print(fp+" present in pwd") | |
| with open(fp, 'r') as f: | |
| data = json.load(f) | |
| LOGGER.info("Previous login time for "+data["user_name"]+" is "+str(data["login_time"])) | |
| login_time = datetime.datetime.strptime(data["login_time"],"%Y-%m-%d %H:%M:%S") | |
| today = datetime.datetime.now() | |
| cut_off_time = datetime.datetime(today.year,today.month,today.day,7,00,00) | |
| if login_time > cut_off_time: | |
| LOGGER.info("Acces token is fresh") | |
| else: | |
| os.remove(fp) | |
| flag=True | |
| LOGGER.info(file+" has been deleted.") | |
| else: | |
| LOGGER.info(file+" does not exist in pwd") | |
| flag = True | |
| return flag,login_time | |
| def kite_prelogin(config, http_session): | |
| """ | |
| Perform pre-login into kite | |
| :param config: The python dictionary containing kite configuration information | |
| :type config: dict | |
| :param http_session: A http session | |
| :type http_session: :class:`requests.Session` | |
| :return: The response referer url | |
| :rtype: str | |
| """ | |
| url = config["LOGIN_REFERER"] + config["KITE_API_KEY"] | |
| response = http_session.get(url=url) | |
| return response.url | |
| def login_kite(config, http_session): | |
| """ | |
| Perform a login | |
| :param config: The python dictionary containing kite configuration information | |
| :type config: dict | |
| :param http_session: A http session | |
| :type http_session: :class:`requests.Session` | |
| :return: The response payload as a python dictionary | |
| :rtype: dict | |
| """ | |
| url = config["LOGIN_URL"] | |
| data = dict() | |
| data["user_id"] = config["USER"] | |
| data["password"] = config["PASSWORD"] | |
| response = http_session.post(url=url, data=data) | |
| print(response.content) | |
| # Deserialize the response content | |
| resp_dict = json.loads(response.content) | |
| if "message" in resp_dict.keys(): | |
| # Since logging framework already expects message as a parameter change the key | |
| resp_dict["err_message"] = resp_dict["message"] | |
| del resp_dict["message"] | |
| if response.status_code != HTTPStatus.OK: | |
| LOGGER.error("Login failure", extra=resp_dict) | |
| raise ConnectionError("Login failed!") | |
| return resp_dict | |
| def kite_twofa(login_resp, config, http_session): | |
| """ | |
| Perform kite-two-factor authentication | |
| :param login_resp: The response payload from the primary user login | |
| :type login_resp: dict | |
| :param config: The python dictionary containing kite configuration information | |
| :type config: dict | |
| :param http_session: A http session | |
| :type http_session: :class:`requests.Session` | |
| :return: The response payload as a python dictionary | |
| :rtype: dict | |
| """ | |
| url = config["TWOFA_URL"] | |
| data = dict() | |
| data["user_id"] = config["USER"] | |
| data["request_id"] = login_resp["data"]["request_id"] | |
| data["twofa_value"] = str(config["PIN"]) | |
| response = http_session.post(url=url, data=data) | |
| # Deserialize the response content | |
| resp_dict = json.loads(response.content) | |
| if "message" in resp_dict.keys(): | |
| # Since logging framework already expects message as a parameter change the key | |
| resp_dict["err_message"] = resp_dict["message"] | |
| del resp_dict["message"] | |
| if response.status_code != HTTPStatus.OK: | |
| LOGGER.error("Two-factor authentication failure", extra=resp_dict) | |
| raise ConnectionError("Two-factor authentication failed!") | |
| return resp_dict | |
| def kite_post_twofa(url, http_session): | |
| """ | |
| Perform action after kite-two-factor authentication | |
| :param login_resp: The response payload from the primary user login | |
| :type login_resp: dict | |
| :param http_session: A http session | |
| :type http_session: :class:`requests.Session` | |
| :return: The response payload as a python dictionary | |
| :rtype: dict | |
| """ | |
| url = url+"&skip_session=true" | |
| #data = dict() | |
| #data["user_id"] = config["USER"] | |
| #data["request_id"] = login_resp["data"]["request_id"] | |
| #data["twofa_value"] = str(config["PIN"]) | |
| response = http_session.get(url=url, allow_redirects=False) | |
| if response.status_code == 302: | |
| reply = response.headers["Location"] | |
| request_token = re.findall(r'request_token=(.*)&action',reply)[0] | |
| # Deserialize the response content | |
| return request_token | |
| def generate_access_token(file,config,request_token): | |
| here = os.path.dirname(__file__) | |
| fp = os.path.join(here, file) | |
| kite = KiteConnect(api_key=config["KITE_API_KEY"]) | |
| data = kite.generate_session(request_token, api_secret=config["KITE_API_SECRET"]) | |
| user_data = json.dumps(data, indent=4, sort_keys=True,default=str) | |
| #print(data["user_name"],data["login_time"],data["access_token"]) | |
| with open(fp, "w") as outfile: | |
| outfile.write(user_data) | |
| #time.sleep(5) | |
| LOGGER.info("Automatic login for "+data["user_name"]+" is done. "+file+" has been written to disk") | |
| if __name__ == "__main__": | |
| # Initialize logging framework | |
| init_logging() | |
| # Load the kite configuration information | |
| kite_config = load_kite_config() | |
| file = 'access_credentials.json' | |
| generate_token,login_time = need_to_generate_token(file) | |
| if generate_token : | |
| sess = requests.Session() | |
| # Attempt pre-login | |
| ref_url = kite_prelogin(config=kite_config, http_session=sess) | |
| # Attempt a login and get the response as a dictionary | |
| user_pass_login_resp = login_kite(config=kite_config, http_session=sess) | |
| LOGGER.info("Login successful!") | |
| # Attempt two-factor auth | |
| two_fa_resp = kite_twofa(login_resp=user_pass_login_resp, config=kite_config, http_session=sess) | |
| LOGGER.info("Two-factor authentication passed!", extra=two_fa_resp) | |
| request_token = kite_post_twofa(url=ref_url,http_session=sess) | |
| LOGGER.info("Generated request token = %s",str(request_token)) | |
| generate_access_token(file,kite_config,request_token) | |
| else: | |
| LOGGER.info("Access token is valid till next day 7 am from "+str(login_time)) | |
| { | |
| "KITE_API_KEY": "", | |
| "USER": "", | |
| "PASSWORD": "", | |
| "PIN": "", | |
| "KITE_API_SECRET":"", | |
| "LOGIN_REFERER": "https://kite.trade/connect/login?v=3&api_key=", | |
| "LOGIN_URL": "https://kite.zerodha.com/api/login", | |
| "TWOFA_URL": "https://kite.zerodha.com/api/twofa" | |
| } |
| { | |
| "version": 1, | |
| "disable_existing_loggers": false, | |
| "formatters": { | |
| "package_formatter": { | |
| "format": "[%(asctime)s] - [%(levelname)s] - [%(name)s] : %(message)s" | |
| }, | |
| "json": { | |
| "()": "logmatic.JsonFormatter" | |
| } | |
| }, | |
| "handlers": { | |
| "console": { | |
| "class": "logging.StreamHandler", | |
| "level": "DEBUG", | |
| "formatter": "json" | |
| } | |
| }, | |
| "loggers": { | |
| }, | |
| "root": { | |
| "level": "INFO", | |
| "handlers": [ | |
| "console" | |
| ] | |
| } | |
| } |
| certifi==2020.4.5.1 | |
| chardet==3.0.4 | |
| idna==2.9 | |
| logmatic-python==0.1.7 | |
| python-json-logger==0.1.11 | |
| requests==2.23.0 | |
| urllib3==1.25.8 |
See the documentation : https://github.com/logmatic/logmatic-python
@ankit179 use jugaad python library search on github.com it is good for cloud. No API access required.
access_credentials.json what is that file ?
response = http_session.get(url=url, allow_redirects=False)
if response.status_code == 302:
reply = response.headers["Location"]
This code under kite_post_twofa, will not work. There are two redirects for that url, but due to redirect=False, it is stopping at the first one. So you will not get location details in the response headers.
I am not getting any success on the automatic generation of the access token. I am getting an error of list index out of range in function kite_post_two_fa because of the value of response.headers['Location'] is https://kite.zerodha.com/connect/finish?api_key=0xxxxxxxxxrxxxxx&sess_id=mVW5XlgjxH5qIP6jRSU0Dmi9xqrZLl9g (i.e. the URL does not contain the key work request_token=)
Althouhg I am getting output of {'status': 'success', 'data': {'profile': {}}} on 'kite_twofa function that means till TWOFA , the script is working fine but when I add &skip_session=true to ref_url, kite terminate the session. Any idea how to solve this issue? Thanks.
Use Jugaad trader....... Much simpler but you get only 1 active session.
I am not getting any success on the automatic generation of the access token. I am getting an error of
list index out of rangein functionkite_post_two_fabecause of the value ofresponse.headers['Location']ishttps://kite.zerodha.com/connect/finish?api_key=0xxxxxxxxxrxxxxx&sess_id=mVW5XlgjxH5qIP6jRSU0Dmi9xqrZLl9g(i.e. the URL does not contain the key workrequest_token=)Althouhg I am getting output of
{'status': 'success', 'data': {'profile': {}}}on'kite_twofafunction that means till TWOFA , the script is working fine but when I add&skip_session=truetoref_url, kite terminate the session. Any idea how to solve this issue? Thanks.
def kite_post_twofa(url, http_session):
"""
Perform action after kite-two-factor authentication
:param login_resp: The response payload from the primary user login
:type login_resp: dict
:param http_session: A http session
:type http_session: :class:requests.Session
:return: The response payload as a python dictionary
:rtype: dict
"""
url = url + "&skip_session=true"
# data = dict()
# data["user_id"] = config["USER"]
# data["request_id"] = login_resp["data"]["request_id"]
# data["twofa_value"] = str(config["PIN"])
request_token = ''
try:
response = http_session.get(url=url, allow_redirects=True)
except Exception as e:
pat = re.compile(r'request_token=(.*?)\s+')
request_token = re.search(pat, str(e)).group(1).strip()
# Deserialize the response content
return request_token
This modification worked for me.
I just now created a simple version that works with external 2FA TOTP: https://github.com/rajivgpta/kite-api-autologin . Working as of 23 feb 2023.
I just now created a simple version that works with external 2FA TOTP: https://github.com/rajivgpta/kite-api-autologin . Working as of 23 feb 2023.
Hi, is this still working for you?
I am receiving below error while using it.
request_token = parse_qs(urlparse(response).query)['request_token'][0]
KeyError: 'request_token'
I noticed my url is as below, not sure why request_token is not part of it.
'https://kite.zerodha.com/connect/login?sess_id=*******&api_key=*****7&skip_session=true'
Hi, really appreciate for your reply in the other post. Your selenium code for auto logging works flawless but difficult to implement on google cloud. I am trying to get this code working but getting the same error despite saving logging.json file in the same folder:
Traceback (most recent call last):
File "C:\Users\A\AppData\Local\Programs\Python\Python36\lib\logging\config.py", line 382, in resolve
found = getattr(found, frag)
AttributeError: module 'logmatic' has no attribute 'JsonFormatter'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\Users\A\AppData\Local\Programs\Python\Python36\lib\logging\config.py", line 384, in resolve
self.importer(used)
ModuleNotFoundError: No module named 'logmatic.JsonFormatter'
Would appreciate your help,
Thanks,
Ankit.