Skip to content

Instantly share code, notes, and snippets.

@j-thepac
Created January 7, 2024 08:58
Show Gist options
  • Save j-thepac/20233296807de9a4a6634c66df9289f6 to your computer and use it in GitHub Desktop.
Save j-thepac/20233296807de9a4a6634c66df9289f6 to your computer and use it in GitHub Desktop.
Real-Time Kafka Streaming of NSE Stock Market Data
# %%
import httpx
# %%
# NSE endpoint polled by api(); its JSON body carries a "marketState" list.
url = "https://www.nseindia.com/api/marketStatus"

# Request headers sent with every call to `url`. They mimic a desktop Chrome
# browser -- presumably because the site rejects non-browser clients
# (NOTE(review): confirm; NSE may also require session cookies).
headers = {
    "Accept": "*/*",
    "Accept-Language": "en-US,en;q=0.9,hi;q=0.8",
    "User-Agent": (
        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
    ),
}
# %%
from dataclasses import dataclass
# %%
@dataclass
class Nifty:
    """One entry of the ``marketState`` list returned by the NSE marketStatus API.

    Attribute names are spelled exactly like the JSON keys so that an entry
    can be unpacked straight into the constructor: ``Nifty(**entry)``.
    The annotated types are documentation only; ``dataclass`` does not
    validate them at runtime.
    """

    # Textual descriptors of the market segment.
    market: str
    marketStatus: str
    tradeDate: str
    index: str
    # Numeric quote fields (annotated as float; values are stored as given).
    last: float
    variation: float
    percentChange: float
    # Free-text status line.
    marketStatusMessage: str
# %%
def api():
    """Fetch the NSE market status and return its first ``marketState`` entry.

    Sends a GET request to the module-level ``url`` with ``headers`` and
    builds a :class:`Nifty` from ``resp.json()["marketState"][0]``.

    Returns:
        Nifty: the first element of the response's ``marketState`` list.

    Raises:
        httpx.HTTPStatusError: the server answered with an unsuccessful status.
        httpx.HTTPError: the request itself failed (connection, timeout, ...).
        ValueError: the body is not valid JSON, or ``marketState`` is empty.
        KeyError: the JSON object has no ``marketState`` key.
        TypeError: the first entry lacks one of Nifty's fields.
    """
    from dataclasses import fields

    resp = httpx.get(url, headers=headers)
    # Fail on an error status here instead of trying to parse an error page
    # as JSON and surfacing a confusing decode/KeyError further down.
    resp.raise_for_status()
    states = resp.json()["marketState"]
    if not states:
        raise ValueError("NSE marketStatus response has an empty 'marketState' list")
    # Keep only the keys Nifty declares, so an extra field appearing in the
    # API payload does not make Nifty(**...) fail with TypeError.
    known = {f.name for f in fields(Nifty)}
    return Nifty(**{k: v for k, v in states[0].items() if k in known})
# %%
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import os
import time
# SparkSession named "kafka" with the Kafka SQL connector pulled in through
# spark.jars.packages (getOrCreate reuses an already-running session).
# NOTE(review): the connector coordinates (Scala 2.12, version 3.0.0) must
# match the installed Spark/Scala build -- confirm against the runtime.
spark = (
    SparkSession.builder
    .appName("kafka")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0")
    .getOrCreate()
)
# %%
# Kafka connection settings. The password is read first from the
# CLOUD_KAFKA environment variable, so a missing variable raises KeyError
# before any of these names is bound.
password = os.environ["CLOUD_KAFKA"]
host = "rocket"
usn = "gdqt5d3d"
topic = "gdqt5d3d-test"

# Comma-separated bootstrap list of the three brokers
# <host>-01/02/03.srvs.cloudkafka.com, all on port 9094.
server = ",".join(
    f"{host}-{broker:02d}.srvs.cloudkafka.com:9094" for broker in (1, 2, 3)
)
# %%
# Producer: fetch one market-status snapshot via api(), publish it to Kafka as
# a JSON string in the "value" column (with the constant key "a"), then sleep
# 1 second and repeat.
#
# Ctrl-C (KeyboardInterrupt) ends the loop cleanly, so when this file is run
# as a plain script rather than cell-by-cell, execution continues to the
# consumer cell below instead of aborting there.
try:
    while True:
        try:
            snapshot = api()
        except (httpx.HTTPError, ValueError) as exc:
            # A transient network/HTTP failure or a non-JSON reply (ValueError
            # covers json.JSONDecodeError) skips this tick instead of killing
            # the long-running producer.
            print(f"NSE request failed, skipping this tick: {exc!r}")
            time.sleep(1)
            continue
        df = spark.createDataFrame([snapshot])
        # .save() returns None, so the chain's result is deliberately not
        # assigned back to df (the original rebound df to None every tick).
        (
            df.withColumn("value", to_json(struct(*df.columns)))
            .withColumn("key", lit("a"))
            .select(["value", "key"])
            .write
            .format("kafka")
            .option("kafka.bootstrap.servers", server)
            .option("topic", topic)
            .option("kafka.security.protocol", "SASL_SSL")
            .option("kafka.sasl.mechanism", "SCRAM-SHA-256")
            .option(
                "kafka.sasl.jaas.config",
                f"org.apache.kafka.common.security.scram.ScramLoginModule required username='{usn}' password='{password}';",
            )
            .save()
        )
        time.sleep(1)
        print("time elapsed is 1sec")
except KeyboardInterrupt:
    print("producer stopped")
# %%
# Batch-read everything currently on `topic` (from the earliest offset) using
# the same SASL_SSL / SCRAM-SHA-256 credentials as the producer.
df = (
    spark.read
    .format("kafka")
    .options(
        **{
            "kafka.bootstrap.servers": server,
            "subscribe": f"{topic}",
            "startingOffsets": "earliest",
            "kafka.security.protocol": "SASL_SSL",
            "kafka.sasl.mechanism": "SCRAM-SHA-256",
            "kafka.sasl.jaas.config": f"org.apache.kafka.common.security.scram.ScramLoginModule required username='{usn}' password='{password}';",
        }
    )
    .load()
)
# %%
# Print the consumed record payloads as text. The Kafka `value` column is
# binary, so cast it to STRING (aliased back to "value" for a stable header).
# truncate=False because show() otherwise cuts every cell to 20 characters,
# which would hide almost all of each JSON payload.
df.selectExpr("CAST(value AS STRING) AS value").show(truncate=False)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment