Last active September 17, 2020 17:41
Simple example of processing Twitter JSON payloads from a Kafka stream with Spark Streaming in Python
{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "*Based on [direct_kafka_wordcount.py](https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/direct_kafka_wordcount.py)*\n",
    "\n",
    "[`@rmoff`](https://twitter.com/rmoff/), January 4, 2017"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Make Kafka available to Jupyter\n",
    "\n",
    "`PYSPARK_SUBMIT_ARGS` is read when the Spark JVM is launched, so it must be set before the `SparkContext` is created."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "import os\n",
    "os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 pyspark-shell'"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Import dependencies"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Print to stdout\n",
    "from __future__ import print_function\n",
    "# Spark\n",
    "from pyspark import SparkContext\n",
    "# Spark Streaming\n",
    "from pyspark.streaming import StreamingContext\n",
    "# Kafka\n",
    "from pyspark.streaming.kafka import KafkaUtils\n",
    "# json parsing\n",
    "import json"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create Spark context"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "sc = SparkContext(appName=\"PythonStreamingDirectKafkaWordCountRM\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create Streaming context, with a batch duration of 10 seconds\n",
    "\n",
    "* http://spark.apache.org/docs/latest/api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext\n",
    "* http://spark.apache.org/docs/latest/streaming-programming-guide.html#initializing-streamingcontext"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "ssc = StreamingContext(sc, 10)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Connect to Kafka\n",
    "\n",
    "Topic `twitter`, consumer group `spark-streaming`. This uses the receiver-based `KafkaUtils.createStream`, which connects through ZooKeeper (port 2181) rather than directly to the brokers; `{'twitter':1}` maps the topic to the number of partitions to consume.\n",
    "\n",
    "* http://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.html"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "kafkaStream = KafkaUtils.createStream(ssc, 'cdh57-01-node-01.moffatt.me:2181', 'spark-streaming', {'twitter':1})"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Parse the inbound message as JSON\n",
    "\n",
    "Each record is a `(key, value)` tuple; the tweet JSON is in the value, `v[1]`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "parsed = kafkaStream.map(lambda v: json.loads(v[1]))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Count the number of instances of each tweet text"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "text_counts = parsed.map(lambda tweet: (tweet['text'],1)).\\\n",
    "    reduceByKey(lambda x,y: x + y)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Print the text counts (first ten shown)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "text_counts.pprint()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Count the number of tweets per author"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "author_counts = parsed.map(lambda tweet: (tweet['user']['screen_name'],1)).\\\n",
    "    reduceByKey(lambda x,y: x + y)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Print the author tweet counts (first ten shown)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "author_counts.pprint()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Start the streaming context"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "-------------------------------------------\n",
      "Time: 2017-01-04 14:28:20\n",
      "-------------------------------------------\n",
      "(u'Rex Tillerson cuts ties with Exxon Mobil via $180m retirement package: Donald Trump\\u2019s nominee for\\u2026 https://t.co/eONdIe8xvL | #Election2016', 1)\n",
      "(u'Rex Tillerson cuts ties with Exxon Mobil via $180m retirement package: Donald Trump\\u2019s nominee for\\u2026 https://t.co/NC6njITxcE | #Election2016', 1)\n",
      "(u'A new approach to moving ore from remote mines: #airships! https://t.co/HmuoGQjEzC', 1)\n",
      "(u'Rex Tillerson cuts ties with Exxon Mobil via $180m retirement package: Donald Trump\\u2019s nominee for\\u2026 https://t.co/sk05rwzxMa | #Election2016', 1)\n",
      "(u'Donald Trump appears to trust Julian Assange more than US intelligence agents https://t.co/ShlmBXoW4Z #election2016 https://t.co/HS95S98fOS', 1)\n",
      "(u'Ngopo2 kudu dipikir disik. Ojo nganti kowe kagol amargo opo sing tok lakoni kuwi ora di pikir sek.', 1)\n",
      "(u'Rex Tillerson cuts ties with Exxon Mobil via $180m retirement package: Donald Trump\\u2019s nominee for\\u2026 https://t.co/f94cqyq06e | #Election2016', 1)\n",
      "(u'Mengagendakan jalan-jalan sendiri di akhir pekan nanti. Kudu kelakon ya Ga biar ngga rencana mulu dari jaman-jaman penat UAS.', 1)\n",
      "(u'Rex Tillerson cuts ties with Exxon Mobil via $180m retirement package: Donald Trump\\u2019s nominee for\\u2026 https://t.co/YpWSL6qxia | #Election2016', 1)\n",
      "(u'Rex Tillerson cuts ties with Exxon Mobil via $180m retirement package: Donald Trump\\u2019s nominee for\\u2026 https://t.co/quHaez3x4q | #Election2016', 1)\n",
      "...\n",
      "\n",
      "-------------------------------------------\n",
      "Time: 2017-01-04 14:28:20\n",
      "-------------------------------------------\n",
      "(u'POLSRochester', 1)\n",
      "(u'votebotvotebot', 1)\n",
      "(u'dbiservices', 1)\n",
      "(u'CintraSoftware', 1)\n",
      "(u'POLSPittsburgh', 1)\n",
      "(u'tasyarozli', 1)\n",
      "(u'tweetazsugar', 1)\n",
      "(u'POLSDesMoines', 1)\n",
      "(u'POLSHartford', 1)\n",
      "(u'RebellionRider', 1)\n",
      "...\n",
      "\n",
      "-------------------------------------------\n",
      "Time: 2017-01-04 14:28:30\n",
      "-------------------------------------------\n",
      "(u'Technical Project Manager - (Java, Websphere, Oracle Apps DBA ) (#Harrisburg, PA) #job#ProjectManager https://t.co/pWcekZcRxS', 1)\n",
      "\n",
      "-------------------------------------------\n",
      "Time: 2017-01-04 14:28:30\n",
      "-------------------------------------------\n",
      "(u'JCjobPOSTINGS', 1)\n",
      "\n",
      "-------------------------------------------\n",
      "Time: 2017-01-04 14:28:40\n",
      "-------------------------------------------\n",
      "(u'#ImWithHer #Election2016 #AMJOY #Decision2016 Sad but true. Will PEOTUS pardon Snowden & Manning too?\\U0001f4af\\U0001f612\\U0001f644\\U0001f622 https://t.co/THaKoYb3dS', 1)\n",
      "\n",
      "-------------------------------------------\n",
      "Time: 2017-01-04 14:28:40\n",
      "-------------------------------------------\n",
      "(u'ReneeB_75', 1)\n",
      "\n",
      "-------------------------------------------\n",
      "Time: 2017-01-04 14:28:50\n",
      "-------------------------------------------\n",
      "(u'RT @byTurgutUyar: \"Yine ku\\u015fkuya d\\xfc\\u015fecek kadar \\xfc\\u015f\\xfcyorum.\"- Franz Kafka', 1)\n",
      "(u'RT @miabuelasabia: S\\xe9 feliz con lo que tienes en tus brazos, porque un d\\xeda puedes cargar el vac\\xedo\\n(Franz Kafka)', 1)\n",
      "(u'RT @EXASOLAG: Performance comparison between #EXASOL & #Oracle In-Memory Option - https://t.co/rpu6F37H0S (in Russian) #bigdata #analytics\\u2026', 1)\n",
      "\n",
      "-------------------------------------------\n",
      "Time: 2017-01-04 14:28:50\n",
      "-------------------------------------------\n",
      "(u'OscaryMadriz', 1)\n",
      "(u'aylinyigit6', 1)\n",
      "(u'EXAGolo', 1)\n",
      "\n",
      "-------------------------------------------\n",
      "Time: 2017-01-04 14:29:00\n",
      "-------------------------------------------\n",
      "(u'@Snoopy not move to Canada.', 1)\n",
      "(u'_ https://t.co/V9KoFbSQ5G #Samsung #Oracle #TipsAndTricks #HTML5 9413', 1)\n",
      "\n",
      "-------------------------------------------\n",
      "Time: 2017-01-04 14:29:00\n",
      "-------------------------------------------\n",
      "(u'eddie8374', 1)\n",
      "(u'cn1bot3', 1)\n",
      "\n",
      "-------------------------------------------\n",
      "Time: 2017-01-04 14:29:10\n",
      "-------------------------------------------\n",
      "(u'RT @OUPAcademic: The role of family values in the 2016 presidential election https://t.co/AYoizr7poo Via @OUPReligion #Election2016', 1)\n",
      "(u'NEED ONLY USC AND GC :- Oracle EBS Finance Developer__Phoenix, AZ https://t.co/XSlvp7Ny2N', 1)\n",
      "\n",
      "-------------------------------------------\n",
      "Time: 2017-01-04 14:29:10\n",
      "-------------------------------------------\n",
      "(u'academicpresses', 1)\n",
      "(u'hireitpeople', 1)\n",
      "\n",
      "-------------------------------------------\n",
      "Time: 2017-01-04 14:29:20\n",
      "-------------------------------------------\n",
      "(u'VIDEO: Paul Ryan Stops Lawmaker\\u2019s Son from \\u2018Dabbing\\u2019 During Swearing-In Ceremony https://t.co/DC9gebMymB #tcot\\u2026 https://t.co/MAJZ6jDSnT', 1)\n",
      "(u'.@GOP @SpeakerRyan @SenateMajLdr @SenateGOP @HouseGOP\\n#Election2016 Mandate\\n#ElectoralCollege Landslide\\u2026 https://t.co/dRx5jJKBIc', 1)\n",
      "(u'Lalu salibis, zionis dan Syiah Rafidhah gak bahaya gitu? Jd Erdogan boleh mesra dgn mrk? https://t.co/ACWOUTA1kC', 1)\n",
      "\n",
      "-------------------------------------------\n",
      "Time: 2017-01-04 14:29:20\n",
      "-------------------------------------------\n",
      "(u'Makibao008', 1)\n",
      "(u'TrumpCard555', 1)\n",
      "(u'cafenetamerica', 1)\n",
      "\n",
      "-------------------------------------------\n",
      "Time: 2017-01-04 14:29:30\n",
      "-------------------------------------------\n",
      "\n",
      "-------------------------------------------\n",
      "Time: 2017-01-04 14:29:30\n",
      "-------------------------------------------\n",
      "\n",
      "-------------------------------------------\n",
      "Time: 2017-01-04 14:29:40\n",
      "-------------------------------------------\n",
      "(u'RT @orclapexblogs: Oracle ACE Alumnus - so long, and thanks for all the fish https://t.co/vmHTphn2Z4 #OrclAPEX #odtug', 1)\n",
      "(u'IM NOT READY FOR BUNKASAI, UWAA NISA KUDU OTTOKHAE? >.<', 1)\n",
      "\n",
      "-------------------------------------------\n",
      "Time: 2017-01-04 14:29:40\n",
      "-------------------------------------------\n",
      "(u'lbrizzi', 1)\n",
      "(u'annisahael', 1)\n",
      "\n",
      "-------------------------------------------\n",
      "Time: 2017-01-04 14:29:50\n",
      "-------------------------------------------\n",
      "(u'#empleo #IT Analista T\\xe9cnico Oracle - Madrid https://t.co/tmSgkAKO0d', 1)\n",
      "(u'RT @booknerdfession: \\u201cMemories warm you up from the inside. But they also tear you apart.\\u201d \\n\\u2015 Haruki Murakami, Kafka on the Shore', 1)\n",
      "\n",
      "-------------------------------------------\n",
      "Time: 2017-01-04 14:29:50\n",
      "-------------------------------------------\n",
      "(u'MalikAqsa', 1)\n",
      "(u'IJInformatico', 1)\n",
      "\n",
      "-------------------------------------------\n",
      "Time: 2017-01-04 14:30:00\n",
      "-------------------------------------------\n",
      "(u'.@Harlan #Election2016 left US with the same unaccountable dysfunctional party duopoly. Join me to invoke reform. #iamameriCAN #FirstCitizen', 1)\n",
      "(u'RT @bendawla: kafka demi\\u015f ki ah milena sen ba\\u015fkayd\\u0131n hasta bir adam\\u0131 sevecek kadar hastayd\\u0131n milena da demi\\u015f ne hastas\\u0131 ya makyaj yapmad\\u0131m\\u2026', 1)\n",
      "(u\"@FiiDee_ mtu hawezi kudu any. I'm here\\U0001f606\", 1)\n",
      "\n",
      "-------------------------------------------\n",
      "Time: 2017-01-04 14:30:00\n",
      "-------------------------------------------\n",
      "(u'Ecenurztrk', 1)\n",
      "(u'mejar412', 1)\n",
      "(u'1stcitizen', 1)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "ssc.start()\n",
    "ssc.awaitTermination()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 2",
   "language": "python",
   "name": "python2"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.12"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}
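The `map`/`reduceByKey` steps above run on each 10-second batch independently, so counts do not accumulate from one batch to the next. As a rough local check of that logic without Spark or Kafka, the sketch below applies the same per-record lambdas to one hypothetical batch of `(key, value)` records in plain Python. `reduce_by_key` is a hypothetical helper that folds values per key the way `reduceByKey(lambda x,y: x + y)` does within a single batch. The sketch also shows why the near-identical "Rex Tillerson" tweets in the output each count as 1: their `t.co` links differ, so each one is a distinct key.

```python
import json


def reduce_by_key(pairs, func):
    """Fold the values of (key, value) pairs per key, like reduceByKey within one batch."""
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return result


# Hypothetical batch: Kafka records arrive as (key, value) tuples; the value holds the tweet JSON.
batch = [
    (None, json.dumps({"text": "hello kafka", "user": {"screen_name": "alice"}})),
    (None, json.dumps({"text": "hello kafka", "user": {"screen_name": "alice"}})),
    # Same words plus a different link, so this is a separate key, as in the output above.
    (None, json.dumps({"text": "hello kafka https://t.co/a", "user": {"screen_name": "bob"}})),
]

# Same per-record logic as the notebook: parse v[1], then emit (key, 1) pairs and sum per key.
parsed = [json.loads(v[1]) for v in batch]
text_counts = reduce_by_key(((tweet['text'], 1) for tweet in parsed), lambda x, y: x + y)
author_counts = reduce_by_key(((tweet['user']['screen_name'], 1) for tweet in parsed), lambda x, y: x + y)

print(text_counts)
print(author_counts)
```

To keep running totals across batches instead, Spark Streaming provides `updateStateByKey`. That is outside what this example does.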
# Simple example of processing Twitter JSON payloads from a Kafka stream with Spark Streaming in Python
# @rmoff December 21, 2016
#
# Based on direct_kafka_wordcount.py
# (https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/direct_kafka_wordcount.py)
# ------
# Can be run under Docker using https://github.com/jupyter/docker-stacks/tree/master/all-spark-notebook
# ------
# Launch syntax:
#   /usr/local/spark-2.0.2-bin-hadoop2.7/bin/spark-submit \
#     --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 \
#     kafka_twitter_sparkstreaming.py

# Import dependencies
# Print to stdout
from __future__ import print_function
# Spark
from pyspark import SparkContext
# Spark Streaming
from pyspark.streaming import StreamingContext
# Kafka
from pyspark.streaming.kafka import KafkaUtils
# JSON parsing
import json

# Create Spark context
sc = SparkContext(appName="PythonStreamingDirectKafkaWordCountRM")

# Create Streaming context, with a batch duration of 10 seconds
# ref: http://spark.apache.org/docs/latest/api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext
# ref: http://spark.apache.org/docs/latest/streaming-programming-guide.html#initializing-streamingcontext
ssc = StreamingContext(sc, 10)

# Connect to Kafka, topic 'twitter', consumer group 'spark-streaming'.
# This is the receiver-based createStream, which connects through the ZooKeeper
# quorum (port 2181), not the createDirectStream used in direct_kafka_wordcount.py.
# {'twitter':1} maps the topic to the number of partitions to consume.
# ref: http://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.html
kafkaStream = KafkaUtils.createStream(ssc, 'kafka-zk:2181', 'spark-streaming', {'twitter':1})

# Parse the inbound message as JSON (each record is a (key, value) tuple; the JSON is the value)
parsed = kafkaStream.map(lambda v: json.loads(v[1]))

# Count the number of instances of each tweet text
text_counts = parsed.map(lambda tweet: (tweet['text'],1)).\
    reduceByKey(lambda x,y: x + y)

# Print the text counts (first ten shown)
text_counts.pprint()

# Count the number of tweets per author
author_counts = parsed.map(lambda tweet: (tweet['user']['screen_name'],1)).\
    reduceByKey(lambda x,y: x + y)

# Print the author tweet counts (first ten shown)
author_counts.pprint()

# Start the streaming context
ssc.start()
ssc.awaitTermination()
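`pprint()` prints at most the first `num` elements of each batch RDD (`num` defaults to 10), which is what "(first ten shown)" refers to. When a batch holds more than that, it ends with a `...` line, as in the 14:28:20 batches of the notebook output. A batch with no data prints only the header, as at 14:29:30. Below is a plain-Python sketch of that layout for a single batch. `format_batch` is a hypothetical helper written to mirror the output above. It is not a PySpark function.

```python
def format_batch(time_str, records, num=10):
    """Lay out one batch the way DStream.pprint() prints it.

    Keeps num + 1 records so it can tell whether more remain, shows at
    most num of them, and adds "..." only when the batch was larger.
    """
    taken = list(records)[:num + 1]
    lines = [
        "-------------------------------------------",
        "Time: %s" % time_str,
        "-------------------------------------------",
    ]
    lines.extend(str(record) for record in taken[:num])
    if len(taken) > num:
        lines.append("...")
    lines.append("")  # every batch ends with a blank line
    return lines


# Hypothetical batch of 12 (author, count) pairs: only 10 are shown, then "...".
batch = [("user%d" % i, 1) for i in range(12)]
print("\n".join(format_batch("2017-01-04 14:28:20", batch)))
```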
17/01/04 14:08:10 INFO DAGScheduler: ResultStage 42 (runJob at PythonRDD.scala:441) finished in 0.122 s
17/01/04 14:08:10 INFO DAGScheduler: Job 21 finished: runJob at PythonRDD.scala:441, took 0.129443 s
-------------------------------------------
Time: 2017-01-04 14:08:10
-------------------------------------------
('_ https://t.co/gn0B7jxDYj #Samsung #Oracle #Smartphone #Epic 427', 1)
('4.9 Rating! #TopRated #Bestseller \n\nTHE LAST ORACLE! \n\n"The masterpiece of the trilogy!" "Epic romance!"\n\nhttps://t.co/MR8i95m9Sf', 1)
('RT @shadihamid: Semi-serious question: do we have any actual reports of people moving to Canada b/c of Trump? (getting accepted into McGill…', 1)
('RT @AdedapoLinda: #CROSSCHECK purged SOCIAL Security-Medicare-1st amend @docrocktex26 @sarahkendzior @Ireland0828 @sumoh7 @Deoliver47… ', 1)
('@rolandscahill @realDonaldTrump all she has to do is make good on her word and move to Canada.', 1)
17/01/04 14:08:10 INFO JobScheduler: Finished job streaming job 1483538890000 ms.0 from job set of time 1483538890000 ms
17/01/04 14:08:10 INFO JobScheduler: Starting job streaming job 1483538890000 ms.1 from job set of time 1483538890000 ms
17/01/04 14:08:10 INFO SparkContext: Starting job: runJob at PythonRDD.scala:441
17/01/04 14:08:10 INFO DAGScheduler: Registering RDD 59 (call at /usr/local/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py:2230)
17/01/04 14:08:10 INFO DAGScheduler: Got job 22 (runJob at PythonRDD.scala:441) with 1 output partitions
[...]
17/01/04 14:08:30 INFO DAGScheduler: ResultStage 66 (runJob at PythonRDD.scala:441) finished in 0.102 s
17/01/04 14:08:30 INFO DAGScheduler: Job 33 finished: runJob at PythonRDD.scala:441, took 0.109977 s
-------------------------------------------
Time: 2017-01-04 14:08:30
-------------------------------------------
('Oracle DBA https://t.co/x1Xp5j2ISn', 1)
('DBA Oracle https://t.co/QoTr3XBUrJ', 1)
17/01/04 14:08:30 INFO JobScheduler: Finished job streaming job 1483538910000 ms.0 from job set of time 1483538910000 ms
Showing me that "No JSON object could be decoded"
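"No JSON object could be decoded" is the message of the `ValueError` that Python 2's `json.loads` raises when its input is not valid JSON. In this job that most likely means some Kafka message value reaching `json.loads(v[1])` is empty, plain text, or truncated. Python 3 reports the same case as `json.JSONDecodeError`, a `ValueError` subclass, with a message such as `Expecting value: line 1 column 1 (char 0)`. One way to keep the stream running is to parse with `flatMap` and drop records that fail.

The sketch below assumes that silently skipping such records is acceptable. `parse_or_skip` is a hypothetical helper, and the check for `text` and `user` keys is an assumption about which messages count as tweets.

```python
import json


def parse_or_skip(record):
    """Return [tweet] for a parseable tweet message, or [] to drop the record.

    `record` is a Kafka (key, value) tuple. Returning a list lets
    DStream.flatMap drop bad records instead of failing the whole batch.
    """
    try:
        tweet = json.loads(record[1])
    except (TypeError, ValueError):
        # ValueError: invalid JSON ("No JSON object could be decoded" on Python 2,
        # json.JSONDecodeError on Python 3). TypeError: the value is None.
        return []
    # Assumption: only JSON objects with 'text' and a 'user' object are tweets,
    # so the later tweet['text'] and tweet['user']['screen_name'] lookups succeed.
    if not isinstance(tweet, dict) or 'text' not in tweet or not isinstance(tweet.get('user'), dict):
        return []
    return [tweet]


# Hypothetical records: one tweet, then values that would break json.loads(v[1])
# or the counting lambdas further down the pipeline.
records = [
    (None, '{"text": "hello", "user": {"screen_name": "alice"}}'),
    (None, 'not json'),
    (None, ''),
    (None, None),
    (None, '{"delete": {"status": {"id": 1}}}'),
]
tweets = [tweet for record in records for tweet in parse_or_skip(record)]
print(tweets)
```

In the script, this would replace the parse step as `parsed = kafkaStream.flatMap(parse_or_skip)`. Because `flatMap` flattens the returned lists, each good record passes through as one tweet and each bad one disappears. The downstream `text_counts` and `author_counts` lines stay unchanged.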