Created
September 26, 2012 20:23
-
-
Save amiller/3790351 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import base64
import calendar
import functools
import itertools
import json
import os
import string
import time
import urllib
import urllib2
def coroutine(func):
    """Decorator that primes a generator-based coroutine.

    Calling the decorated function builds the generator and advances it to
    its first ``yield``, so the returned object is immediately ready for
    ``.send(value)``.
    """
    @functools.wraps(func)  # keep the wrapped name/docstring for debugging
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        # next(cr) works on Python 2.6+ and 3; cr.next() is Python 2 only.
        next(cr)
        return cr
    return start
class config:
    """Module-wide settings.

    ``TWITTER`` holds the user/password pair that ``twitter_producer`` sends
    as HTTP basic auth. Each value may be overridden with the
    ``TWITTER_USER`` / ``TWITTER_PASSWORD`` environment variables; the
    literals below are only fallbacks, so existing behaviour is unchanged.
    """
    # NOTE(review): a password literal is committed to source here; it should
    # be rotated and the fallback removed once callers set the env vars.
    TWITTER = {
        'user': os.environ.get('TWITTER_USER', 'dv_test_1'),
        'password': os.environ.get('TWITTER_PASSWORD', 'mrybRJnp'),
    }
def twitter_producer(queries):
    """Yield ``(user_id, timestamp, text)`` for statuses matching ``queries``.

    Every query is lower-cased and URL-quoted. The queries are joined with
    commas into a ``track=`` body and POSTed to the ``/1/`` statuses/filter
    streaming URL, with HTTP basic auth built from ``config.TWITTER``.
    (NOTE(review): this v1 endpoint / basic auth may no longer be served.)
    ``queries`` is fully consumed before anything is yielded.

    For each response line that decodes to an object with ``text`` and
    ``user`` keys, this yields:

    * ``user_id``   -- ``item['user']['id_str']``
    * ``timestamp`` -- ``created_at`` as integer seconds since the Unix epoch,
      interpreted as UTC (the format string pins the offset to ``+0000``)
    * ``text``      -- the lower-cased status text

    Blank lines are skipped. The generator ends normally, letting a caller
    such as a retry loop reconnect, when a non-blank line is not valid JSON
    or when the server closes the stream. The HTTP response is always closed.
    """
    url = 'https://stream.twitter.com/1/statuses/filter.json'
    queries = [q.lower() for q in queries]
    quoted_queries = [urllib.quote(q) for q in queries]
    query_post = 'track=' + ",".join(quoted_queries)
    request = urllib2.Request(url, query_post)  # a data argument makes it a POST
    auth = base64.b64encode('%s:%s' % (config.TWITTER['user'],
                                       config.TWITTER['password']))
    request.add_header('Authorization', "basic %s" % auth)
    response = urllib2.urlopen(request)
    try:
        for line in response:
            if not line.strip():
                # Blank lines carry no status (presumably stream keep-alives,
                # NOTE(review): confirm). Previously they reached json.loads,
                # failed, and tore the whole stream down.
                continue
            try:
                item = json.loads(line)
            except ValueError:
                # json.JSONDecodeError does not exist in Python 2's json
                # module (in 3.5+ it subclasses ValueError), so catch the base
                # class. Use `return`, not `raise StopIteration`, which PEP 479
                # turns into RuntimeError inside generators.
                return
            if 'text' in item and 'user' in item:
                text = item['text'].lower()
                user_id = item['user']['id_str']
                created = time.strptime(item['created_at'],
                                        '%a %b %d %H:%M:%S +0000 %Y')
                # created_at is UTC: calendar.timegm treats the struct_time as
                # UTC, whereas time.mktime would apply the local offset.
                timestamp = int(calendar.timegm(created))
                yield (user_id, timestamp, text)
    finally:
        # Runs on normal end, on decode failure, and when the generator is
        # closed or garbage-collected while suspended.
        response.close()
def retrier(factory, closed_delay=10, error_delay=30):
    """Yield items from ``factory()`` forever, restarting it when it stops.

    ``factory`` must be a zero-argument callable that returns a fresh
    iterable each time, e.g. ``lambda: twitter_producer(fruits)``. A
    generator object is not callable and cannot be restarted.

    * When the iterable is exhausted (feed closed), print a message, wait
      ``closed_delay`` seconds and call ``factory`` again.
    * On ``urllib2.HTTPError``, or any other ``urllib2.URLError`` (e.g. a
      failed connection), wait ``error_delay`` seconds and retry.

    Every other exception propagates. 'retrying' is printed before each
    restart.
    """
    while True:
        try:
            for item in factory():
                yield item
            # Exhaustion never surfaces as StopIteration here (the for loop
            # consumes it), so the old `except StopIteration` branch was dead
            # and closed feeds were re-opened with no delay.
            print('Feed closed, waiting %s seconds' % closed_delay)
            time.sleep(closed_delay)
        except urllib2.HTTPError:
            # HTTPError subclasses URLError, so it must be caught first.
            print('HTTP Error, waiting %s seconds' % error_delay)
            time.sleep(error_delay)
        except urllib2.URLError:
            print('Network error, waiting %s seconds' % error_delay)
            time.sleep(error_delay)
        print('retrying')
# Demo track keywords handed to twitter_producer by the __main__ examples.
fruits = 'apple banana carrot'.split()
def follow(data, outport):
    """Push each element of the iterable ``data`` into ``outport.send``, in order.

    Nothing is caught: an exception from ``outport.send`` (for instance
    StopIteration from a finished coroutine) propagates to the caller.
    """
    for value in data:
        outport.send(value)
# Broadcast a stream onto multiple targets
@coroutine
def broadcast(outports):
    """Coroutine that forwards every received value to *all* ``outports``.

    ``outports`` is copied into a list when the coroutine is primed, so a
    one-shot iterator of targets is not drained after the first value.
    """
    targets = list(outports)
    while True:
        # Receive once, then fan out. The old form `for o in outports:
        # o.send((yield))` yielded once per target, so each value reached only
        # one target. With no targets it looped forever without yielding,
        # which hung the priming next().
        value = (yield)
        for target in targets:
            target.send(value)
@coroutine | |
def printer(name=''): | |
while True: | |
line = (yield) | |
print name, line | |
@coroutine
def failover(outports):
    """Coroutine that feeds received values to one outport at a time.

    Values go to the current outport until its ``send`` raises
    StopIteration. The next outport taken from ``outports`` then handles
    subsequent values; the value whose ``send`` raised is not re-sent. Once
    ``outports`` is exhausted the generator returns, so the ``send`` that
    triggered it raises StopIteration to the caller.
    """
    for current in outports:
        try:
            while 1:
                current.send((yield))
        except StopIteration:
            pass  # this outport is finished; move on to the next one
@coroutine
def limiter(n, outport):
    """Coroutine that forwards its first ``n`` received values to ``outport``.

    After the n-th value is forwarded the generator finishes, so that same
    ``send`` raises StopIteration. Callers such as ``failover`` treat this
    as "this outport is done".
    """
    for _forwarded in xrange(n):
        outport.send((yield))
from itertools import imap, repeat, count | |
# Lazily flatten one level of nesting: chain([[1, 2], [3]]) yields 1, 2, 3.
chain = itertools.chain.from_iterable


def curry(f, x):
    """Return a one-argument function ``g`` such that ``g(y) == f(x, y)``.

    Despite the name this is partial application of the first argument,
    not full currying.
    """
    def applied(y):
        return f(x, y)
    return applied
if __name__ == '__main__':
    # These demos are only defined here, never called; presumably they are
    # meant to be invoked interactively (e.g. `python -i`). NOTE(review): confirm.

    def round_robin(nworkers=5, chunk=10):
        """Hand the stream to ``nworkers`` lettered printers, ``chunk`` items each, in turn."""
        # ascii_uppercase is locale-independent (and exists on Python 3),
        # unlike string.uppercase.
        workers = map(printer, string.ascii_uppercase[:nworkers])
        # cycle repeats the worker sequence forever, as chain(repeat(...))
        # did. Each turn wraps the next worker in a fresh limiter.
        schedule = imap(curry(limiter, chunk), itertools.cycle(workers))
        follow(twitter_producer(fruits), failover(schedule))

    def log_splitter(n=10):
        """Print the stream in runs of ``n`` items, each run under a new numeric label."""
        logs = imap(curry(limiter, n), imap(printer, count()))
        follow(twitter_producer(fruits), failover(logs))

    def broadcaster(nworkers=2):
        """Send every item to each of ``nworkers`` printers, restarting the stream via retrier."""
        workers = map(printer, string.ascii_uppercase[:nworkers])
        # retrier calls its argument to (re)start the stream, so it needs a
        # factory; passing the generator object itself raised TypeError.
        follow(retrier(lambda: twitter_producer(fruits)), broadcast(workers))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment