@thanoojgithub
Created November 27, 2020 07:49
PySpark Streaming word count using netcat as a TCP socket text source
hduser@thanoojubuntu-Inspiron-3521:~$ nc -lk 9999
helllo word hello python hello spark hello pyspark hellow streaming pyspark
/usr/bin/python3.7 /home/hduser/PycharmProjects/pythonProjectOne/com/PySparkStreamingOne.py
20/11/27 13:12:31 WARN util.Utils: Your hostname, thanoojubuntu-Inspiron-3521 resolves to a loopback address: 127.0.1.1; using 192.168.225.20 instead (on interface wlp8s0)
20/11/27 13:12:31 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
<pyspark.streaming.context.StreamingContext object at 0x7f9c81fb5490>
-------------------------------------------
Time: 2020-11-27 13:12:50
-------------------------------------------
-------------------------------------------
Time: 2020-11-27 13:13:00
-------------------------------------------
-------------------------------------------
Time: 2020-11-27 13:13:10
-------------------------------------------
-------------------------------------------
Time: 2020-11-27 13:13:20
-------------------------------------------
-------------------------------------------
Time: 2020-11-27 13:13:30
-------------------------------------------
-------------------------------------------
Time: 2020-11-27 13:13:40
-------------------------------------------
('helllo', 1)
('python', 1)
('streaming', 1)
('word', 1)
('hello', 3)
('spark', 1)
('pyspark', 2)
('hellow', 1)
-------------------------------------------
Time: 2020-11-27 13:13:50
-------------------------------------------
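from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext


def getspark():
    # Build (or reuse) a session on the standalone master and take its SparkContext.
    sparkcontextobj: SparkContext = SparkSession.builder \
        .master("spark://thanoojubuntu-Inspiron-3521:7077") \
        .getOrCreate().sparkContext
    # 10-second batch interval, matching the "Time:" stamps in the output above.
    return StreamingContext(sparkcontextobj, 10)


if __name__ == '__main__':
    ssc = getspark()
    print(ssc)
    # Read newline-delimited text from the netcat server started with `nc -lk 9999`.
    lines = ssc.socketTextStream("localhost", 9999)
    # Split each line into words, pair each word with 1, then sum per word per batch.
    words = lines.flatMap(lambda line: line.split(" "))
    pairs = words.map(lambda word: (word, 1))
    wordCounts = pairs.reduceByKey(lambda x, y: x + y)
    wordCounts.pprint()
    ssc.start()
    ssc.awaitTermination()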
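
The per-batch logic of the script can be checked without a cluster or a netcat session. What follows is a minimal plain-Python sketch, not Spark code: count_words is a hypothetical helper introduced here for illustration, and it only imitates the flatMap, map and reduceByKey steps on a list of lines standing in for one 10-second batch.

```python
def count_words(batch_lines):
    """Plain-Python stand-in for flatMap -> map -> reduceByKey on one batch."""
    # flatMap: split every line on single spaces and flatten into one word list.
    words = [word for line in batch_lines for word in line.split(" ")]
    # map: pair each word with an initial count of 1.
    pairs = [(word, 1) for word in words]
    # reduceByKey: combine the values of equal keys with x + y.
    counts = {}
    for key, value in pairs:
        counts[key] = counts.get(key, 0) + value
    return counts


# The line typed into netcat in the transcript above.
sample = ["helllo word hello python hello spark hello pyspark hellow streaming pyspark"]
counts = count_words(sample)
for pair in counts.items():
    print(pair)
```

Run on the sample line, this yields the same eight (word, count) pairs as the 13:13:40 batch in the transcript, although the print order can differ from Spark's because reduceByKey does not preserve input order.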