@rmoff
Last active September 17, 2020 17:41
Simple example of processing Twitter JSON payloads from a Kafka stream with Spark Streaming in Python
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"*Based on [direct_kafka_wordcount.py](https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/direct_kafka_wordcount.py)*\n",
"\n",
"[`@rmoff`](https://twitter.com/rmoff/), January 4, 2017"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Make Kafka available to Jupyter"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"import os\n",
"os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 pyspark-shell'"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Import dependencies"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Print to stdout\n",
"from __future__ import print_function\n",
"# Spark\n",
"from pyspark import SparkContext\n",
"# Spark Streaming\n",
"from pyspark.streaming import StreamingContext\n",
"# Kafka\n",
"from pyspark.streaming.kafka import KafkaUtils\n",
"# json parsing\n",
"import json"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Create Spark context"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"sc = SparkContext(appName=\"PythonStreamingDirectKafkaWordCountRM\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Create Streaming context, with a batch duration of 10 seconds\n",
"\n",
"* http://spark.apache.org/docs/latest/api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext\n",
"* http://spark.apache.org/docs/latest/streaming-programming-guide.html#initializing-streamingcontext"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"ssc = StreamingContext(sc, 10)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Connect to Kafka\n",
"\n",
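"Topic `twitter`, consumer group `spark-streaming`, read with the receiver-based `KafkaUtils.createStream`, which connects through the ZooKeeper quorum rather than directly to the brokers\n",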
"\n",
"* http://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.html"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"kafkaStream = KafkaUtils.createStream(ssc, 'cdh57-01-node-01.moffatt.me:2181', 'spark-streaming', {'twitter':1})"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Parse the inbound message as JSON"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"parsed = kafkaStream.map(lambda v: json.loads(v[1]))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Count the number of instances of each tweet text"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"text_counts = parsed.map(lambda tweet: (tweet['text'],1)).\\\n",
" reduceByKey(lambda x,y: x + y)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Print the text counts (first ten shown)"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"text_counts.pprint()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Count the number of tweets per author"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"author_counts = parsed.map(lambda tweet: (tweet['user']['screen_name'],1)).\\\n",
" reduceByKey(lambda x,y: x + y)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Print the author tweet counts (first ten shown)"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"author_counts.pprint()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Start the streaming context"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"-------------------------------------------\n",
"Time: 2017-01-04 14:28:20\n",
"-------------------------------------------\n",
"(u'Rex Tillerson cuts ties with Exxon Mobil via $180m retirement package: Donald Trump\\u2019s nominee for\\u2026 https://t.co/eONdIe8xvL | #Election2016', 1)\n",
"(u'Rex Tillerson cuts ties with Exxon Mobil via $180m retirement package: Donald Trump\\u2019s nominee for\\u2026 https://t.co/NC6njITxcE | #Election2016', 1)\n",
"(u'A new approach to moving ore from remote mines: #airships! https://t.co/HmuoGQjEzC', 1)\n",
"(u'Rex Tillerson cuts ties with Exxon Mobil via $180m retirement package: Donald Trump\\u2019s nominee for\\u2026 https://t.co/sk05rwzxMa | #Election2016', 1)\n",
"(u'Donald Trump appears to trust Julian Assange more than US intelligence agents https://t.co/ShlmBXoW4Z #election2016 https://t.co/HS95S98fOS', 1)\n",
"(u'Ngopo2 kudu dipikir disik. Ojo nganti kowe kagol amargo opo sing tok lakoni kuwi ora di pikir sek.', 1)\n",
"(u'Rex Tillerson cuts ties with Exxon Mobil via $180m retirement package: Donald Trump\\u2019s nominee for\\u2026 https://t.co/f94cqyq06e | #Election2016', 1)\n",
"(u'Mengagendakan jalan-jalan sendiri di akhir pekan nanti. Kudu kelakon ya Ga biar ngga rencana mulu dari jaman-jaman penat UAS.', 1)\n",
"(u'Rex Tillerson cuts ties with Exxon Mobil via $180m retirement package: Donald Trump\\u2019s nominee for\\u2026 https://t.co/YpWSL6qxia | #Election2016', 1)\n",
"(u'Rex Tillerson cuts ties with Exxon Mobil via $180m retirement package: Donald Trump\\u2019s nominee for\\u2026 https://t.co/quHaez3x4q | #Election2016', 1)\n",
"...\n",
"\n",
"-------------------------------------------\n",
"Time: 2017-01-04 14:28:20\n",
"-------------------------------------------\n",
"(u'POLSRochester', 1)\n",
"(u'votebotvotebot', 1)\n",
"(u'dbiservices', 1)\n",
"(u'CintraSoftware', 1)\n",
"(u'POLSPittsburgh', 1)\n",
"(u'tasyarozli', 1)\n",
"(u'tweetazsugar', 1)\n",
"(u'POLSDesMoines', 1)\n",
"(u'POLSHartford', 1)\n",
"(u'RebellionRider', 1)\n",
"...\n",
"\n",
"-------------------------------------------\n",
"Time: 2017-01-04 14:28:30\n",
"-------------------------------------------\n",
"(u'Technical Project Manager - (Java, Websphere, Oracle Apps DBA ) (#Harrisburg, PA) #job#ProjectManager https://t.co/pWcekZcRxS', 1)\n",
"\n",
"-------------------------------------------\n",
"Time: 2017-01-04 14:28:30\n",
"-------------------------------------------\n",
"(u'JCjobPOSTINGS', 1)\n",
"\n",
"-------------------------------------------\n",
"Time: 2017-01-04 14:28:40\n",
"-------------------------------------------\n",
"(u'#ImWithHer #Election2016 #AMJOY #Decision2016 Sad but true. Will PEOTUS pardon Snowden & Manning too?\\U0001f4af\\U0001f612\\U0001f644\\U0001f622 https://t.co/THaKoYb3dS', 1)\n",
"\n",
"-------------------------------------------\n",
"Time: 2017-01-04 14:28:40\n",
"-------------------------------------------\n",
"(u'ReneeB_75', 1)\n",
"\n",
"-------------------------------------------\n",
"Time: 2017-01-04 14:28:50\n",
"-------------------------------------------\n",
"(u'RT @byTurgutUyar: \"Yine ku\\u015fkuya d\\xfc\\u015fecek kadar \\xfc\\u015f\\xfcyorum.\"- Franz Kafka', 1)\n",
"(u'RT @miabuelasabia: S\\xe9 feliz con lo que tienes en tus brazos, porque un d\\xeda puedes cargar el vac\\xedo\\n(Franz Kafka)', 1)\n",
"(u'RT @EXASOLAG: Performance comparison between #EXASOL & #Oracle In-Memory Option - https://t.co/rpu6F37H0S (in Russian) #bigdata #analytics\\u2026', 1)\n",
"\n",
"-------------------------------------------\n",
"Time: 2017-01-04 14:28:50\n",
"-------------------------------------------\n",
"(u'OscaryMadriz', 1)\n",
"(u'aylinyigit6', 1)\n",
"(u'EXAGolo', 1)\n",
"\n",
"-------------------------------------------\n",
"Time: 2017-01-04 14:29:00\n",
"-------------------------------------------\n",
"(u'@Snoopy not move to Canada.', 1)\n",
"(u'_ https://t.co/V9KoFbSQ5G #Samsung #Oracle #TipsAndTricks #HTML5 9413', 1)\n",
"\n",
"-------------------------------------------\n",
"Time: 2017-01-04 14:29:00\n",
"-------------------------------------------\n",
"(u'eddie8374', 1)\n",
"(u'cn1bot3', 1)\n",
"\n",
"-------------------------------------------\n",
"Time: 2017-01-04 14:29:10\n",
"-------------------------------------------\n",
"(u'RT @OUPAcademic: The role of family values in the 2016 presidential election https://t.co/AYoizr7poo Via @OUPReligion #Election2016', 1)\n",
"(u'NEED ONLY USC AND GC :- Oracle EBS Finance Developer__Phoenix, AZ https://t.co/XSlvp7Ny2N', 1)\n",
"\n",
"-------------------------------------------\n",
"Time: 2017-01-04 14:29:10\n",
"-------------------------------------------\n",
"(u'academicpresses', 1)\n",
"(u'hireitpeople', 1)\n",
"\n",
"-------------------------------------------\n",
"Time: 2017-01-04 14:29:20\n",
"-------------------------------------------\n",
"(u'VIDEO: Paul Ryan Stops Lawmaker\\u2019s Son from \\u2018Dabbing\\u2019 During Swearing-In Ceremony https://t.co/DC9gebMymB #tcot\\u2026 https://t.co/MAJZ6jDSnT', 1)\n",
"(u'.@GOP @SpeakerRyan @SenateMajLdr @SenateGOP @HouseGOP\\n#Election2016 Mandate\\n#ElectoralCollege Landslide\\u2026 https://t.co/dRx5jJKBIc', 1)\n",
"(u'Lalu salibis, zionis dan Syiah Rafidhah gak bahaya gitu? Jd Erdogan boleh mesra dgn mrk? https://t.co/ACWOUTA1kC', 1)\n",
"\n",
"-------------------------------------------\n",
"Time: 2017-01-04 14:29:20\n",
"-------------------------------------------\n",
"(u'Makibao008', 1)\n",
"(u'TrumpCard555', 1)\n",
"(u'cafenetamerica', 1)\n",
"\n",
"-------------------------------------------\n",
"Time: 2017-01-04 14:29:30\n",
"-------------------------------------------\n",
"\n",
"-------------------------------------------\n",
"Time: 2017-01-04 14:29:30\n",
"-------------------------------------------\n",
"\n",
"-------------------------------------------\n",
"Time: 2017-01-04 14:29:40\n",
"-------------------------------------------\n",
"(u'RT @orclapexblogs: Oracle ACE Alumnus - so long, and thanks for all the fish https://t.co/vmHTphn2Z4 #OrclAPEX #odtug', 1)\n",
"(u'IM NOT READY FOR BUNKASAI, UWAA NISA KUDU OTTOKHAE? >.<', 1)\n",
"\n",
"-------------------------------------------\n",
"Time: 2017-01-04 14:29:40\n",
"-------------------------------------------\n",
"(u'lbrizzi', 1)\n",
"(u'annisahael', 1)\n",
"\n",
"-------------------------------------------\n",
"Time: 2017-01-04 14:29:50\n",
"-------------------------------------------\n",
"(u'#empleo #IT Analista T\\xe9cnico Oracle - Madrid https://t.co/tmSgkAKO0d', 1)\n",
"(u'RT @booknerdfession: \\u201cMemories warm you up from the inside. But they also tear you apart.\\u201d \\n\\u2015 Haruki Murakami, Kafka on the Shore', 1)\n",
"\n",
"-------------------------------------------\n",
"Time: 2017-01-04 14:29:50\n",
"-------------------------------------------\n",
"(u'MalikAqsa', 1)\n",
"(u'IJInformatico', 1)\n",
"\n",
"-------------------------------------------\n",
"Time: 2017-01-04 14:30:00\n",
"-------------------------------------------\n",
"(u'.@Harlan #Election2016 left US with the same unaccountable dysfunctional party duopoly. Join me to invoke reform. #iamameriCAN #FirstCitizen', 1)\n",
"(u'RT @bendawla: kafka demi\\u015f ki ah milena sen ba\\u015fkayd\\u0131n hasta bir adam\\u0131 sevecek kadar hastayd\\u0131n milena da demi\\u015f ne hastas\\u0131 ya makyaj yapmad\\u0131m\\u2026', 1)\n",
"(u\"@FiiDee_ mtu hawezi kudu any. I'm here\\U0001f606\", 1)\n",
"\n",
"-------------------------------------------\n",
"Time: 2017-01-04 14:30:00\n",
"-------------------------------------------\n",
"(u'Ecenurztrk', 1)\n",
"(u'mejar412', 1)\n",
"(u'1stcitizen', 1)\n",
"\n"
]
}
],
"source": [
"ssc.start()\n",
"ssc.awaitTermination()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.12"
}
},
"nbformat": 4,
"nbformat_minor": 1
}
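Each `pprint()` call in the notebook prints, per batch, a dashed header with the batch time, at most the first `num` records (10 by default), a `...` line when the batch holds more, and a blank line; empty batches (such as 14:29:30) print only the header. The sketch below reproduces that layout in plain Python so it can be checked without Spark. `format_batch` is a hypothetical helper written for illustration, not part of PySpark.

```python
def format_batch(batch_time, records, num=10):
    """Mimic the per-batch layout printed by DStream.pprint(num).

    batch_time: the batch timestamp, already formatted as a string.
    records: the elements of one batch, in the order Spark returned them.
    """
    rule = "-------------------------------------------"
    lines = [rule, "Time: %s" % batch_time, rule]
    lines.extend(str(record) for record in records[:num])  # only the first `num` records
    if len(records) > num:
        lines.append("...")  # marks that the batch had more records than were shown
    lines.append("")  # printing the result adds the trailing blank line
    return "\n".join(lines)


print(format_batch("2017-01-04 14:28:30", [("JCjobPOSTINGS", 1)]))
```

This is why the text-count batches in the output above end in `...`: they held more than ten distinct keys.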
# Simple example of processing Twitter JSON payloads from a Kafka stream with Spark Streaming in Python
# @rmoff December 21, 2016
#
# Based on direct_kafka_wordcount.py
# (https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/direct_kafka_wordcount.py)
# ------
# Can be run under Docker using https://github.com/jupyter/docker-stacks/tree/master/all-spark-notebook
# ------
# Launch syntax:
# /usr/local/spark-2.0.2-bin-hadoop2.7/bin/spark-submit \
# --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 \
# kafka_twitter_sparkstreaming.py
# Import dependencies
# Print to stdout
from __future__ import print_function
# Spark
from pyspark import SparkContext
# Spark Streaming
from pyspark.streaming import StreamingContext
# Kafka
from pyspark.streaming.kafka import KafkaUtils
# json parsing
import json
# Create Spark context
sc = SparkContext(appName="PythonStreamingDirectKafkaWordCountRM")
# Create Streaming context, with a batch duration of 10 seconds
# ref: http://spark.apache.org/docs/latest/api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext
# ref: http://spark.apache.org/docs/latest/streaming-programming-guide.html#initializing-streamingcontext
ssc = StreamingContext(sc, 10)
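# Connect to Kafka (receiver-based API, via the ZooKeeper quorum), topic 'twitter', consumer group 'spark-streaming'
# {'twitter': 1} maps the topic name to the number of consumer threads for it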
# ref: http://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.html
kafkaStream = KafkaUtils.createStream(ssc, 'kafka-zk:2181', 'spark-streaming', {'twitter':1})
# Parse the inbound message value (element [1] of each Kafka (key, value) pair) as JSON
parsed = kafkaStream.map(lambda v: json.loads(v[1]))
# Count the number of instances of each tweet text
text_counts = parsed.map(lambda tweet: (tweet['text'], 1)) \
                    .reduceByKey(lambda x, y: x + y)
# Print the text counts (first ten shown)
text_counts.pprint()
# Count the number of tweets per author
author_counts = parsed.map(lambda tweet: (tweet['user']['screen_name'], 1)) \
                      .reduceByKey(lambda x, y: x + y)
# Print the author tweet counts (first ten shown)
author_counts.pprint()
# Start the streaming context
ssc.start()
ssc.awaitTermination()
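Within each batch, `map(lambda tweet: (key, 1))` followed by `reduceByKey(lambda x, y: x + y)` yields one `(key, count)` pair per distinct key. The plain-Python sketch below mirrors that on a list of already-parsed tweets so the logic can be tried without a cluster. `reduce_by_key` is a hypothetical local stand-in for the Spark operation, and the sample tweets are made-up minimal dicts carrying only the two fields the script reads (`text` and `user.screen_name`).

```python
def reduce_by_key(pairs, func):
    """Local stand-in for RDD.reduceByKey: combine the values of each key with func."""
    combined = {}
    for key, value in pairs:
        combined[key] = func(combined[key], value) if key in combined else value
    return combined


# Hypothetical, minimal tweet dicts; real Twitter payloads carry many more fields.
batch = [
    {"text": "hello kafka", "user": {"screen_name": "alice"}},
    {"text": "hello kafka", "user": {"screen_name": "bob"}},
    {"text": "hello spark", "user": {"screen_name": "alice"}},
]

# Same key functions as the script: full tweet text, and the author's screen name.
text_counts = reduce_by_key(((t["text"], 1) for t in batch), lambda x, y: x + y)
author_counts = reduce_by_key(((t["user"]["screen_name"], 1) for t in batch), lambda x, y: x + y)
print(text_counts)    # {'hello kafka': 2, 'hello spark': 1}
print(author_counts)  # {'alice': 2, 'bob': 1}
```

Because the key is the full tweet text, copies that differ only in their `t.co` link (like the repeated "Rex Tillerson" tweets in the notebook output) count as separate keys, which is why almost every count in the sample output is 1.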
17/01/04 14:08:10 INFO DAGScheduler: ResultStage 42 (runJob at PythonRDD.scala:441) finished in 0.122 s
17/01/04 14:08:10 INFO DAGScheduler: Job 21 finished: runJob at PythonRDD.scala:441, took 0.129443 s
-------------------------------------------
Time: 2017-01-04 14:08:10
-------------------------------------------
('_ https://t.co/gn0B7jxDYj #Samsung #Oracle #Smartphone #Epic 427', 1)
('4.9 Rating! #TopRated #Bestseller \n\nTHE LAST ORACLE! \n\n"The masterpiece of the trilogy!" "Epic romance!"\n\nhttps://t.co/MR8i95m9Sf', 1)
('RT @shadihamid: Semi-serious question: do we have any actual reports of people moving to Canada b/c of Trump? (getting accepted into McGill…', 1)
('RT @AdedapoLinda: #CROSSCHECK purged SOCIAL Security-Medicare-1st amend @docrocktex26 @sarahkendzior @Ireland0828 @sumoh7 @Deoliver47… ', 1)
('@rolandscahill @realDonaldTrump all she has to do is make good on her word and move to Canada.', 1)
17/01/04 14:08:10 INFO JobScheduler: Finished job streaming job 1483538890000 ms.0 from job set of time 1483538890000 ms
17/01/04 14:08:10 INFO JobScheduler: Starting job streaming job 1483538890000 ms.1 from job set of time 1483538890000 ms
17/01/04 14:08:10 INFO SparkContext: Starting job: runJob at PythonRDD.scala:441
17/01/04 14:08:10 INFO DAGScheduler: Registering RDD 59 (call at /usr/local/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py:2230)
17/01/04 14:08:10 INFO DAGScheduler: Got job 22 (runJob at PythonRDD.scala:441) with 1 output partitions
[...]
17/01/04 14:08:30 INFO DAGScheduler: ResultStage 66 (runJob at PythonRDD.scala:441) finished in 0.102 s
17/01/04 14:08:30 INFO DAGScheduler: Job 33 finished: runJob at PythonRDD.scala:441, took 0.109977 s
-------------------------------------------
Time: 2017-01-04 14:08:30
-------------------------------------------
('Oracle DBA https://t.co/x1Xp5j2ISn', 1)
('DBA Oracle https://t.co/QoTr3XBUrJ', 1)
17/01/04 14:08:30 INFO JobScheduler: Finished job streaming job 1483538910000 ms.0 from job set of time 1483538910000 ms
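The `JobScheduler` lines in this log name each job `streaming job <batch time> ms.<n>`. The batch time is in epoch milliseconds, and the `.0`/`.1` suffixes appear to correspond to the script's two output operations (`text_counts.pprint()` and `author_counts.pprint()`), each run as its own job per batch. A small parsing sketch, assuming the driver clock was UTC (consistent with `1483538890000 ms` lining up with the `Time: 2017-01-04 14:08:10` header above); `parse_job` is a hypothetical helper:

```python
import re
from datetime import datetime, timezone

# Matches the job name in lines such as
# "... Finished job streaming job 1483538890000 ms.0 from job set of time ..."
JOB_RE = re.compile(r"streaming job (\d+) ms\.(\d+)")


def parse_job(line):
    """Return (batch time as a UTC datetime, output-operation index), or None."""
    match = JOB_RE.search(line)
    if match is None:
        return None
    epoch_ms, output_op = int(match.group(1)), int(match.group(2))
    # Assumption: the driver ran in UTC, so this matches the "Time:" header.
    return datetime.fromtimestamp(epoch_ms / 1000, tz=timezone.utc), output_op


line = ("17/01/04 14:08:10 INFO JobScheduler: Finished job streaming job "
        "1483538890000 ms.0 from job set of time 1483538890000 ms")
print(parse_job(line))
```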
@rakeshdey0018

It shows me "No JSON object could be decoded".
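The "No JSON object could be decoded" error in the comment above is the `ValueError` that Python 2's `json.loads` raises when a message value is not valid JSON, for example an empty or plain-text message on the `twitter` topic (Python 3 raises `json.JSONDecodeError`, a `ValueError` subclass). One defensive option, sketched below under the assumption that such messages can simply be dropped, is to parse with a helper that returns a list and use it with `flatMap` instead of `map`. `safe_parse` is a hypothetical name, not part of PySpark; it also drops payloads missing `text` or `user.screen_name`, which the counting steps would otherwise fail on with a `KeyError`.

```python
import json


def safe_parse(value):
    """Return [tweet] for a JSON object with the fields the script reads, else []."""
    try:
        tweet = json.loads(value)
    except (TypeError, ValueError):  # None value, or not valid JSON
        return []
    if not isinstance(tweet, dict):
        return []
    user = tweet.get("user")
    # Drop payloads lacking the fields used by the text and author counts.
    if "text" not in tweet or not isinstance(user, dict) or "screen_name" not in user:
        return []
    return [tweet]


# In the script, this would replace the map() over Kafka (key, value) pairs:
#   parsed = kafkaStream.flatMap(lambda v: safe_parse(v[1]))

print(safe_parse('{"text": "hi", "user": {"screen_name": "rmoff"}}'))
print(safe_parse("not json"))  # []
```

Using `flatMap` lets a bad message contribute zero records instead of failing the whole batch; the trade-off is that malformed input is silently discarded unless it is logged or counted separately.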