Created
September 1, 2015 09:31
-
-
Save ttsiodras/463fb9f6c4b4de8e9364 to your computer and use it in GitHub Desktop.
MixPanel importer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
"""Mixpanel event importer | |
Usage: | |
mp_import_evts.py -c configFile -t token -a api_key -i fname -d delta | |
mp_import_evts.py (-h | --help) | |
mp_import_evts.py --version | |
Options: | |
-c configFile config file with settings | |
-t token MixPanel token | |
-a api_key MixPanel API key | |
-i filename Input filename (as exported from MixPanel) | |
-d delta The MixPanel project's UTC offset in hours | |
(e.g. -8 for PST, 1 for EEST) | |
-h --help Show this usage screen | |
--version Show version | |
""" | |
from docopt import docopt | |
import os | |
import sys | |
import json | |
import base64 | |
import urllib | |
import cPickle | |
from time import strftime | |
from collections import defaultdict | |
import logging | |
import eventlet | |
from eventlet.green import urllib2 | |
from pyramid.paster import get_appsettings | |
from sqlalchemy import engine_from_config | |
from pyzendoc.models import ( | |
DBSession, | |
User, | |
UserCompany) | |
def panic(msg):
    """Report a fatal error and abort.

    Logs *msg* through the module-wide ``g_logger`` (installed by
    ``main``), then terminates the process with exit status 1.
    """
    g_logger.error(msg)
    sys.exit(1)
class EventImporter(object):
    """Imports backdated events into MixPanel via the /import/ endpoint.

    Events are read one JSON object per line, re-attributed from users to
    companies where possible, and sent in batches of ``BATCH_SIZE`` using a
    green-thread pool. Event timestamps are shifted from the project's
    local UTC offset back to UTC before sending.
    """

    # MixPanel's /import/ endpoint accepts at most 50 events per request.
    BATCH_SIZE = 50

    def __init__(self, token, api_key, time_offset):
        # Project token stamped onto events that lack one.
        self.token = token
        # API key sent with every import request.
        self.api_key = api_key
        # Project UTC offset in hours (e.g. -8 for PST, 1 for EEST);
        # subtracted from each event timestamp to convert it to UTC.
        self.time_offset = time_offset

    def update(self, event_list):
        """Send one batch of events to MixPanel's import endpoint.

        Each event must carry ``properties.time`` and
        ``properties.distinct_id``. Timestamps are converted to UTC using
        the configured offset, and the project token is filled in when
        missing.

        Raises:
            RuntimeError: when MixPanel reports a failed import
                (``status != 1`` in the verbose response).
        """
        url = "http://api.mixpanel.com/import/?"
        batch = []
        for event in event_list:
            props = event['properties']
            assert "time" in props, \
                "Must specify a backdated time"
            assert "distinct_id" in props, \
                "Must specify a distinct ID"
            # Transform the project-local timestamp to UTC.
            props['time'] = str(
                int(props['time']) - (self.time_offset * 3600))
            if "token" not in props:
                props["token"] = self.token
            batch.append(event)
        payload = {
            "data": base64.b64encode(json.dumps(batch)),
            "verbose": 1,
            "api_key": self.api_key,
        }
        response = urllib2.urlopen(url, urllib.urlencode(payload))
        try:
            message = response.read()
        finally:
            # Fix: the original leaked the HTTP response handle.
            response.close()
        if json.loads(message)['status'] != 1:
            raise RuntimeError('import failed')

    def batch_update(self, filename, companies_of_user):
        """Stream events from *filename* to MixPanel in batches.

        *filename* holds one JSON event per line (as exported from
        MixPanel). *companies_of_user* maps user_id -> list of company
        ids; an event is re-attributed to the company only when the user
        belongs to exactly one company (as per suggestion from Marco
        Sanchez Junco of MixPanel). Batches are dispatched on a green
        pool of 10 workers; the final partial batch is sent inline.
        """
        pool = eventlet.GreenPool(size=10)
        events = []
        total = 0
        with open(filename, 'r') as f:
            for line in f:
                evt_data = json.loads(line)
                props = evt_data.get('properties', {})
                user_id = props.get('id')
                if user_id is None:
                    distinct_id = props.get('distinct_id')
                    try:
                        user_id = int(distinct_id)
                    except (TypeError, ValueError):
                        # Fix: TypeError added — a missing distinct_id is
                        # None, which int() rejects with TypeError, not
                        # ValueError. Either way: no usable id, skip.
                        continue
                company_id = companies_of_user.get(user_id)
                if company_id is not None and len(company_id) == 1:
                    company_id = company_id[0]
                    evt_data['properties']['id'] = company_id
                    evt_data['properties']['distinct_id'] = str(company_id)
                events.append(evt_data)
                if len(events) == self.BATCH_SIZE:
                    total += self.BATCH_SIZE
                    if (total % 1000) == 0:
                        g_logger.info("Sent %d events" % total)
                    pool.spawn(self.update, events)
                    events = []
        if events:
            self.update(events)
            g_logger.info(str(events) + "\n" +
                          "Sent remaining %d events!" % len(events))
        # Fix: the original never waited for the pool, so spawned batches
        # could be abandoned when the caller (and process) exits.
        pool.waitall()
def _setup_logging():
    """Configure console logging and return the importer's logger."""
    logging.getLogger("segment").addHandler(logging.StreamHandler())
    logger = logging.getLogger("mixpanel_client")
    logger.setLevel(logging.INFO)
    handler = logging.StreamHandler()
    handler.setLevel(logging.INFO)
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    return logger


def _load_company_mappings(settings):
    """Return the {user_id: [company_id]} lookup, cached in mappings.db.

    On first run, queries the database (via the sqlalchemy.url setting)
    for each user's active companies, keeps only users with at most one
    company, and pickles the result to mappings.db. Subsequent runs load
    the cached pickle instead of hitting the database.
    """
    if not os.path.exists("mappings.db"):
        # Setup SQLAlchemy part
        sqlalchemy_url = settings.get('sqlalchemy.url', None)
        if sqlalchemy_url is None:
            panic("Missing sqlalchemy.url config line! Aborting...")
        engine = engine_from_config(settings, 'sqlalchemy.')
        DBSession.configure(bind=engine, autocommit=False)
        # Prepare user to company lookup
        query = DBSession.query(
            User.id, UserCompany.company_id
        ).join(
            UserCompany, UserCompany.user_id == User.id
        ).filter(
            UserCompany.active.is_(True)
        )
        companies_of_user_temp = defaultdict(list)
        for user_id, company_id in query:
            companies_of_user_temp[user_id].append(company_id)
        companies_of_user = {
            k: v
            for k, v in companies_of_user_temp.iteritems()
            if len(v) <= 1
        }
        # Fix: the original used open(..., "w") with no close — pickle
        # data is binary and the leaked handle may never be flushed.
        with open("mappings.db", "wb") as fh:
            cPickle.dump(companies_of_user, fh)
        g_logger.info("Created user/company mappings.db")
    else:
        g_logger.info("Using cached user/company mappings.db")
        # Fix: binary mode + context manager (original leaked the handle).
        with open("mappings.db", "rb") as fh:
            companies_of_user = cPickle.load(fh)
    return companies_of_user


def main(args):
    """Entry point: wire up logging, load mappings, and run the import.

    *args* is the docopt option dict (-c config, -t token, -a api_key,
    -i input file, -d UTC offset in hours).
    """
    # Setup logging
    global g_logger
    g_logger = _setup_logging()
    # Parse config file
    config_file = args.get('-c', None)
    if config_file is None:
        panic("Configuration file is mandatory! Aborting...")
    settings = get_appsettings(config_file)
    companies_of_user = _load_company_mappings(settings)
    # Parse args
    if not os.path.exists(args.get("-i")):
        panic("input file '%s' not found!" % unicode(args.get("-i")))
    import_event = EventImporter(
        args.get("-t"), args.get("-a"), int(args.get("-d")))
    import_event.batch_update(args.get("-i"), companies_of_user)
if __name__ == "__main__":
    # docopt parses sys.argv against the usage text in the module
    # docstring and hands main() the resulting option dict.
    main(docopt(__doc__, version='MixPanel Importer 0.1'))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment