Created
September 29, 2011 05:20
-
-
Save rmax/1250036 to your computer and use it in GitHub Desktop.
aggreate tweets in 15-minutes slots
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import sys | |
| from disco import func | |
| from disco.core import Job | |
| def mapper((id, tweet), params): | |
| import rfc822 | |
| from datetime import datetime, timedelta | |
| from time import mktime | |
| utc_dt = datetime.fromtimestamp(mktime(rfc822.parsedate(tweet['created_at']))) | |
| bot_dt = utc_dt - timedelta(hours=4) | |
| bot_dt = bot_dt.replace(minute=(bot_dt.minute/15)*15) | |
| key = '{0.year}-{0.month:02}-{0.day:02} {0.hour:02}:{0.minute:02}'.format(bot_dt) | |
| yield key, 1 | |
| def main(): | |
| job = Job().run(input=sys.argv[1:], | |
| map_reader=func.chain_reader, | |
| map=mapper, | |
| combiner=func.sum_combiner, | |
| reduce=func.sum_reduce, | |
| merge_partitions=1, | |
| sort=True, | |
| save=True) | |
| results = job.wait() | |
| print results | |
| if __name__ == '__main__': | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment