Philip Thrasher
Web hacker at a private cyber security company.
- Build webapps with Django
- Build APIs with Tornado (or Django)
- Deal with very large data sets.
- Create interfaces for interacting with real-time data, real-time statistics, and large static datasets.
- Python
- Django
- Tornado
- MySQL
- RabbitMQ
- Hadoop
- JavaScript
- How to manage real-time data simply using Python and Redis.
- How to show real-time data to users via websockets (using socket.io).
Don't worry... I'll be using socket.io with Python (via Tornado) instead of Node on the server side.
- It never stops coming.
- Some sources only allow one consumer connection (Twitter).
- Potentially LOTS of data.
- Hard to store.
- Hard to show to the user in a meaningful way, depending on the dataset.
- Many data sources post single items (or batches) onto a queue that another process consumes. There can be multiple queues and multiple consumers, depending on the data.
- The process that reads from the queue looks at the raw data and possibly "hydrates" it by adding more meaningful data points. This stage parallelizes easily, since it just processes incoming messages; ordering is usually maintained via timestamps on the cleaned data.
- Post-processing is done, and actions are taken on the events. Usually this means placing the cleaned data on new queues for a client process to consume. (A rough sketch of this shape follows.)
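A rough sketch of that shape, using Redis pub/sub for the queues. The channel names and the extra 'hydrated_at' field here are made up for illustration:

import json
import time

import redis

r = redis.StrictRedis(host='localhost', port=6379, db=0)


def produce(raw_item):
    # Stage 1: a data source drops raw items onto a channel.
    r.publish('raw_items', json.dumps(raw_item))


def enrich_forever():
    # Stage 2: a worker consumes raw items, "hydrates" them with extra
    # data points, and re-publishes the cleaned result downstream.
    pubsub = r.pubsub()
    pubsub.subscribe('raw_items')
    for message in pubsub.listen():
        if message['type'] != 'message':
            continue
        item = json.loads(message['data'])
        item['hydrated_at'] = time.time()
        # Stage 3: post-processing/clients consume from 'clean_items'.
        r.publish('clean_items', json.dumps(item))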
- Consume tweets from Twitter in real time.
- See how to process each message as it comes in, in real time.
- Handle the one-connection problem, and potential backups, using Redis pub/sub.
- Display this data in ways that may be useful to the user.
Python modules required:
- tweepy
- redis
import tweepy
import redis

from twitter_credentials import *


class Listener(tweepy.StreamListener):
    def on_data(self, data):
        if data:  # don't worry about blank lines
            redis_client.publish('twitter_raw', data)


if __name__ == '__main__':
    try:
        """
        The following need to be specified in './twitter_credentials.py':
        CONSUMER_KEY
        CONSUMER_SECRET
        ACCESS_TOKEN
        ACCESS_TOKEN_SECRET
        """
        auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
        auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
        # Redis connection that Listener.on_data publishes raw tweets into.
        redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)
        streaming_api = tweepy.streaming.Stream(auth, Listener(), timeout=60)
        # Geo box that covers the entire earth, to ensure we only get geo-located tweets.
        streaming_api.filter(locations=[-180, -90, 180, 90])
    except KeyboardInterrupt:
        print "\nBye!"
- Authenticates with Twitter's streaming API using the tweepy module.
- Asks for all tweets in real-time that are geo-located here on earth. ;-)
- Publishes all valid tweets to the 'twitter_raw' channel on the local Redis server.
Not very exciting, but now we have one connection to Twitter (Twitter only allows one at a time for the streaming API), and we're publishing to a channel that any other process can consume from.
- Connect to Redis
- Subscribe to the channel.
- Print each tweet to stdout as it comes in.
import redis
import json
import datetime


def log_tweet(username, name, tweet_body):
    now = datetime.datetime.now()
    print "[%s] @%s (%s): %s" % (now, username, name, tweet_body)


def consume_subscription(tweets):
    while True:
        message = tweets.next()
        raw_tweet = message.get('data')
        # Skip empty payloads and the subscribe confirmation (whose data is an int).
        if not raw_tweet or not isinstance(raw_tweet, basestring):
            continue
        tweet = json.loads(raw_tweet)
        # Sometimes we just get back spurious data, like integers and stuff
        if not isinstance(tweet, dict):
            continue
        # Data extraction
        tweet_body = tweet.get('text')
        user = tweet.get('user', {})
        username = user.get('screen_name')
        name = user.get('name')
        # Data presentation
        if tweet_body and name and username:
            log_tweet(username, name, tweet_body)


if __name__ == '__main__':
    try:
        redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)
        pubsub = redis_client.pubsub()
        pubsub.subscribe('twitter_raw')
        tweets = pubsub.listen()
        consume_subscription(tweets)
    except KeyboardInterrupt:
        print "\nBye!"
Python modules needed:
- tornado
- tornadio2
- brukva (a Tornado-compatible Redis client)
What we'll do
- Flesh out the subscriber from the previous step to allow for generic hydration functions (instead of just extracting the username, name, and tweet text).
- Write a Tornado app to push these results to the client.
import redis
import json
import datetime

import hydrators
import events

HYDRATORS = [
    hydrators.user_data,
    hydrators.body,
]

EVENTS = {
    "simple_tweet": events.simple_tweet,
}


def consume_raw_subscription(tweets):
    """
    This method consumes the raw tweets, parses them, and hands them to each
    hydrator; the hydrators clean the data, then we run through a list of
    event handlers, each of which will publish to a Redis channel if it has
    the data it needs.
    """
    while True:
        message = tweets.next()
        raw_tweet = message.get('data')
        # Skip empty payloads and the subscribe confirmation (whose data is an int).
        if not raw_tweet or not isinstance(raw_tweet, basestring):
            continue
        try:
            tweet = json.loads(raw_tweet)
        except ValueError:
            continue
        # Sometimes we just get back spurious data, like integers and stuff
        if not isinstance(tweet, dict):
            continue
        # Data extraction
        clean_obj = {}
        for hydrator in HYDRATORS:
            # Allow the method to overwrite our clean data
            hydrated = hydrator(tweet, clean_obj)
            if hydrated:
                clean_obj.update(hydrated)
        for kind, fn in EVENTS.items():
            # Publish events
            data = fn(clean_obj)
            json_data = json.dumps(data)
            event = json.dumps({
                'kind': kind,
                'body': json_data,
            })
            redis_client.publish('events', event)


if __name__ == '__main__':
    try:
        redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)
        pubsub = redis_client.pubsub()
        pubsub.subscribe('twitter_raw')
        tweets = pubsub.listen()
        consume_raw_subscription(tweets)
    except KeyboardInterrupt:
        print "\nBye!"
def body(tweet, clean_obj):
    """
    Simply grabs the tweet body.
    """
    tweet_body = tweet.get('text')
    if tweet_body:
        clean_obj.update({
            "body": tweet_body,
        })
    return clean_obj


def user_data(tweet, clean_obj):
    """
    Simply grabs the username and name.
    """
    user = tweet.get('user', {})
    username = user.get('screen_name')
    name = user.get('name')
    if username and name:
        clean_obj.update({
            "username": username,
            "name": name,
        })
    return clean_obj
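Because HYDRATORS in enrich.py is just a list of functions, adding another extraction step is one function plus one list entry. A hypothetical example (it assumes the standard 'entities' field Twitter includes on each tweet):

def hashtags(tweet, clean_obj):
    """
    Hypothetical extra hydrator: grabs any hashtags off the tweet.
    """
    entities = tweet.get('entities', {})
    tags = [tag.get('text') for tag in entities.get('hashtags', [])]
    if tags:
        clean_obj.update({
            "hashtags": tags,
        })
    return clean_obj

Then add hydrators.hashtags to the HYDRATORS list, and every event handler can see the extracted hashtags.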
def simple_tweet(data):
    return {
        'username': data.get('username', ''),
        'name': data.get('name', ''),
        'body': data.get('body', ''),
    }
# Create tornadio router
WSRouter = TornadioRouter(WebSocketHandler,
                          dict(enabled_protocols=['websocket', 'xhr-polling',
                                                  'jsonp-polling', 'htmlfile']))

# Create socket application
application = web.Application(
    WSRouter.apply_routes([(r"/step-(\d)+[/]?", StepPageHandler)]),
    flash_policy_port=843,
    flash_policy_file=os.path.join(ROOT, 'flashpolicy.xml'),
    template_path=os.path.join(ROOT, 'templates'),
    static_path=os.path.join(ROOT, 'static'),
    socket_io_port=8001,
    enable_pretty_logging=True,
)

if __name__ == '__main__':
    socketio_server = SocketServer(application, auto_start=False)
    STATS = ServerStats()
    # Subscribe to the 'events' channel with brukva (a non-blocking Redis
    # client) and hand every message to broadcast_events.
    redis = brukva.Client(host='localhost', port=6379, selected_db=0)
    redis.connect()
    redis.subscribe('events')
    redis.listen(broadcast_events)
    ioloop.IOLoop.instance().start()
class StepPageHandler(web.RequestHandler):
    """
    Renders each step page for the talk.
    """
    def get(self, step):
        self.render('step-%s.html' % step)


class WebSocketHandler(SocketConnection):
    """
    Manages the global list of subscribers.
    """
    def on_open(self, *args, **kwargs):
        SUBSCRIBERS.append(self)

    def on_close(self):
        if self in SUBSCRIBERS:
            SUBSCRIBERS.remove(self)
def broadcast_events(event):
    """
    Generic event broadcaster. We subscribe to the 'events' Redis channel, and
    any time we get anything new from there, we emit that event to all of our
    HTTP clients. This method is a brukva callback.
    """
    # Get the message body from the redis message.
    raw_event = event.body
    if not raw_event:
        # If there was no body to be had, let's just return.
        STATS.log("Received empty event body.", level="warn")
        return
    try:
        # Let's try to parse the JSON obj.
        event = json.loads(raw_event)
    except ValueError:
        # Ruh roh raggy... Wouldn't parse. Let's log it.
        STATS.log("Couldn't parse JSON object: %s" % str(raw_event), level="warn")
        return
    # Let's ensure this is a properly formed event (e.g. a kind and a JSON body).
    kind = event.get('kind')
    body = event.get('body')
    if not kind or not body:
        STATS.log("Not a proper event: %s" % event, level="warn")
        return
    for subscriber in SUBSCRIBERS:
        STATS.incr('sent_messages', 1)
        subscriber.emit(kind, body)
        STATS.incr(kind, 1)
- consumer.py is connected to the Twitter streaming API, pushing all tweets to the 'twitter_raw' Redis channel.
- enrich.py takes every tweet from 'twitter_raw', applies any configured hydration to it, then publishes any events derived from the hydrated data to the 'events' channel in Redis (currently only the 'simple_tweet' event).
- webapp.py is a Tornado app that consumes from the 'events' channel in Redis and emits a socket.io event to the browser for each one. (The shape of these event messages is sketched below.)
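For reference, a message on the 'events' channel looks roughly like this by the time webapp.py sees it (the values here are placeholders). Note that 'body' is itself a JSON string, which is why the browser does the final JSON.parse():

import json

example_event = json.dumps({
    'kind': 'simple_tweet',
    'body': json.dumps({
        'username': 'jane_doe',            # placeholder data
        'name': 'Jane Doe',
        'body': 'hello from the stream',
    }),
})
# webapp.py loads the outer object, then emits kind + body via socket.io;
# the client parses the inner body.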
var host = window.location.hostname,
    port = 8001,
    socket = io.connect('http://' + host + ':' + port);

// Watch for the `simple_tweet` event.
socket.on('simple_tweet', function(tweet) {
    renderTweet(JSON.parse(tweet));
});

function renderTweet(tweet) {
    // ...
}
All of this data is sent in real time over an open connection to the browser... Really cool.
Not cool enough, though. There's a lot of data in the average tweet, including geo data. Let's draw it on a map.
We'll need to add:
- A new hydrator that extracts geo-coordinates from each tweet (the relevant tweet fields are sketched below).
- A new event called "geo_blip" that the webapp will subscribe to.
- Map and blip-drawing code on the client.
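For reference, these are the two places a tweet can carry coordinates (simplified, with placeholder values). The deprecated 'geo' field is [latitude, longitude], while 'place' bounding boxes are GeoJSON and therefore use [longitude, latitude] pairs:

# Simplified sketches of the geo fields on a tweet (placeholder coordinates).
geo_tagged = {
    'geo': {'type': 'Point', 'coordinates': [30.26, -97.74]},  # [lat, lon]
}
place_tagged = {
    'place': {
        'bounding_box': {
            'type': 'Polygon',
            'coordinates': [[[-97.94, 30.13], [-97.57, 30.13],
                             [-97.57, 30.52], [-97.94, 30.52]]],  # [lon, lat]
        },
    },
}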
def lat_lon(tweet, clean_obj):
    """
    Searches for the lat and lon and returns it.
    """
    geo = tweet.get('geo')
    if geo:
        geo_type = geo.get('type')
        if geo_type.lower() != 'point':
            return None
        # The (deprecated) 'geo' field is [latitude, longitude].
        lat, lon = geo.get('coordinates')
    else:
        place = tweet.get('place')
        if not place:
            return None
        bounding_box = place.get('bounding_box')
        if not bounding_box:
            return None
        coords = bounding_box.get('coordinates')
        if not coords:
            return None
        # Bounding boxes are GeoJSON, so each corner is [longitude, latitude].
        lon, lat = coords[0][0]
    if lat and lon:
        clean_obj.update({
            "lat": lat,
            "lon": lon,
        })
    return clean_obj
def simple_tweet(data):
    return {
        'username': data.get('username', ''),
        'name': data.get('name', ''),
        'body': data.get('body', ''),
    }


def geo_blip(data):
    return {
        'lat': data.get('lat', 0),
        'lon': data.get('lon', 0),
    }
// ...
socket.on('geo_blip', function(geo) {
    drawBlip(JSON.parse(geo));
});

function drawBlip(geo) {
    // ...
}
- Consumed events from Twitter's feed.
- Enriched them in real-time.
- Created events from the data that the browser can subscribe to.
- Displayed this information in a friendly way on the client.
Not shown here:
- A lot more JS/CSS/HTML used for each page.
- Some stats-counting methods in Python and JS (a minimal sketch follows).
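The webapp excerpt above calls STATS.log(...) and STATS.incr(...). The real ServerStats lives in the repo, but a minimal sketch of the interface it would need looks like this:

import datetime
from collections import defaultdict


class ServerStats(object):
    """
    Minimal stand-in for the stats helper used by webapp.py: named counters
    plus timestamped log lines. The real implementation may differ.
    """
    def __init__(self):
        self.counters = defaultdict(int)

    def incr(self, name, amount=1):
        self.counters[name] += amount

    def log(self, message, level="info"):
        print "[%s] [%s] %s" % (datetime.datetime.now(), level.upper(), message)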
You can see this stuff in the public Git repo:
https://github.com/pthrasher/twitter-python-talk
Socket.IO: http://socket.io/
TornadoWeb: http://tornadoweb.org
Redis: redis.io