Skip to content

Instantly share code, notes, and snippets.

@rmax
Created September 29, 2011 05:20
Show Gist options
  • Select an option

  • Save rmax/1250036 to your computer and use it in GitHub Desktop.

Select an option

Save rmax/1250036 to your computer and use it in GitHub Desktop.
Aggregate tweets into 15-minute slots
import sys
from disco import func
from disco.core import Job
def mapper(record, params):
    """Map one tweet record to its 15-minute slot key with a count of 1.

    :param record: ``(key, tweet)`` pair produced by the map reader; only
        ``tweet`` is used.  ``tweet`` must be a mapping whose
        ``'created_at'`` value is an RFC 2822-style date string such as
        ``'Wed Sep 28 12:07:33 +0000 2011'``.
    :param params: disco job parameters (unused).
    :yields: a single ``('YYYY-MM-DD HH:MM', 1)`` pair, where ``HH:MM`` is
        the start of the 15-minute slot in a fixed UTC-4 clock.
    :raises ValueError: if ``created_at`` cannot be parsed as a date.
    """
    # Imports are kept inside the function, presumably so it is
    # self-contained when disco ships it to workers -- confirm.
    from datetime import datetime, timedelta
    from email.utils import parsedate_tz

    slot_minutes = 15
    # Fixed UTC-4 offset (presumably Bolivia Time, "BOT" -- confirm).
    local_offset = timedelta(hours=-4)

    _record_key, tweet = record
    created_at = tweet['created_at']
    parsed = parsedate_tz(created_at)
    if parsed is None:
        raise ValueError('unparseable created_at: %r' % (created_at,))

    # parsed[9] is the zone offset in seconds (None/0 when absent).
    # Subtracting it yields naive UTC directly, avoiding mktime(), which
    # treats the tuple as *local* time and is DST-sensitive.
    utc_dt = datetime(*parsed[:6]) - timedelta(seconds=parsed[9] or 0)
    bot_dt = utc_dt + local_offset
    # Floor division keeps the minute an int on both Python 2 and 3.
    bot_dt = bot_dt.replace(minute=(bot_dt.minute // slot_minutes) * slot_minutes)
    key = '{0.year}-{0.month:02}-{0.day:02} {0.hour:02}:{0.minute:02}'.format(bot_dt)
    yield key, 1
def main():
    """Run the tweet-slot counting job over the inputs given on the command line.

    Every command-line argument is passed to disco as a job input.  Inputs
    are read with ``func.chain_reader`` (presumably the output of an earlier
    disco job -- confirm), mapped through ``mapper``, and combined/reduced
    with ``func.sum_combiner`` / ``func.sum_reduce`` into one sorted,
    saved partition.  Whatever ``job.wait()`` returns is printed.

    Exits with status 2 and a usage message on stderr when no inputs are
    given.
    """
    inputs = sys.argv[1:]
    if not inputs:
        # Otherwise a job with an empty input list would be submitted.
        sys.stderr.write('usage: %s INPUT [INPUT ...]\n' % sys.argv[0])
        sys.exit(2)

    job = Job().run(input=inputs,
                    map_reader=func.chain_reader,
                    map=mapper,
                    combiner=func.sum_combiner,
                    reduce=func.sum_reduce,
                    merge_partitions=1,
                    sort=True,
                    save=True)
    results = job.wait()
    # Single-argument print(...) produces identical output on Python 2 and 3.
    print(results)
if __name__ == '__main__':
    # Script entry point; main() returns None, so the exit status is 0
    # unless main() itself exits early.
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment