Skip to content

Instantly share code, notes, and snippets.

@raaghulr
Forked from vividvilla/README.md
Created May 19, 2021 06:44
Show Gist options
  • Save raaghulr/e44121955a1b0c8a9880b685298caf52 to your computer and use it in GitHub Desktop.
Save raaghulr/e44121955a1b0c8a9880b685298caf52 to your computer and use it in GitHub Desktop.
Kite connect Python ticker save to database example

Instructions

-- Database that holds the stored ticks.
CREATE DATABASE ticks;
-- One row per stored tick; the columns match the INSERT issued by the insert_ticks task.
CREATE TABLE ticks (
token integer NOT NULL, -- filled from the tick's "instrument_token"
date timestamp without time zone, -- filled with the worker's datetime.now() at insert time
price double precision -- filled from the tick's "last_price"
);
# Run celery workers
# celery -A db worker --loglevel=info
import sys
import json
import psycopg2
import logging
from celery import Celery
from datetime import datetime
# Send every log record (DEBUG and above) to stdout, prefixed with the
# timestamp, logger name and level.
logging.basicConfig(
    stream=sys.stdout,
    level=logging.DEBUG,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)

# Celery application; point the broker URL at your own broker.
app = Celery("tasks", broker="redis://localhost:6379/4")

# Module-level database connection, opened once when this module is imported
# and reused by every insert_ticks call in the process.
# NOTE(review): the user/password values look like placeholders - replace
# them with real credentials before running.
db = psycopg2.connect(
    database="ticks",
    user="username",
    password="password",
    host="127.0.0.1",
    port="5432",
)

# Parameterised insert statement; the named placeholders are filled from a
# dict with the keys "date", "token" and "price".
insert_tick_statement = "INSERT INTO ticks (date, token, price) VALUES (%(date)s, %(token)s, %(price)s)"
# Task to insert ticks into the PostgreSQL db
@app.task
def insert_ticks(ticks):
    """Insert a batch of ticks into the ``ticks`` table as one transaction.

    Args:
        ticks: Iterable of tick dicts. Each dict must provide the keys
            ``"instrument_token"`` and ``"last_price"``; the batch must also
            be JSON-serialisable because it is logged with ``json.dumps``.

    Each row is stamped with ``datetime.now()`` taken at insert time, not
    with a timestamp carried by the tick itself.

    Any error raised while inserting or committing rolls the whole batch
    back and is logged; it is not re-raised, so a failed batch is dropped.
    """
    logging.info("Inserting ticks to db : {}".format(json.dumps(ticks)))
    cursor = db.cursor()
    try:
        for tick in ticks:
            cursor.execute(insert_tick_statement, {
                "date": datetime.now(),
                "token": tick["instrument_token"],
                "price": tick["last_price"],
            })
        db.commit()
    except Exception:
        # The connection is shared at module level, so a failed statement
        # must be rolled back here; otherwise the open or failed transaction
        # would leak into every later call of this task.
        db.rollback()
        logging.exception("Couldn't write ticks to db: ")
    finally:
        # Release the cursor whether or not the batch was written.
        cursor.close()
import sys
import json
import logging
# Send every log record (DEBUG and above) to stdout, prefixed with the
# timestamp, logger name and level.
logging.basicConfig(
    stream=sys.stdout,
    level=logging.DEBUG,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
import time
from db import insert_ticks
from kiteconnect import WebSocket
# Initialise the ticker client.
# NOTE(review): the three string arguments look like placeholders for the
# API key, public token and user id - replace them with real values.
kws = WebSocket("api_key", "public_token", "zerodha_user_id")
# Instrument tokens to subscribe to, presumably in this order:
# RELIANCE BSE, RELIANCE NSE, NIFTY 50, SENSEX
# NOTE(review): 73856 may be a truncated token (RELIANCE NSE is commonly
# listed as 738561) - confirm against the instruments list before use.
tokens = [128083204, 73856, 256265, 265]
# Callback for tick reception.
def on_tick(ticks, ws):
    """Log a received batch of ticks and queue it for the database task.

    Args:
        ticks: The batch of ticks handed to this callback; it is logged as
            JSON and passed unchanged to ``insert_ticks.delay``.
        ws: The socket object passed by the caller; unused here.
    """
    serialized = json.dumps(ticks)
    logging.info("on tick - {}".format(serialized))
    # Hand the write off to the Celery task instead of inserting inline.
    insert_ticks.delay(ticks)
# Callback for successful connection.
def on_connect(ws):
    """Log that the WebSocket connection was established.

    Args:
        ws: The socket object passed by the caller; unused here.
    """
    message = "Successfully connected to WebSocket"
    logging.info(message)
def on_close(*args):
    """Log that the WebSocket connection was closed.

    Args:
        *args: Any positional arguments supplied by the caller; ignored.
            The sibling callbacks ``on_tick`` and ``on_connect`` receive the
            socket object, so this callback tolerates being called with
            arguments as well instead of raising ``TypeError``.
            NOTE(review): confirm the exact callback signature against the
            kiteconnect documentation.
    """
    logging.info("WebSocket connection closed")
def on_error(*args):
    """Log a WebSocket error at ERROR level.

    Args:
        *args: Any positional arguments supplied by the caller (presumably
            the error and/or the socket object - confirm against the
            kiteconnect documentation). When present they are included in
            the log record so the failure details are not lost.
    """
    if args:
        logging.error("WebSocket connection thrown error: %r", args)
    else:
        logging.error("WebSocket connection thrown error")
# Assign the callbacks. This is done before connect() is called so the
# handlers are already in place when the connection is opened.
kws.on_tick = on_tick
kws.on_connect = on_connect
kws.on_close = on_close
kws.on_error = on_error
# Connect with threaded=True. The subscription loop that follows this call is
# written to keep running on the main thread, so the connection is presumably
# serviced on a background thread - confirm in the kiteconnect documentation.
# Without threaded=True, connect() would presumably block the main thread
# (nothing after it would run) and subscriptions would have to be managed
# from the pre-defined callbacks instead.
kws.connect(threaded=True)
# kws.connect(disable_ssl_verification=True) # for ubuntu
# Index of the next entry in tokens that still has to be subscribed.
count = 0
while True:
    logging.info("This is main thread. Will subscribe to each token in tokens list with 5s delay")
    if count < len(tokens):
        if not kws.is_connected():
            # Not connected yet: leave count untouched and retry this token
            # on the next pass.
            logging.info("Connecting to WebSocket...")
        else:
            token = tokens[count]
            logging.info("Subscribing to: {}".format(token))
            kws.subscribe([token])
            kws.set_mode(kws.MODE_LTP, [token])
            count += 1
    # One pass every 5 seconds; the loop keeps running (and keeps the main
    # thread alive) after every token has been subscribed.
    time.sleep(5)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment