Last active
February 26, 2018 21:13
-
-
Save sblack4/589ddf15796e6f7491fdee50bee55520 to your computer and use it in GitHub Desktop.
Spark + Twitter streaming code for a proof of concept (POC)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
config.py
.vscode
*.pyc
*.jar
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/python | |
from __future__ import print_function | |
# general packages | |
import sys | |
import config | |
import logging | |
import json | |
from random import choice | |
# pyspark streaming | |
from pyspark import SparkContext | |
from pyspark.streaming import StreamingContext | |
from pyspark.streaming.kafka import KafkaUtils | |
# spark sql | |
from pyspark.sql import HiveContext | |
from pyspark.sql.types import * | |
from pyspark.sql.functions import desc | |
from pyspark.sql.functions import col | |
from pyspark.sql.functions import expr | |
# tweet_struct = StructType([ | |
# StructField('id', LongType(), False), | |
# StructField('created_at', StringType(), False), | |
# StructField('text', StringType(), False), | |
# StructField('favorite_count', IntegerType(), True), | |
# StructField('quote_count', IntegerType(), True), | |
# StructField('retweet_count', IntegerType(), True), | |
# StructField('reply_count', IntegerType(), True), | |
# StructField('lang', StringType(), True), | |
# StructField('coordinates', StringType(), True), | |
# StructField('place', StringType(), True), | |
# StructField('possibly_sensitive', StringType(), True), | |
# StructField('user', MapType(StringType(), StringType()), False) | |
# ]) | |
def HandleJson(df):
    """Filter one micro-batch of raw tweet JSON and append it to the Hive tables.

    Rows whose ``possibly_sensitive`` flag is true are dropped, missing tweet
    fields are filled with defaults, and two projections are appended:
    ``default.tweets`` (one row per tweet) and ``default.users`` (the author of
    each tweet).

    The ``good_day``, ``rand_state`` and ``rand_provider`` SQL functions must be
    registered on the context first (the ``__main__`` block does this).

    Args:
        df: DataFrame built by ``sqlContext.read.json`` over the batch's raw
            message strings. Its schema is inferred per batch, so any column
            may be absent.
    """
    # A batch made up only of non-tweet messages has no tweet/user columns;
    # selecting them would raise, and there is nothing to write anyway.
    if "id" not in df.columns or "user" not in df.columns:
        return
    # Presumably records without an id are non-tweet stream messages that were
    # merged into the same inferred schema -- skip them.
    df = df.where(col("id").isNotNull())

    # The flag is per tweet, so filter row by row (DataFrame.show() returns
    # None and cannot be compared with anything). NULL/absent = not sensitive.
    if "possibly_sensitive" in df.columns:
        flag = col("possibly_sensitive")
        df = df.where(flag.isNull() | (flag == False))  # noqa: E712 -- Column expression

    tweets = df.select(
        "id",
        "created_at",
        expr('COALESCE(text, "null") AS text'),
        expr('COALESCE(favorite_count, 0) AS favorite_count'),
        expr('COALESCE(retweet_count, 0) AS retweet_count'),
        expr('COALESCE(quote_count, 0) AS quote_count'),
        expr('COALESCE(reply_count, 0) as reply_count'),
        expr('COALESCE(lang, "und") as lang'),
        expr('COALESCE(coordinates, 0) as coordinates'),
        expr('COALESCE(place, "null") as place'),
        col("user.id").alias("user_id"),
        # Synthetic demo columns produced by the UDFs registered in __main__.
        expr("good_day() as date"),
        expr("rand_state() as state"),
        expr("rand_provider() as provider"),
    )
    # insertInto matches columns by position, so the select order above must
    # follow the column order of default.tweets.
    tweets.write.mode("append").insertInto("default.tweets")

    users = df.select(
        "user.id",
        "user.name",
        "user.description",
        "user.followers_count",
        "user.location",
        "user.friends_count",
        "user.screen_name",
    )
    users.write.mode("append").insertInto("default.users")
def handleRDD(rdd):
    """Parse one Kafka micro-batch as JSON and persist it via HandleJson.

    Best effort: any error is printed and the batch is skipped, so a single
    bad batch does not stop the streaming job.

    Args:
        rdd: RDD of Kafka ``(key, value)`` pairs from the direct stream; only
            the value (the raw tweet JSON) is used. Relies on the module-level
            ``sqlContext`` created in the ``__main__`` block.
    """
    if rdd is None:
        return
    try:
        # An RDD object is always truthy, so `not rdd` cannot detect an empty
        # batch. isEmpty() can; it runs a small job, so it stays inside the try.
        if rdd.isEmpty():
            return
        df = sqlContext.read.json(rdd.map(lambda record: record[1]))
        HandleJson(df)
    except Exception as ex:
        print(ex)
if __name__ == "__main__":
    sc = SparkContext("yarn", "TweetConsumer")
    ssc = StreamingContext(sc, 1)  # 1-second micro-batches
    # Module-level on purpose: handleRDD reads this global.
    sqlContext = HiveContext(sc)
    # ssc.checkpoint("file:///" + getcwd())

    # Lookup values sampled by the UDFs below. They are collected once on the
    # driver and captured by the UDF closures.
    days = sqlContext.sql("select distinct date from transactions").collect()
    states = sqlContext.sql("select State from us_states").collect()
    if not days or not states:
        # random.choice() on an empty list raises IndexError inside every UDF
        # call (and thus every batch); fail fast with a clear message instead.
        sys.exit("transactions.date and us_states.State must both be non-empty")

    def good_day():
        """Return a random ``date`` value from transactions (assumed to be a string column)."""
        # Return text, not bytes: these UDFs use the default StringType
        # return type, and the bytes that .encode() yields on Python 3 are not
        # a valid string result.
        return choice(days)[0]

    def rand_state():
        """Return a random ``State`` value from us_states."""
        return choice(states)[0]

    def rand_provider():
        """Return a random streaming-provider name."""
        return choice(['pandora', 'spotify'])

    sqlContext.udf.register("good_day", good_day)
    sqlContext.udf.register("rand_state", rand_state)
    sqlContext.udf.register("rand_provider", rand_provider)

    zkQuorum, topic = config.zkQuorum, config.topic
    # NOTE(review): the direct stream connects to Kafka brokers, not ZooKeeper,
    # so config.zkQuorum must hold a broker list ("host:port,...") despite its
    # name -- confirm against config.py.
    lines = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": zkQuorum})
    lines.foreachRDD(handleRDD)
    ssc.start()
    ssc.awaitTermination()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# standard libraries | |
from __future__ import print_function | |
import config | |
import json | |
import logging | |
# tweepy | |
import tweepy | |
from tweepy import Stream | |
from tweepy import OAuthHandler | |
from tweepy.streaming import StreamListener | |
# kafka-python libraries | |
from kafka import KafkaProducer | |
from kafka.errors import KafkaError | |
from any_broker import run_listener | |
class AnyListener(StreamListener):
    """Tweepy StreamListener that forwards every raw stream message to a callback."""

    def __init__(self, callback):
        """
        Args:
            callback: called once per raw message with the message encoded as
                UTF-8 bytes. Its return value is ignored.
        """
        # Let the base class initialise its own state (e.g. ``api``) too.
        StreamListener.__init__(self)
        self.callback = callback

    def on_data(self, data):
        """Forward one raw message; errors are printed and the stream continues."""
        try:
            payload = data if isinstance(data, bytes) else data.encode('utf-8')
            self.callback(payload)
        except Exception as e:
            # Catch Exception, not BaseException, so Ctrl-C / SystemExit still
            # stop the stream instead of being silently swallowed.
            print(e)
        return True

    def on_error(self, status):
        """Keep the stream running on any HTTP error status."""
        # Returning True tells tweepy to keep going rather than disconnect.
        return True
def run_listener(callback):
    """Stream tweets matching ``config.track_string`` and pass each raw message to *callback*.

    ``filter()`` is called synchronously, so this presumably blocks for as long
    as the Twitter stream stays open.

    Args:
        callback: invoked by AnyListener with each raw message, UTF-8 encoded.
    """
    auth = OAuthHandler(config.consumer_key, config.consumer_secret)
    auth.set_access_token(config.access_token, config.access_secret)
    twitter_stream = Stream(auth, AnyListener(callback))
    # split() with no argument ignores repeated/leading/trailing whitespace, so
    # no empty track terms are sent to the filter endpoint.
    twitter_stream.filter(track=config.track_string.split())
def run_kafka_broker():
    """Publish every raw message from the Twitter stream to ``config.topic`` on Kafka.

    Runs until the Twitter listener returns or raises. The producer is then
    closed so buffered records are delivered rather than dropped.
    """
    # Put your broker "hostname:port" in config.kafka_host.
    logging.basicConfig(filename='run_kafka_broker.log', level=config.log_level)
    logging.info("logging to " + config.kafka_host)
    mykafkaservers = [config.kafka_host]
    producer = KafkaProducer(
        bootstrap_servers=mykafkaservers,
        # The listener already hands over UTF-8 bytes; only text needs
        # encoding. Calling .encode() on bytes fails on Python 3 and re-decodes
        # through ASCII (failing on non-ASCII tweets) on Python 2.
        value_serializer=lambda m: m if isinstance(m, bytes) else m.encode('utf-8'),
        api_version=(0, 10, 1),
    )
    try:
        run_listener(lambda vals: producer.send(config.topic, vals))
    finally:
        # send() only enqueues records; close() waits for pending requests to
        # complete so buffered tweets are not lost on shutdown.
        producer.close()


if __name__ == "__main__":
    run_kafka_broker()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/python | |
from __future__ import print_function | |
from pyspark import SparkContext | |
from pyspark.streaming import StreamingContext | |
# from pyspark.sql import SQLContext | |
from pyspark.sql.functions import desc | |
from pyspark.sql import HiveContext | |
from os import getcwd | |
import logging | |
from pyspark.sql.functions import col | |
# Two local threads: the socket receiver permanently occupies one of them.
sc = SparkContext("local[2]", "TweetConsumer")
ssc = StreamingContext(sc, 10)  # 10-second micro-batches
# Module-level on purpose: handleRDD reads this global.
sqlContext = HiveContext(sc)
# ssc.checkpoint("file:///" + getcwd())
# Raw messages pushed by the local socket producer on 127.0.0.1:5555.
socket_stream = ssc.socketTextStream("127.0.0.1", 5555)
# Tumbling 20-second windows. Without an explicit slide interval the window
# slides every 10 s batch, so each record falls into two overlapping windows
# and would be inserted into Hive twice.
lines = socket_stream.window(20, 20)
def HandleJson(df):
    """Filter one window of raw tweet JSON and append it to the Hive tables.

    Rows whose ``possibly_sensitive`` flag is true are dropped, then two
    projections are appended: ``default.tweets`` (one row per tweet) and
    ``default.users`` (the author of each tweet).

    Args:
        df: DataFrame built by ``sqlContext.read.json`` over the window's lines.
            Its schema is inferred per window, so any column may be absent.
    """
    # A window made up only of non-tweet messages has no tweet/user columns;
    # selecting them would raise, and there is nothing to write anyway.
    if "id" not in df.columns or "user" not in df.columns:
        return
    # Presumably records without an id are non-tweet stream messages that were
    # merged into the same inferred schema -- skip them.
    df = df.where(col("id").isNotNull())

    # The flag is per tweet, so filter row by row (DataFrame.show() returns
    # None and cannot be compared with anything). NULL/absent = not sensitive.
    if "possibly_sensitive" in df.columns:
        flag = col("possibly_sensitive")
        df = df.where(flag.isNull() | (flag == False))  # noqa: E712 -- Column expression

    tweets = df.select(
        "id",
        "created_at",
        "text",
        "favorite_count",
        "retweet_count",
        "quote_count",
        "reply_count",
        "lang",
        "coordinates",
        "place",
        col("user.id").alias("user_id"),
    )
    # insertInto matches columns by position, so the select order above must
    # follow the column order of default.tweets.
    tweets.write.mode("append").insertInto("default.tweets")

    users = df.select(
        "user.id",
        "user.name",
        "user.description",
        "user.followers_count",
        "user.location",
        "user.friends_count",
        "user.screen_name",
    )
    users.write.mode("append").insertInto("default.users")
def handleRDD(rdd):
    """Parse one window of socket lines as JSON and persist it via HandleJson.

    Best effort: any error, including JSON schema inference, is printed and the
    window is skipped so the streaming job keeps running.

    Args:
        rdd: RDD of raw text lines from the windowed socket stream. Relies on
            the module-level ``sqlContext``.
    """
    if rdd is None:
        return
    try:
        # An RDD object is always truthy, so `not rdd` cannot detect an empty
        # window; isEmpty() can.
        if rdd.isEmpty():
            return
        df = sqlContext.read.json(rdd)
        HandleJson(df)
    except Exception as ex:
        print(ex)


lines.foreachRDD(handleRDD)
ssc.start()
ssc.awaitTermination()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/python | |
from __future__ import print_function | |
import tweepy | |
from tweepy import Stream | |
from tweepy import OAuthHandler | |
from tweepy.streaming import StreamListener | |
import time | |
import config | |
import json | |
import socket | |
import logging | |
class PortListener(StreamListener):
    """Tweepy StreamListener that relays each raw stream message over a connected socket."""

    def __init__(self, filename):
        """
        Args:
            filename: despite its name, the connected client socket that
                receives every raw message (the name is kept for compatibility).
        """
        # Let the base class initialise its own state (e.g. ``api``) too.
        StreamListener.__init__(self)
        self.client_socket = filename

    def on_data(self, data):
        """Send one raw message to the client; errors are logged and the stream continues."""
        try:
            # Sockets carry bytes: send(str) raises on Python 3 and fails on
            # non-ASCII text under Python 2, so encode explicitly.
            payload = data if isinstance(data, bytes) else data.encode('utf-8')
            # send() may write only part of the buffer; sendall() writes it all.
            self.client_socket.sendall(payload)
        except Exception as e:
            # Exception, not BaseException: Ctrl-C must still stop the stream.
            logging.error("Error on_data: %s" % str(e))
        return True

    def on_error(self, status):
        """Log the HTTP error status and keep the stream running."""
        logging.error(status)
        return True
def run(c_socket):
    """Stream tweets matching ``config.track_string`` to the connected socket *c_socket*.

    ``filter()`` is called synchronously, so this presumably blocks for as long
    as the Twitter stream stays open.
    """
    auth = OAuthHandler(config.consumer_key, config.consumer_secret)
    auth.set_access_token(config.access_token, config.access_secret)
    twitter_stream = Stream(auth, PortListener(c_socket))
    # split() with no argument ignores repeated/leading/trailing whitespace, so
    # no empty track terms are sent to the filter endpoint.
    twitter_stream.filter(track=config.track_string.split())
if __name__ == "__main__":
    logging.basicConfig(filename="port_broker.log",
                        level=config.log_level)
    s = socket.socket()  # TCP server socket
    # Allow quick restarts: without SO_REUSEADDR, bind() fails while the
    # previous run's socket lingers in TIME_WAIT.
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    host = "127.0.0.1"  # loopback only: the consumer runs on this machine
    port = 5555
    try:
        s.bind((host, port))
        logging.info("Listening on port: %s" % str(port))
        s.listen(5)
        # Blocks until a single client connects (presumably the Spark
        # socketTextStream consumer); only that one client is served.
        c, addr = s.accept()
        logging.info("Received request from: " + str(addr))
        try:
            run(c)
        finally:
            c.close()
    finally:
        s.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment