@danish-rehman · Last active July 14, 2016
Data rollups guidelines
spark_processor_hour.py

import pyspark_cassandra
from pyspark import SparkConf

keyspace, min_table, hour_table = "mykeyspace", "rollups_min", "rollups_hour"

conf = SparkConf() \
    .setAppName("Cision realtime stats") \
    .setMaster("spark://SANM-MBP01L.local:7077") \
    .set("spark.cassandra.connection.host", "localhost") \
    .set("spark.eventLog.enabled", "true") \
    .set("spark.jars.packages", "TargetHolding/pyspark-cassandra:0.3.5")

sc = pyspark_cassandra.CassandraSparkContext(conf=conf)

# Keep the driver output readable by silencing everything below ERROR.
log4j = sc._jvm.org.apache.log4j
log4j.LogManager.getRootLogger().setLevel(log4j.Level.ERROR)


def add_tuples(a, b):
    # Sum the counters for a key; keep the first timestamp seen.
    return (a[0] + b[0], a[1])


def transform_arrange_row(data):
    # Zero the minute digits of the minute row key to form the hour bucket,
    # e.g. 'twitter-111-201607021101' -> 'twitter-111-201607021100'.
    row_key = data[0][:-2] + "00"
    metric = data[1][0]
    datetime_min = data[1][1]
    return (row_key, datetime_min, metric)


def map_val(row):
    # Truncate the stored timestamp to whole minutes and key by event_min.
    dt = row['time'].replace(second=0, microsecond=0)
    return (row['event_min'], (row['value'], dt))


rdds = sc \
    .cassandraTable(keyspace, min_table) \
    .map(map_val) \
    .reduceByKey(add_tuples) \
    .map(transform_arrange_row)

print(rdds.collect())
rdds.saveToCassandra(keyspace, hour_table)
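
Because transform_arrange_row is a plain function, the hour bucketing can be sanity-checked without a cluster; a minimal sketch, assuming minute row keys in the %Y%m%d%H%M format produced by the streaming job below:

from datetime import datetime

def transform_arrange_row(data):
    # Same transform as above: zero the minute digits to get the hour bucket.
    row_key, (metric, dt) = data
    return (row_key[:-2] + "00", dt, metric)

minute_row = ("twitter-111-201607021101", (15, datetime(2016, 7, 2, 11, 1)))
print(transform_arrange_row(minute_row))
# ('twitter-111-201607021100', datetime.datetime(2016, 7, 2, 11, 1), 15)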
spark_processor_min.py

import pytz
import pyspark_cassandra
from pyspark import SparkConf
from pyspark.streaming import StreamingContext
from pyspark_cassandra import streaming  # patches saveToCassandra onto DStreams
from datetime import datetime

keyspace, table = "mykeyspace", "rollups_min"

conf = SparkConf() \
    .setAppName("Cision realtime stats") \
    .setMaster("spark://SANM-MBP01L.local:7077") \
    .set("spark.cassandra.connection.host", "localhost") \
    .set("spark.eventLog.enabled", "true") \
    .set("spark.jars.packages", "TargetHolding/pyspark-cassandra:0.3.5")

sc = pyspark_cassandra.CassandraSparkContext(conf=conf)
ssc = StreamingContext(sc, 20)  # 20-second micro-batches

log4j = sc._jvm.org.apache.log4j
log4j.LogManager.getRootLogger().setLevel(log4j.Level.ERROR)


def split_ip(line):
    # Each line is "<id> <unix_timestamp>"; build the minute-bucket row key.
    id_str, created_at = line.split()
    d = datetime.utcfromtimestamp(int(created_at)).replace(tzinfo=pytz.utc)
    row_key = 'twitter-111-%s' % d.strftime('%Y%m%d%H%M')
    return (created_at, (1, row_key))


def add_tuples(a, b):
    # Sum the per-second counters; keep the first row key seen.
    return (a[0] + b[0], a[1])


def make_row(data):
    # (created_at, (count, row_key)) -> (event_min, time, value)
    metric = data[1][0]
    row_key = data[1][1]
    d = datetime.utcfromtimestamp(int(data[0])).replace(tzinfo=pytz.utc)
    return (row_key, d, metric)


rdds = ssc \
    .socketTextStream("10.0.0.235", 9999) \
    .map(split_ip) \
    .reduceByKey(add_tuples) \
    .map(make_row)

rdds.pprint()
rdds.saveToCassandra(keyspace, table)

ssc.start()
ssc.awaitTerminationOrTimeout(100)
ssc.stop()
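
split_ip, add_tuples, and make_row are also plain functions, so a single batch can be traced by hand; a small sketch with made-up tweet ids (1467457260 is 2016-07-02 11:01:00 UTC):

import pytz
from datetime import datetime

def split_ip(line):
    id_str, created_at = line.split()
    d = datetime.utcfromtimestamp(int(created_at)).replace(tzinfo=pytz.utc)
    return (created_at, (1, 'twitter-111-%s' % d.strftime('%Y%m%d%H%M')))

# Two events with the same timestamp reduce into a single count of 2.
a = split_ip("123 1467457260")
b = split_ip("456 1467457260")
print((a[1][0] + b[1][0], a[1][1]))
# (2, 'twitter-111-201607021101')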

Rolling count of stats using PySpark

CQL queries

CREATE TABLE rollups_min (
    event_min text,
    time timestamp,
    value int,
    PRIMARY KEY (event_min, time)
) WITH COMPACT STORAGE;

CREATE TABLE rollups_hour (
    event_hour text,
    time timestamp,
    value int,
    PRIMARY KEY (event_hour, time)
) WITH COMPACT STORAGE;
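
Once a job has written rollups, a bucket can be read back with any CQL client. A minimal sketch using the DataStax cassandra-driver package (an assumption; it is not used anywhere above), with a hypothetical row key:

from cassandra.cluster import Cluster

cluster = Cluster(['localhost'])
session = cluster.connect('mykeyspace')

# One wide row per bucket: (event_min, time) -> value.
rows = session.execute(
    "SELECT time, value FROM rollups_min WHERE event_min = %s",
    ('twitter-111-201607021101',))
for row in rows:
    print(row.time, row.value)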

Start the Twitter stream

python twitter_stream.py -q data -d /tmp 2>&1 | nc -lk 10.0.0.235 9999
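
Any producer that writes "<id> <unix_timestamp>" lines works in place of the Twitter stream, since split_ip only needs those two fields. A hypothetical stand-in (fake_stream.py is a made-up name), piped into nc the same way:

# fake_stream.py: python fake_stream.py | nc -lk 10.0.0.235 9999
import sys
import time
import random

while True:
    tweet_id = random.randint(1, 10 ** 9)  # made-up tweet id
    sys.stdout.write("%d %d\n" % (tweet_id, int(time.time())))
    sys.stdout.flush()  # push each line to nc immediately, unbuffered
    time.sleep(0.1)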

Spark processing for hourly data

/bin/spark-submit --packages TargetHolding/pyspark-cassandra:0.3.5 /Users/drehman/Apps/workspace/atlas/spark_processor_hour.py

Spark processing for minutes data

/bin/spark-submit --packages TargetHolding/pyspark-cassandra:0.3.5 /Users/drehman/Apps/workspace/atlas/spark_processor_min.py

rollups_min

event_min                    | 20160702 110001 | 20160702 110002 | 20160702 110004
twitter-1234-20160702-110000 | 10              | 10              | 500

event_min                    | 20160702 110101 | 20160702 110103 | 20160702 110104
twitter-1234-20160702-110100 | 11              | 15              | 600

rollups_hour

event_hour                   | 20160702 110100 | 20160702 110200 | 20160702 110300
twitter-1234-20160702-110000 | 10              | 10              | 500

event_hour                   | 20160702 120100 | 20160702 120200 | 20160702 120300
twitter-1234-20160702-120000 | 10              | 10              | 500

rollups_day

event_day                    | 20160702 110000 | 20160702 120000 | 20160702 130000
twitter-1234-20160702-000000 | 10              | 10              | 500

event_day                    | 20160703 110000 | 20160703 120000 | 20160703 130000
twitter-1234-20160703-000000 | 10              | 10              | 500
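
rollups_day follows the same truncation pattern one level up: zero the hour and minute digits of a minute key to get the day bucket. A sketch using the %Y%m%d%H%M key format the streaming code emits (the sample rows above show a hyphenated variant, and no CREATE statement for rollups_day appears above, so its table is assumed to mirror rollups_hour):

def to_hour_key(min_key):
    # 'twitter-1234-201607021101' -> 'twitter-1234-201607021100'
    return min_key[:-2] + "00"

def to_day_key(min_key):
    # 'twitter-1234-201607021101' -> 'twitter-1234-201607020000'
    return min_key[:-4] + "0000"

print(to_hour_key("twitter-1234-201607021101"))
print(to_day_key("twitter-1234-201607021101"))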
