Simple example of processing twitter JSON payload from a Kafka stream with Spark Streaming in Python
# Simple example of processing twitter JSON payload from a Kafka stream with Spark Streaming in Python
# @rmoff December 21, 2016
#
# Based on direct_kafka_wordcount.py
# (https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/direct_kafka_wordcount.py)
# ------
# Can be run under Docker using https://github.com/jupyter/docker-stacks/tree/master/all-spark-notebook
# ------
# Launch syntax:
# /usr/local/spark-2.0.2-bin-hadoop2.7/bin/spark-submit \
#   --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 \
#   kafka_twitter_sparkstreaming.py

# Import dependencies
# Print to stdout
from __future__ import print_function
# Spark
from pyspark import SparkContext
# Spark Streaming
from pyspark.streaming import StreamingContext
# Kafka
from pyspark.streaming.kafka import KafkaUtils
# json parsing
import json

# Create Spark context
sc = SparkContext(appName="PythonStreamingDirectKafkaWordCountRM")

# Create Streaming context, with a batch duration of 10 seconds
# ref: http://spark.apache.org/docs/latest/api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext
# ref: http://spark.apache.org/docs/latest/streaming-programming-guide.html#initializing-streamingcontext
ssc = StreamingContext(sc, 10)

# Connect to Kafka, topic 'twitter', consumer group 'spark-streaming'
# ref: http://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.html
kafkaStream = KafkaUtils.createStream(ssc, 'kafka-zk:2181', 'spark-streaming', {'twitter': 1})

# Parse each inbound message value as JSON (messages arrive as (key, value) tuples)
parsed = kafkaStream.map(lambda v: json.loads(v[1]))

# Count the number of instances of each tweet text
text_counts = parsed.map(lambda tweet: (tweet['text'], 1)).\
              reduceByKey(lambda x, y: x + y)

# Print the text counts (first ten shown)
text_counts.pprint()

# Count the number of tweets per author
author_counts = parsed.map(lambda tweet: (tweet['user']['screen_name'], 1)).\
                reduceByKey(lambda x, y: x + y)

# Print the author tweet counts (first ten shown)
author_counts.pprint()

# Start the streaming context and block until it is terminated
ssc.start()
ssc.awaitTermination()
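If you don't have a live feed writing to the `twitter` topic, you can smoke-test the job by producing a few JSON payloads yourself. Below is a minimal producer sketch; the kafka-python package and the kafka:9092 broker address are both assumptions (the gist itself only names the ZooKeeper host kafka-zk), and any client that writes JSON strings to the topic would do.

# Hypothetical smoke-test producer, not part of the original gist.
# Assumes `pip install kafka-python` and a broker reachable at kafka:9092.
from kafka import KafkaProducer
import json

producer = KafkaProducer(bootstrap_servers='kafka:9092',
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))
for i in range(3):
    # Only the fields the streaming job reads: text and user.screen_name
    producer.send('twitter', {'text': 'test tweet %d' % i,
                              'user': {'screen_name': 'test_user'}})
producer.flush()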
Example output from the streaming job (Spark driver log lines interleaved with the pprint() batch output):
17/01/04 14:08:10 INFO DAGScheduler: ResultStage 42 (runJob at PythonRDD.scala:441) finished in 0.122 s
17/01/04 14:08:10 INFO DAGScheduler: Job 21 finished: runJob at PythonRDD.scala:441, took 0.129443 s
-------------------------------------------
Time: 2017-01-04 14:08:10
-------------------------------------------
('_ https://t.co/gn0B7jxDYj #Samsung #Oracle #Smartphone #Epic 427', 1)
('4.9 Rating! #TopRated #Bestseller \n\nTHE LAST ORACLE! \n\n"The masterpiece of the trilogy!" "Epic romance!"\n\nhttps://t.co/MR8i95m9Sf', 1)
('RT @shadihamid: Semi-serious question: do we have any actual reports of people moving to Canada b/c of Trump? (getting accepted into McGill…', 1)
('RT @AdedapoLinda: #CROSSCHECK purged SOCIAL Security-Medicare-1st amend @docrocktex26 @sarahkendzior @Ireland0828 @sumoh7 @Deoliver47… ', 1)
('@rolandscahill @realDonaldTrump all she has to do is make good on her word and move to Canada.', 1)
17/01/04 14:08:10 INFO JobScheduler: Finished job streaming job 1483538890000 ms.0 from job set of time 1483538890000 ms
17/01/04 14:08:10 INFO JobScheduler: Starting job streaming job 1483538890000 ms.1 from job set of time 1483538890000 ms
17/01/04 14:08:10 INFO SparkContext: Starting job: runJob at PythonRDD.scala:441
17/01/04 14:08:10 INFO DAGScheduler: Registering RDD 59 (call at /usr/local/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py:2230)
17/01/04 14:08:10 INFO DAGScheduler: Got job 22 (runJob at PythonRDD.scala:441) with 1 output partitions
[...]
17/01/04 14:08:30 INFO DAGScheduler: ResultStage 66 (runJob at PythonRDD.scala:441) finished in 0.102 s
17/01/04 14:08:30 INFO DAGScheduler: Job 33 finished: runJob at PythonRDD.scala:441, took 0.109977 s
-------------------------------------------
Time: 2017-01-04 14:08:30
-------------------------------------------
('Oracle DBA https://t.co/x1Xp5j2ISn', 1)
('DBA Oracle https://t.co/QoTr3XBUrJ', 1)
17/01/04 14:08:30 INFO JobScheduler: Finished job streaming job 1483538910000 ms.0 from job set of time 1483538910000 ms
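Note that pprint() shows only the first ten elements of each batch, in whatever order the RDD yields them rather than by count. To see the most frequent authors first, a small extension sketch (not in the original gist) sorts each batch before printing, using the standard transform() and sortBy() APIs:

# Extension sketch: sort each batch's (author, count) pairs by descending count.
# transform() applies an arbitrary RDD operation to every batch of the DStream.
sorted_authors = author_counts.transform(
    lambda rdd: rdd.sortBy(lambda kv: kv[1], ascending=False))
sorted_authors.pprint()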
Comment: when I run this, it fails with an error showing me that "No JSON object could be decoded".
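That message is the Python 2 ValueError raised by json.loads() when a record on the topic isn't valid JSON; an empty message, or a producer writing plain strings to `twitter`, will trigger it. A minimal defensive-parsing sketch that skips malformed records instead of failing the batch; the safe_parse helper name is hypothetical, not part of the original gist:

# Replace the json.loads() map above with a flatMap that drops bad records.
# safe_parse is a hypothetical helper name, not in the original gist.
def safe_parse(record):
    try:
        return [json.loads(record[1])]  # record is a (key, message-value) tuple
    except ValueError:                  # reported as "No JSON object could be decoded"
        return []                       # emit nothing for malformed messages

parsed = kafkaStream.flatMap(safe_parse)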