Last active
June 5, 2018 12:34
-
-
Save adam704a/9167bf3f5600471f4a73be81809c8137 to your computer and use it in GitHub Desktop.
The Great Tweet Migration
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Migrate Tweets from Mongo to a date partitioned format in S3\n",
    "This is the process for moving 61 million tweets from a mongo database to Amazon's S3"
   ]
  },
{ | |
"cell_type": "code", | |
"execution_count": 1, | |
"metadata": { | |
"collapsed": true | |
}, | |
"outputs": [], | |
"source": [ | |
"# Connect to S3\n", | |
"# where are the aws credentails you might ask. these are configured using the aws cli.\n", | |
"import boto3\n", | |
"\n", | |
"s3 = boto3.resource('s3')\n", | |
"bucket = s3.Bucket('org.rti.mj.tweets')\n" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 2, | |
"metadata": { | |
"collapsed": true | |
}, | |
"outputs": [], | |
"source": [ | |
"# Connect to MongoDB\n", | |
"from pymongo import MongoClient\n", | |
"import datetime\n", | |
"\n", | |
"\n", | |
"# User exists in the mj_analysis database, so you have to authenticate there\n", | |
"c = MongoClient(\"mongodb://datascientist:youthoughtiwasgoingtoleavethepasswordinheredidntyou@mongo.ictedge.org/mj_sample\")\n", | |
"db = c.mj_sample\n", | |
"collection = db.mj_sample\n" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 3, | |
"metadata": { | |
"collapsed": true | |
}, | |
"outputs": [], | |
"source": [ | |
"# Handy little function for iterating over dates. You have to <3 the yeild here.\n", | |
"from datetime import timedelta, date\n", | |
"\n", | |
"\n", | |
"def daterange(start_time, end_time):\n", | |
" for n in range(int ((end_time - start_time).days*24)):\n", | |
" yield start_time + timedelta(hours=n)\n" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": { | |
"collapsed": true | |
}, | |
"outputs": [], | |
"source": [ | |
"# Set date range\n", | |
"start_time = datetime.datetime(2014, 11, 15, 0, 0)\n", | |
"end_time = datetime.datetime(2016, 4, 21, 0, 0)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": { | |
"collapsed": true | |
}, | |
"outputs": [], | |
"source": [ | |
"from bson import json_util\n", | |
"import json\n", | |
"\n", | |
"for n_time in daterange(start_time, end_time):\n", | |
" \n", | |
" # for each interval write a new file\n", | |
" with open('data.json', 'w') as outfile:\n", | |
" \n", | |
" print(\"querying \"+ str(n_time) +\" \"+ str(n_time + datetime.timedelta(hours=1)) ) \n", | |
" _end = n_time + datetime.timedelta(hours=1)\n", | |
" \n", | |
" for post in collection.find({\"created_at\" : { \"$lte\" : _end, \"$gte\": n_time} }):\n", | |
" \n", | |
" json.dump(post, outfile, default=json_util.default)\n", | |
" outfile.write('\\n')\n", | |
" \n", | |
" # Save file to S3 /year/month/date/part-0000n.json (where n = hour)\n", | |
" data = open('data.json', 'rb')\n", | |
" bucket.put_object(Key=str(n_time.year)+'/'+str(n_time.month)+'/'+str(n_time.day)+'/part-0000'+str(n_time.hour)+'.json', Body=data)\n", | |
" \n", | |
" outfile.close() " | |
] | |
} | |
], | |
"metadata": { | |
"kernelspec": { | |
"display_name": "Python 3", | |
"language": "python", | |
"name": "python3" | |
}, | |
"language_info": { | |
"codemirror_mode": { | |
"name": "ipython", | |
"version": 3 | |
}, | |
"file_extension": ".py", | |
"mimetype": "text/x-python", | |
"name": "python", | |
"nbconvert_exporter": "python", | |
"pygments_lexer": "ipython3", | |
"version": "3.4.4" | |
} | |
}, | |
"nbformat": 4, | |
"nbformat_minor": 0 | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment