@questsin
Created February 20, 2019 16:52
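from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# Note: pyspark.streaming.kafka exists only in Spark 2.x and earlier (removed in Spark 3.0).
from pyspark.streaming.kafka import KafkaUtils
import json

sc = SparkContext(appName="PythonSparkStreamingKafka_RM_01")
sc.setLogLevel("WARN")
# The original never defines ssc; the 60-second batch interval here is an assumed value.
ssc = StreamingContext(sc, 60)
# Receiver-based stream: ZooKeeper quorum, consumer group id, {topic: number of partitions to consume}.
kafkaStream = KafkaUtils.createStream(ssc, 'cdh57-01-node-01.moffatt.me:2181', 'spark-streaming', {'twitter':1})
# Each record is a (key, value) tuple; the value is expected to be a JSON string.
parsed = kafkaStream.map(lambda v: json.loads(v[1]))
# Print the number of records received in each batch.
parsed.count().map(lambda x:'Tweets in this batch: %s' % x).pprint()
ssc.start()
# Wait up to 180 seconds for the streaming job to terminate.
ssc.awaitTermination(timeout=180)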