Skip to content

Instantly share code, notes, and snippets.

@danish-rehman
Last active July 19, 2017 19:49
Show Gist options
  • Save danish-rehman/d05a85e3d325e78e8b25aa7ee30512ce to your computer and use it in GitHub Desktop.
Save danish-rehman/d05a85e3d325e78e8b25aa7ee30512ce to your computer and use it in GitHub Desktop.
Spark Cassandra Live Tweet Example 1
import pyspark_cassandra
from pyspark.streaming import StreamingContext
from pyspark import SparkConf, SparkContext
from pyspark_cassandra import streaming
# Cassandra destination for the streamed rows.
keyspace = "mykeyspace"
table = "tweets"

# Spark configuration: standalone master, local Cassandra node, event
# logging enabled, and the pyspark-cassandra package pulled in as a jar.
conf = (
    SparkConf()
    .setAppName("PySpark Cassandra Stream")
    .setMaster("spark://SANM-MBP01L.local:7077")
    .set("spark.cassandra.connection.host", "localhost")
    .set("spark.eventLog.enabled", "true")
    .set("spark.jars.packages", 'TargetHolding/pyspark-cassandra:0.3.5')
)

# Cassandra-aware Spark context, wrapped in a streaming context whose
# second argument (the batch duration) is 2.
sc = pyspark_cassandra.CassandraSparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

# Set the JVM-side log4j root logger to ERROR level.
log4j = sc._jvm.org.apache.log4j
log4j.LogManager.getRootLogger().setLevel(log4j.Level.ERROR)
def split_ip(line):
    """Turn one input line into a ``(created_at, 3)`` tuple.

    The line is split on whitespace and must yield exactly two tokens.
    The first token is discarded; the second becomes the first element of
    the result. The second element of the result is always the constant 3.

    Arguments:
    line -- a string holding two whitespace-separated tokens
    Return:
    Tuple -- (second token, 3)
    Raises:
    ValueError -- if the line does not split into exactly two tokens
    """
    _, created_at = line.split()
    row = (created_at, 3)
    return row
# Read text lines from the socket and map each one through split_ip.
rdds = (
    ssc.socketTextStream("10.0.0.235", 9999)
    .map(split_ip)
)

# Two outputs per batch: print to the console and save to Cassandra.
rdds.pprint()
rdds.saveToCassandra(keyspace, table)

# Start the stream, wait for termination or the timeout of 100, then stop.
ssc.start()
ssc.awaitTerminationOrTimeout(100)
ssc.stop()

Count stats for twitter stream and store in Cassandra

cd $SPARK_HOME

./bin/spark-submit --packages TargetHolding/pyspark-cassandra:0.3.5 /Users/drehman/Apps/workspace/spark_cassandra_stream_example.py

python twitter_rolling_count.py -q data -d data 2>&1 | nc -lk 10.0.0.235 9999
# To run this code, first edit config.py with your configuration, then:
#
# mkdir data
# python twitter_stream_download.py -q apple -d data
#
# It will produce the list of tweets for the query "apple"
# in the file data/stream_apple.json
import tweepy, pdb
from tweepy import Stream
from tweepy import OAuthHandler
from tweepy.streaming import StreamListener
from datetime import datetime
import time
import argparse
import string
import config
import json
import pdb
import simplejson
from pprint import pprint
import random
from insert_twitter_cassandra import *
def get_parser():
    """Build the command-line parser for the downloader.

    Options:
    -q / --query    -- stored as ``query``; defaults to '-'
    -d / --data-dir -- stored as ``data_dir``; no default, so it is None
                       when the option is omitted
    Return:
    argparse.ArgumentParser -- the configured parser
    """
    cli = argparse.ArgumentParser(description="Twitter Downloader")
    # (flags, keyword settings) for each option, registered in order.
    option_specs = (
        (("-q", "--query"),
         {"dest": "query", "help": "Query/Filter", "default": '-'}),
        (("-d", "--data-dir"),
         {"dest": "data_dir", "help": "Output/Data Directory"}),
    )
    for flags, settings in option_specs:
        cli.add_argument(*flags, **settings)
    return cli
class MyListener(StreamListener):
    """Custom StreamListener for streaming data."""

    def __init__(self, data_dir, query):
        """Store the output path built from *data_dir* and *query*.

        Arguments:
        data_dir -- directory part of the output path
        query -- query string; sanitised with format_filename for the path
        """
        # Run the base-class initializer so inherited state is set up.
        super(MyListener, self).__init__()
        query_fname = format_filename(query)
        self.outfile = "%s/stream_%s.json" % (data_dir, query_fname)

    def to_format(self, data):
        """Replace data['created_at'] with an int of the form YYYYMMDDHHMM.

        The value is derived from data['timestamp_ms'] (milliseconds) via
        datetime.fromtimestamp, so it is expressed in the local time zone of
        the machine running this code. *data* is modified in place.

        Raises:
        KeyError -- if 'timestamp_ms' is missing from *data*
        """
        # Floor division keeps the result an int on Python 2 and Python 3.
        seconds = int(data['timestamp_ms']) // 1000
        minute_stamp = datetime.fromtimestamp(seconds).strftime('%Y%m%d%H%M')
        data['created_at'] = int(minute_stamp)

    def random_string(self):
        """Print ten sentences, each built from one random index.

        The same index selects the word from all four tuples, so the words
        are not chosen independently of each other.
        """
        nouns = ("puppy", "car", "rabbit", "girl", "monkey")
        verbs = ("runs", "hits", "jumps", "drives", "barfs")
        adv = ("crazily.", "dutifully.", "foolishly.", "merrily.", "occasionally.")
        adj = ("adorable", "clueless", "dirty", "odd", "stupid")
        for _ in range(10):
            num = random.randrange(0, 5)
            # Call-form print with a single argument behaves the same on
            # Python 2 and Python 3, matching the other prints in this file.
            print(nouns[num] + ' ' + verbs[num] + ' ' + adv[num] + ' ' + adj[num])

    def on_data(self, data):
        """Decode one raw stream message, reformat it and pass it on.

        The message is parsed as JSON, its 'created_at' is rewritten by
        to_format, and the result is handed to print_cursor. Any ordinary
        error is reported and followed by a 5 second pause.

        Return:
        True -- always, whether or not the message was processed
        """
        try:
            # NOTE(review): the output file is opened for append (which
            # creates it) but nothing is written to it here - confirm
            # whether the open is still wanted.
            with open(self.outfile, 'a'):
                data = simplejson.loads(data)
                self.to_format(data)
                # print_cursor is not defined in this file; presumably it
                # comes from the insert_twitter_cassandra star-import.
                print_cursor(data)
                return True
        except Exception as e:
            # Exception rather than BaseException, so KeyboardInterrupt and
            # SystemExit can still stop the program.
            print("Error on_data: %s" % str(e))
            time.sleep(5)
        return True

    def on_error(self, status):
        """Print the error status and return True."""
        print(status)
        return True
def format_filename(fname):
    """Make a string safe to use as a file name.

    Each character of *fname* is passed through convert_valid and the
    results are concatenated.

    Arguments:
    fname -- the file name to convert
    Return:
    String -- converted file name
    """
    return ''.join(map(convert_valid, fname))
def convert_valid(one_char):
    """Return *one_char* unchanged if it is allowed, otherwise '_'.

    Allowed characters are '-', '_', '.', the ASCII letters and the digits.

    Arguments:
    one_char -- the char to convert
    Return:
    Character -- converted char
    """
    # Same whitelist, in the same order, as the original format string.
    allowed = "-_." + string.ascii_letters + string.digits
    return one_char if one_char in allowed else '_'
# NOTE(review): this is decorated with @classmethod but sits at module level;
# presumably it is meant to be attached to a class that provides
# first_parse - confirm against the code that uses it.
@classmethod
def parse(cls, api, raw):
    """Parse *raw* with cls.first_parse and attach the JSON text to the result.

    Arguments:
    api -- passed through unchanged to cls.first_parse
    raw -- the raw object; also serialised with json.dumps
    Return:
    the object returned by cls.first_parse, with a 'json' attribute added
    """
    parsed = cls.first_parse(api, raw)
    parsed.json = json.dumps(raw)
    return parsed
if __name__ == '__main__':
    # Read the query and data directory from the command line.
    args = get_parser().parse_args()

    # Authenticate with the credentials kept in config.py.
    auth = OAuthHandler(config.consumer_key, config.consumer_secret)
    auth.set_access_token(config.access_token, config.access_secret)
    api = tweepy.API(auth)

    # Stream statuses matching the query through the custom listener.
    listener = MyListener(args.data_dir, args.query)
    twitter_stream = Stream(auth, listener)
    twitter_stream.filter(track=[args.query])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment