@rmoff
Last active September 17, 2020 17:41
Simple example of processing twitter JSON payload from a Kafka stream with Spark Streaming in Python
# Simple example of processing twitter JSON payload from a Kafka stream with Spark Streaming in Python
# @rmoff December 21, 2016
#
# Based on direct_kafka_wordcount.py
# (https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/direct_kafka_wordcount.py)
# ------
# Can be run under Docker using https://github.com/jupyter/docker-stacks/tree/master/all-spark-notebook
# ------
# Launch syntax:
# /usr/local/spark-2.0.2-bin-hadoop2.7/bin/spark-submit \
# --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 \
# kafka_twitter_sparkstreaming.py
# Import dependencies
# Print to stdout
from __future__ import print_function
# Spark
from pyspark import SparkContext
# Spark Streaming
from pyspark.streaming import StreamingContext
# Kafka
from pyspark.streaming.kafka import KafkaUtils
# json parsing
import json
# Create Spark context
sc = SparkContext(appName="PythonStreamingDirectKafkaWordCountRM")
# Create Streaming context, with a batch duration of 10 seconds
# ref: http://spark.apache.org/docs/latest/api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext
# ref: http://spark.apache.org/docs/latest/streaming-programming-guide.html#initializing-streamingcontext
ssc = StreamingContext(sc, 10)
# Connect to Kafka, topic 'twitter', consumer group 'spark-streaming'
# ref: http://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.html
kafkaStream = KafkaUtils.createStream(ssc, 'kafka-zk:2181', 'spark-streaming', {'twitter':1})
# Parse the inbound message as json
parsed = kafkaStream.map(lambda v: json.loads(v[1]))
# Count the number of instances of each tweet text
text_counts = parsed.map(lambda tweet: (tweet['text'], 1)) \
                    .reduceByKey(lambda x, y: x + y)
# Print the text counts (first ten shown)
text_counts.pprint()
# Count the number of tweets per author
author_counts = parsed.map(lambda tweet: (tweet['user']['screen_name'], 1)) \
                      .reduceByKey(lambda x, y: x + y)
# Print the author tweet counts (first ten shown)
author_counts.pprint()
# Start the streaming context
ssc.start()
ssc.awaitTermination()
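The counting logic above is plain map/reduceByKey; it can be sketched without a Spark cluster using standard Python, which makes it easy to check what the stream should produce for a given batch. The sample messages below are made-up stand-ins for the Twitter JSON payloads, shaped as the `(key, value)` pairs that `KafkaUtils.createStream` yields:

```python
import json
from collections import Counter

# Hypothetical Kafka batch: (key, value) pairs, value is the tweet JSON string
messages = [
    (None, json.dumps({"text": "hello", "user": {"screen_name": "alice"}})),
    (None, json.dumps({"text": "hello", "user": {"screen_name": "bob"}})),
    (None, json.dumps({"text": "world", "user": {"screen_name": "alice"}})),
]

# Equivalent of: kafkaStream.map(lambda v: json.loads(v[1]))
parsed = [json.loads(v) for _, v in messages]

# Equivalent of: parsed.map(lambda t: (t['text'], 1)).reduceByKey(add)
text_counts = Counter(t["text"] for t in parsed)

# Equivalent of: parsed.map(lambda t: (t['user']['screen_name'], 1)).reduceByKey(add)
author_counts = Counter(t["user"]["screen_name"] for t in parsed)

print(dict(text_counts))    # {'hello': 2, 'world': 1}
print(dict(author_counts))  # {'alice': 2, 'bob': 1}
```

Each Counter entry corresponds to one `(key, count)` pair that `pprint()` would display for that batch.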
17/01/04 14:08:10 INFO DAGScheduler: ResultStage 42 (runJob at PythonRDD.scala:441) finished in 0.122 s
17/01/04 14:08:10 INFO DAGScheduler: Job 21 finished: runJob at PythonRDD.scala:441, took 0.129443 s
-------------------------------------------
Time: 2017-01-04 14:08:10
-------------------------------------------
('_ https://t.co/gn0B7jxDYj #Samsung #Oracle #Smartphone #Epic 427', 1)
('4.9 Rating! #TopRated #Bestseller \n\nTHE LAST ORACLE! \n\n"The masterpiece of the trilogy!" "Epic romance!"\n\nhttps://t.co/MR8i95m9Sf', 1)
('RT @shadihamid: Semi-serious question: do we have any actual reports of people moving to Canada b/c of Trump? (getting accepted into McGill…', 1)
('RT @AdedapoLinda: #CROSSCHECK purged SOCIAL Security-Medicare-1st amend @docrocktex26 @sarahkendzior @Ireland0828 @sumoh7 @Deoliver47… ', 1)
('@rolandscahill @realDonaldTrump all she has to do is make good on her word and move to Canada.', 1)
17/01/04 14:08:10 INFO JobScheduler: Finished job streaming job 1483538890000 ms.0 from job set of time 1483538890000 ms
17/01/04 14:08:10 INFO JobScheduler: Starting job streaming job 1483538890000 ms.1 from job set of time 1483538890000 ms
17/01/04 14:08:10 INFO SparkContext: Starting job: runJob at PythonRDD.scala:441
17/01/04 14:08:10 INFO DAGScheduler: Registering RDD 59 (call at /usr/local/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py:2230)
17/01/04 14:08:10 INFO DAGScheduler: Got job 22 (runJob at PythonRDD.scala:441) with 1 output partitions
[...]
17/01/04 14:08:30 INFO DAGScheduler: ResultStage 66 (runJob at PythonRDD.scala:441) finished in 0.102 s
17/01/04 14:08:30 INFO DAGScheduler: Job 33 finished: runJob at PythonRDD.scala:441, took 0.109977 s
-------------------------------------------
Time: 2017-01-04 14:08:30
-------------------------------------------
('Oracle DBA https://t.co/x1Xp5j2ISn', 1)
('DBA Oracle https://t.co/QoTr3XBUrJ', 1)
17/01/04 14:08:30 INFO JobScheduler: Finished job streaming job 1483538910000 ms.0 from job set of time 1483538910000 ms
@rakeshdey0018


showing me "No JSON object could be decoded"
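That error is what Python 2's `json.loads` raises (as a `ValueError`) when a Kafka message isn't valid JSON — for example an empty payload or a non-JSON record on the topic. One way to make the stream tolerant of such records is to wrap the parse and filter out failures; `safe_parse` below is a hypothetical helper name, not part of the original gist:

```python
import json

def safe_parse(message):
    """Return the parsed tweet dict, or None if the payload is not valid JSON."""
    try:
        return json.loads(message)
    except ValueError:  # "No JSON object could be decoded" on Python 2
        return None

# In the streaming job, drop unparseable records before counting:
# parsed = kafkaStream.map(lambda v: safe_parse(v[1])) \
#                     .filter(lambda tweet: tweet is not None)

print(safe_parse('{"text": "hi"}'))  # {'text': 'hi'}
print(safe_parse('not json'))        # None
```

With the filter in place, malformed messages are silently skipped rather than crashing the batch.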
