# Scraped from a GitHub Gist page (page text kept as comments so the file parses).
# Instantly share code, notes, and snippets.
# Author: @amiller -- created September 26, 2012 20:23
# Gist: amiller/3790351 (save it to your computer and use it in GitHub Desktop)
import base64
import calendar
import functools
import itertools
import json
import os
import string
import sys
import time
import urllib, urllib2
def coroutine(func):
    """Decorator: build the generator ``func(...)`` and advance it to its first ``yield``.

    The returned generator is immediately ready to accept ``.send(value)``.
    ``functools.wraps`` keeps ``func``'s name and docstring on the wrapper.
    """
    @functools.wraps(func)
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        # Prime the generator; builtin next() works on Python 2.6+ and 3.
        next(cr)
        return cr
    return start
class config:
    """Module-wide settings.

    ``TWITTER`` holds the username/password that ``twitter_producer`` sends
    as an HTTP Basic ``Authorization`` header. Each value can be overridden
    with the ``TWITTER_USER`` / ``TWITTER_PASSWORD`` environment variables.
    The original hard-coded values remain as fallbacks for backward
    compatibility.

    NOTE(review): a plaintext password is committed in source -- rotate it
    and remove the fallback once deployments set the environment variables.
    """
    TWITTER = {
        'user': os.environ.get('TWITTER_USER', 'dv_test_1'),
        'password': os.environ.get('TWITTER_PASSWORD', 'mrybRJnp')}
def twitter_producer(queries):
    """Stream matching statuses from Twitter's v1 ``statuses/filter`` endpoint.

    On the first ``next()``, ``queries`` is fully consumed: the terms are
    lower-cased, URL-quoted and comma-joined into a ``track=`` POST body.
    One HTTPS request is then opened, authenticated with the HTTP Basic
    credentials in ``config.TWITTER``. For every status line it yields
    ``(user_id, timestamp, text)``:

    - ``user_id``: the ``user.id_str`` string.
    - ``timestamp``: ``created_at`` as integer seconds since the Unix epoch.
      The string carries a literal ``+0000`` offset, so it is converted
      as UTC.
    - ``text``: the lower-cased tweet text.

    Blank keep-alive lines and JSON objects without ``text``/``user`` are
    skipped. The generator ends when the connection closes or a line is not
    valid JSON, so a wrapper such as ``retrier`` can reconnect. The HTTP
    response is closed when the generator finishes or is closed. Errors from
    ``urllib2.urlopen`` (e.g. ``urllib2.HTTPError``) propagate to the caller.

    NOTE(review): Twitter has since retired the v1 streaming API and Basic
    auth -- confirm the endpoint before relying on this.
    """
    url = 'https://stream.twitter.com/1/statuses/filter.json'
    queries = [q.lower() for q in queries]
    quoted_queries = [urllib.quote(q) for q in queries]
    query_post = 'track=' + ",".join(quoted_queries)
    request = urllib2.Request(url, query_post)
    auth = base64.b64encode('%s:%s' % (config.TWITTER['user'], config.TWITTER['password']))
    request.add_header('Authorization', "basic %s" % auth)
    response = urllib2.urlopen(request)
    try:
        for line in response:
            if not line.strip():
                # The stream interleaves blank keep-alive lines with statuses.
                continue
            try:
                item = json.loads(line)
            except ValueError:
                # json signals bad input with ValueError; json.JSONDecodeError
                # (a ValueError subclass) only exists on Python 3.5+.
                # Treat malformed data as the end of the feed. Use `return`,
                # not `raise StopIteration`, which PEP 479 turns into
                # RuntimeError.
                return
            if isinstance(item, dict) and 'text' in item and 'user' in item:
                text = item['text'].lower()
                user_id = item['user']['id_str']
                # timegm interprets the struct_time as UTC; time.mktime would
                # wrongly apply the local timezone.
                created = time.strptime(item['created_at'], '%a %b %d %H:%M:%S +0000 %Y')
                timestamp = int(calendar.timegm(created))
                yield (user_id, timestamp, text)
    finally:
        response.close()
def retrier(factory):
    """Yield items from ``factory()`` forever, reopening the feed whenever it stops.

    ``factory`` is called with no arguments on every (re)connection. It must
    therefore be a callable returning a fresh iterable, e.g.
    ``lambda: twitter_producer(fruits)``, not an already-created generator.

    - When the iterable is exhausted, prints a notice and waits 10 seconds.
    - When ``urllib2.HTTPError`` escapes from it, waits 30 seconds.
    - Any other exception propagates to the consumer.
    """
    while True:
        feed_closed = True
        try:
            for item in factory():
                yield item
        except StopIteration:
            # Only reachable if factory() itself raises StopIteration. A
            # normally exhausted iterable simply ends the for loop above,
            # which is why normal closure is handled via `feed_closed`.
            pass
        except urllib2.HTTPError:
            feed_closed = False
            print('HTTP Error, waiting 30 seconds')
            time.sleep(30)
        if feed_closed:
            print('Feed closed, waiting 10 seconds')
            time.sleep(10)
        print('retrying')
# Demo track keywords used by the __main__ pipelines below.
fruits = 'apple banana carrot'.split()
def follow(data, outport):
    """Push every item of the iterable ``data`` into the coroutine ``outport``.

    Stops early, without raising, once ``outport`` has finished (its
    ``send`` raises StopIteration), e.g. a ``failover`` whose targets are
    all used up or a ``limiter`` that reached its quota. Exceptions raised
    while iterating ``data``, and any other exception from ``outport``,
    propagate.
    """
    for datum in data:
        try:
            outport.send(datum)
        except StopIteration:
            # The sink is done; a bare StopIteration escaping here would
            # crash the caller (and become RuntimeError inside a generator
            # on Python 3.7+).
            return
# Broadcast a stream onto multiple targets
@coroutine
def broadcast(outports):
    """Coroutine sink that forwards every received item to ALL ``outports``.

    The previous loop consumed a new item for each target, which dealt items
    out round-robin instead of broadcasting. It also spun forever without
    yielding when ``outports`` was empty.

    ``outports`` is copied into a list once, so any iterable works; a
    one-shot iterator would otherwise be exhausted after the first item.
    Each item is sent to the targets in their original order.
    """
    outports = list(outports)
    while True:
        item = (yield)
        for outport in outports:
            outport.send(item)
@coroutine
def printer(name=''):
while True:
line = (yield)
print name, line
@coroutine
def failover(outports):
    """Coroutine that feeds received items to one target at a time.

    Items go to the current target until its ``send`` raises StopIteration
    (for example a ``limiter`` that has forwarded its quota). The coroutine
    then moves to the next target in ``outports``, which is pulled lazily
    and may be infinite. Once every target is exhausted the coroutine
    itself finishes, so the next ``send`` to it raises StopIteration.
    """
    for current in outports:
        try:
            while True:
                forward = current.send
                forward((yield))
        except StopIteration:
            pass  # current target is finished; advance to the next one
@coroutine
def limiter(n, outport):
    """Coroutine that forwards at most ``n`` received items to ``outport``, then finishes.

    The ``send`` that delivers the n-th item also ends this generator, so
    that call raises StopIteration to the sender. ``failover`` relies on
    this to advance to its next target.
    """
    budget = xrange(n)
    for _ in budget:
        deliver = outport.send
        deliver((yield))
from itertools import imap, repeat, count
# Flatten one level of nesting: chain([[1, 2], [3]]) -> 1, 2, 3.
chain = itertools.chain.from_iterable


def curry(f, x):
    """Fix the first argument of the two-argument function ``f``.

    Despite the name this is partial application:
    ``curry(f, x)(y) == f(x, y)``.
    """
    def applied(y):
        return f(x, y)
    return applied
if __name__ == '__main__':
    # Demo pipelines: each streams tweets matching `fruits` into printer
    # coroutines. Previously they were only defined, never run; the first
    # command-line argument now selects which one executes.

    def round_robin(nworkers=5, chunk=10):
        """Give ``chunk`` consecutive tweets to each of ``nworkers`` printers in turn, cycling."""
        workers = [printer(label) for label in string.ascii_uppercase[:nworkers]]
        schedule = imap(curry(limiter, chunk), itertools.cycle(workers))
        follow(twitter_producer(fruits), failover(schedule))

    def log_splitter(n=10):
        """Send each successive batch of ``n`` tweets to a new printer labelled 0, 1, 2, ..."""
        logs = imap(curry(limiter, n), imap(printer, count()))
        follow(twitter_producer(fruits), failover(logs))

    def broadcaster(nworkers=2):
        """Send every tweet to all ``nworkers`` printers, reconnecting when the feed drops."""
        workers = [printer(label) for label in string.ascii_uppercase[:nworkers]]
        # retrier() calls its argument on every reconnect, so it needs a
        # factory; passing twitter_producer(fruits) directly raised TypeError.
        follow(retrier(lambda: twitter_producer(fruits)), broadcast(workers))

    demos = {'round_robin': round_robin,
             'log_splitter': log_splitter,
             'broadcaster': broadcaster}
    choice = sys.argv[1] if len(sys.argv) > 1 else None
    if choice not in demos:
        sys.exit('usage: %s {%s}' % (sys.argv[0], '|'.join(sorted(demos))))
    demos[choice]()
# GitHub page footer: "Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment"