# Submit the Spark streaming job. After the cd, spark-submit is under ./bin
# (an absolute /bin/spark-submit would bypass SPARK_HOME entirely).
cd "$SPARK_HOME"
./bin/spark-submit --packages TargetHolding/pyspark-cassandra:0.3.5 /Users/drehman/Apps/workspace/spark_cassandra_stream_example.py
# In a separate shell (spark-submit above keeps running while the job streams):
# serve the tweet producer's output on the socket that socketTextStream reads.
python twitter_rolling_count.py -q data -d data 2>&1 | nc -lk 10.0.0.235 9999
Last active
July 19, 2017 19:49
-
-
Save danish-rehman/d05a85e3d325e78e8b25aa7ee30512ce to your computer and use it in GitHub Desktop.
Spark Cassandra Live Tweet Example 1
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pyspark_cassandra
from pyspark.streaming import StreamingContext
from pyspark import SparkConf, SparkContext
# NOTE(review): presumably imported for its side effect of adding
# saveToCassandra() to DStreams (used further down) -- confirm before removing.
from pyspark_cassandra import streaming

# Cassandra destination for the parsed stream rows.
keyspace, table = "mykeyspace", "tweets"

conf = SparkConf() \
    .setAppName("PySpark Cassandra Stream") \
    .setMaster("spark://SANM-MBP01L.local:7077") \
    .set("spark.cassandra.connection.host", "localhost") \
    .set("spark.eventLog.enabled", "true") \
    .set("spark.jars.packages", 'TargetHolding/pyspark-cassandra:0.3.5')

sc = pyspark_cassandra.CassandraSparkContext(conf=conf)
# Micro-batch interval: 2 seconds.
ssc = StreamingContext(sc, 2)

# Quieten Spark's root logger via the public API rather than the private
# sc._jvm log4j handle.
sc.setLogLevel("ERROR")
def split_ip(line):
    """Turn a "<id_str> <created_at>" text line into a (created_at, 3) row.

    Despite the name, no IP address is involved: the line is split on
    whitespace, the first field is discarded and the second is kept as-is
    (a string). Raises ValueError unless there are exactly two fields.
    """
    fields = line.split()
    _tweet_id, created_at = fields
    return created_at, 3
def _has_two_fields(line):
    """Return True if *line* splits into exactly the two fields split_ip unpacks.

    The producer pipes everything it prints (stdout and, via 2>&1, stderr)
    into the socket, including lines such as "Error on_data: ..." or a bare
    HTTP status. Without this guard such lines make split_ip raise
    ValueError and fail the whole micro-batch.
    """
    return len(line.split()) == 2


rdds = ssc \
    .socketTextStream("10.0.0.235", 9999) \
    .filter(_has_two_fields) \
    .map(split_ip)
rdds.pprint()
rdds.saveToCassandra(keyspace, table)

ssc.start()
try:
    # Stream for at most 100 seconds.
    ssc.awaitTerminationOrTimeout(100)
finally:
    # Always stop the streaming (and Spark) context, even if the wait is
    # interrupted, so the socket receiver shuts down.
    ssc.stop()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# To run this code, first edit config.py with your configuration, then: | |
# | |
# mkdir data | |
# python twitter_stream_download.py -q apple -d data | |
# | |
# It will produce the list of tweets for the query "apple" | |
# in the file data/stream_apple.json | |
import tweepy, pdb | |
from tweepy import Stream | |
from tweepy import OAuthHandler | |
from tweepy.streaming import StreamListener | |
from datetime import datetime | |
import time | |
import argparse | |
import string | |
import config | |
import json | |
import pdb | |
import simplejson | |
from pprint import pprint | |
import random | |
from insert_twitter_cassandra import * | |
def get_parser():
    """Get parser for command line arguments.

    Options:
    -q/--query     -- term tracked by the Twitter filter (default '-')
    -d/--data-dir  -- directory for stream_<query>.json (default '.')
    """
    parser = argparse.ArgumentParser(description="Twitter Downloader")
    parser.add_argument("-q",
                        "--query",
                        dest="query",
                        help="Query/Filter",
                        default='-')
    # Default to the current directory: without a default, data_dir is None
    # and the listener's output path becomes "None/stream_<query>.json",
    # which fails on every incoming message.
    parser.add_argument("-d",
                        "--data-dir",
                        dest="data_dir",
                        help="Output/Data Directory (default: current directory)",
                        default='.')
    return parser
class MyListener(StreamListener):
    """Custom StreamListener for streaming data.

    Each raw message is appended to ``<data_dir>/stream_<query>.json``. The
    parsed tweet gets an integer minute-resolution ``created_at`` and is then
    passed to ``print_cursor`` (presumably provided by the star import from
    insert_twitter_cassandra -- confirm).
    """

    def __init__(self, data_dir, query):
        # Let StreamListener initialise its own state first.
        super(MyListener, self).__init__()
        query_fname = format_filename(query)
        self.outfile = "%s/stream_%s.json" % (data_dir, query_fname)

    def to_format(self, data):
        """Overwrite data['created_at'] with an int of the form YYYYMMDDHHMM.

        The value comes from data['timestamp_ms'] (milliseconds) and is
        rendered in the machine's local time zone (datetime.fromtimestamp).
        """
        # Floor division gives the same whole-second value on Python 2 and 3.
        seconds = int(data['timestamp_ms']) // 1000
        data['created_at'] = int(datetime.fromtimestamp(seconds).strftime('%Y%m%d%H%M'))

    def random_string(self):
        """Print 10 lines, each built from one randomly chosen index.

        The same index is used for all four word lists, so only five distinct
        sentences are possible.
        """
        nouns = ("puppy", "car", "rabbit", "girl", "monkey")
        verbs = ("runs", "hits", "jumps", "drives", "barfs")
        adv = ("crazily.", "dutifully.", "foolishly.", "merrily.", "occasionally.")
        adj = ("adorable", "clueless", "dirty", "odd", "stupid")
        for _ in range(10):
            num = random.randrange(0, 5)
            print(nouns[num] + ' ' + verbs[num] + ' ' + adv[num] + ' ' + adj[num])

    def on_data(self, data):
        """Handle one raw stream message; always return True to keep streaming.

        Arguments:
        data -- the raw JSON text of the message
        """
        try:
            # Archive the raw message before any transformation.
            with open(self.outfile, 'a') as f:
                f.write(data)
            tweet = simplejson.loads(data)
            if 'timestamp_ms' not in tweet:
                # Messages without a top-level timestamp_ms (presumably stream
                # control notices) cannot be formatted; skip them rather than
                # treating them as errors and stalling for 5 seconds.
                return True
            self.to_format(tweet)
            print_cursor(tweet)
        except Exception as e:
            # Exception, not BaseException: Ctrl-C / SystemExit must still
            # stop the process instead of being swallowed here.
            print("Error on_data: %s" % str(e))
            time.sleep(5)
        return True

    def on_error(self, status):
        """Print the error status and return True (do not ask to disconnect)."""
        print(status)
        return True
def format_filename(fname):
    """Return *fname* with every character outside [-_.A-Za-z0-9] replaced by '_'.

    Arguments:
    fname -- the file name to sanitise
    Return:
    String -- the sanitised file name
    """
    safe_chars = [convert_valid(ch) for ch in fname]
    return ''.join(safe_chars)
def convert_valid(one_char):
    """Return *one_char* if it is a safe filename character, else '_'.

    Safe characters are '-', '_', '.', ASCII letters and digits.
    Arguments:
    one_char -- the char to convert
    Return:
    Character -- converted char
    """
    allowed = "-_." + string.ascii_letters + string.digits
    return one_char if one_char in allowed else '_'
@classmethod
def parse(cls, api, raw):
    """Parse *raw* via ``cls.first_parse`` and attach the raw payload as JSON.

    NOTE(review): this is a module-level classmethod object. It does nothing
    until it is assigned onto a class that defines ``first_parse``, and no
    such assignment is visible in this file.
    """
    parsed = cls.first_parse(api, raw)
    parsed.json = json.dumps(raw)
    return parsed
if __name__ == '__main__':
    # Read -q/-d, authenticate with the credentials from config.py, then
    # stream tweets that match the query into MyListener.
    args = get_parser().parse_args()
    auth = OAuthHandler(config.consumer_key, config.consumer_secret)
    auth.set_access_token(config.access_token, config.access_secret)
    twitter_stream = Stream(auth, MyListener(args.data_dir, args.query))
    twitter_stream.filter(track=[args.query])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment