Created
January 7, 2024 08:58
-
-
Save j-thepac/20233296807de9a4a6634c66df9289f6 to your computer and use it in GitHub Desktop.
Real-Time Kafka Streaming of NSE Stock Market Data
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# %% | |
import httpx | |
# %% | |
# NSE endpoint that api() polls for the market status snapshot.
url = "https://www.nseindia.com/api/marketStatus"

# Browser-like request headers sent with every NSE request.
# NOTE(review): presumably NSE refuses clients without a browser User-Agent —
# confirm before trimming these.
_browser_user_agent = (
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
)
headers = {}
headers["Accept"] = "*/*"
headers["Accept-Language"] = "en-US,en;q=0.9,hi;q=0.8"
headers["User-Agent"] = _browser_user_agent
# %% | |
from dataclasses import dataclass | |
# %% | |
@dataclass
class Nifty:
    """One entry of the ``marketState`` list returned by NSE's marketStatus API.

    Field names mirror the JSON keys, so an entry can be splatted straight into
    the constructor. Use :meth:`from_dict` when the payload may carry keys that
    are not declared here.
    """

    market: str
    marketStatus: str
    tradeDate: str
    index: str
    last: float
    variation: float
    percentChange: float
    marketStatusMessage: str

    @classmethod
    def from_dict(cls, data: dict) -> "Nifty":
        """Build an instance from *data*, ignoring keys that are not fields.

        ``Nifty(**data)`` raises ``TypeError`` on any unexpected key, so a key
        added to the upstream JSON would break parsing. This keeps only the
        declared fields.

        Raises:
            TypeError: a declared field is missing from *data*.
        """
        from dataclasses import fields

        known = {f.name for f in fields(cls)}
        return cls(**{key: value for key, value in data.items() if key in known})
# %% | |
def api():
    """Fetch NSE's market status and return the first ``marketState`` entry.

    Returns:
        Nifty: built from ``resp.json()["marketState"][0]``.

    Raises:
        httpx.HTTPStatusError: NSE answered with a 4xx/5xx status.
        httpx.HTTPError: the request itself failed (connection, timeout, ...).
        ValueError: the response body is not valid JSON.
        KeyError / IndexError: the JSON lacks a non-empty ``marketState`` list.
        TypeError: the entry is missing one of Nifty's fields.
    """
    from dataclasses import fields

    resp = httpx.get(url, headers=headers)
    # Surface a rejected request as an HTTP error instead of a confusing JSON
    # decode error or KeyError further down.
    resp.raise_for_status()
    entry = resp.json()["marketState"][0]
    # Pass only the keys Nifty declares: ``Nifty(**entry)`` raises TypeError on
    # any extra key the upstream payload might include.
    known = {f.name for f in fields(Nifty)}
    return Nifty(**{key: value for key, value in entry.items() if key in known})
# %% | |
import os
import time

import pyspark
from pyspark.sql import DataFrame, SparkSession
# Explicit names instead of ``from pyspark.sql.functions import *``: the
# wildcard shadows builtins such as sum/max/min/round/abs for the whole notebook.
from pyspark.sql.functions import lit, struct, to_json

# The spark-sql-kafka connector should match the running Spark version; the
# original pinned 3.0.0 regardless of the installed PySpark.
# NOTE(review): assumes pip-style PySpark builds (Scala 2.12 for 3.x, 2.13 for
# 4.x) — adjust the suffix for a custom Spark build.
_spark_version = pyspark.__version__
_scala_version = "2.13" if int(_spark_version.split(".")[0]) >= 4 else "2.12"
kafka_package = (
    f"org.apache.spark:spark-sql-kafka-0-10_{_scala_version}:{_spark_version}"
)

spark = (
    SparkSession.builder
    .appName("kafka")
    .config("spark.jars.packages", kafka_package)
    .getOrCreate()
)
# %% | |
# Connection settings for the Kafka cluster hosted under srvs.cloudkafka.com.
# The SASL password comes from the CLOUD_KAFKA environment variable
# (presumably to keep it out of the notebook); a missing variable raises
# KeyError before any of these names are bound.
password = os.environ["CLOUD_KAFKA"]
host = "rocket"
usn = "gdqt5d3d"
topic = "gdqt5d3d-test"

# Bootstrap list: brokers <host>-01 .. <host>-03, all on port 9094.
server = ",".join(
    f"{host}-{broker:02d}.srvs.cloudkafka.com:9094" for broker in range(1, 4)
)
# %% | |
# Producer: poll NSE once per iteration and publish the snapshot to Kafka.
# Runs until interrupted (e.g. by stopping the notebook cell).
from pyspark.sql import DataFrame
from pyspark.sql.functions import lit, struct, to_json

# SASL/SCRAM login for the cluster; built once instead of on every iteration.
kafka_jaas_config = (
    "org.apache.kafka.common.security.scram.ScramLoginModule required "
    f"username='{usn}' password='{password}';"
)

while True:
    try:
        snapshot = api()
    except (httpx.HTTPError, ValueError, KeyError, IndexError) as err:
        # Network errors, HTTP error statuses, a non-JSON body or an
        # unexpected payload shape should skip one tick, not kill the producer.
        print(f"NSE poll failed, skipping this tick: {err!r}")
    else:
        df: DataFrame = spark.createDataFrame([snapshot])
        # The Kafka sink writes the `value` column (here: the whole row as JSON)
        # and `key`. With the default partitioner, the constant key routes every
        # record to the same partition.
        # The chain is not assigned: `.save()` returns None, and the original
        # rebound `df` to it.
        (
            df.withColumn("value", to_json(struct(*df.columns)))
            .withColumn("key", lit("a"))
            .select(["value", "key"])
            .write
            .format("kafka")
            .option("kafka.bootstrap.servers", server)
            .option("topic", topic)
            .option("kafka.security.protocol", "SASL_SSL")
            .option("kafka.sasl.mechanism", "SCRAM-SHA-256")
            .option("kafka.sasl.jaas.config", kafka_jaas_config)
            .save()
        )
    time.sleep(1)
    print("time elapsed is 1sec")
# %% | |
# Batch-read every record currently in the topic, starting from the earliest
# offset, over SASL_SSL with SCRAM-SHA-256 authentication.
kafka_read_options = {
    "kafka.bootstrap.servers": server,
    "subscribe": topic,
    "startingOffsets": "earliest",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "SCRAM-SHA-256",
    "kafka.sasl.jaas.config": (
        "org.apache.kafka.common.security.scram.ScramLoginModule required "
        f"username='{usn}' password='{password}';"
    ),
}
df = spark.read.format("kafka").options(**kafka_read_options).load()
# %% | |
# Print the stored messages, casting the Kafka `value` column to a string so
# the JSON payloads are readable.
(
    df.select("value")
    .selectExpr("CAST(value as String)")
    .show()
)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment