Skip to content

Instantly share code, notes, and snippets.

@alejandrobernardis
Last active August 29, 2015 14:10
Show Gist options
  • Save alejandrobernardis/fd5a99d365452b4202d0 to your computer and use it in GitHub Desktop.
Save alejandrobernardis/fd5a99d365452b4202d0 to your computer and use it in GitHub Desktop.
Mixpanel Async Client (tornado web server)
#!/usr/bin/env python2.7
# -*- coding: utf-8 -*-
# Copyright (c) 2014 Asumi Kamikaze Inc.
# Licensed under the MIT License.
# Author: Alejandro M. Bernardis
# Email: alejandro (dot) bernardis (at) asumikamikaze (dot) com
# Created: 20/Oct/2014 11:59
import datetime
from importlib import import_module
from tornado import gen
# Base type used in isinstance() checks for text values: ``basestring``
# (covers both str and unicode) on Python 2, falling back to ``str`` where
# ``basestring`` does not exist.
try:
    TEXT_SUPPORTED = basestring
except NameError:
    TEXT_SUPPORTED = str
class BaseError(Exception):
    """Root of this module's exception hierarchy.

    Carries a numeric ``code`` next to the message. Instances are falsy
    (``__nonzero__`` returns False), so an error object handed back in place
    of a result can be detected with ``if not result``.

    Args:
        message: human-readable description (str or unicode).
        code: optional error code; anything ``int()`` cannot convert falls
            back to the class-level ``_code``.
        *args: extra positional arguments stored in ``self.args``.
        **kwargs: accepted so subclasses can consume their own keywords;
            they are not forwarded to ``Exception.__init__``, which rejects
            keyword arguments.
    """

    # Fallback code used when no valid integer code is supplied; subclasses
    # override it.
    _code = -1

    def __init__(self, message, code=None, *args, **kwargs):
        super(BaseError, self).__init__(message, *args)
        # Store the message explicitly: BaseException.message is deprecated
        # since Python 2.6 and is empty when more than one positional
        # argument is passed to Exception.__init__.
        self.message = message
        try:
            self.code = int(code)
        except (TypeError, ValueError, OverflowError):
            self.code = self._code

    def __unicode__(self):
        message = self.message
        if isinstance(message, bytes):
            # Decode byte strings as UTF-8 so non-ASCII text does not raise
            # UnicodeDecodeError under the default ASCII codec.
            return message.decode('utf-8', 'replace')
        return unicode(message)

    def __str__(self):
        # Encode explicitly as UTF-8; the default ASCII codec fails on
        # non-ASCII messages.
        return self.__unicode__().encode('utf-8')

    def __nonzero__(self):
        return False
class CoroutineError(BaseError):
    """Error raised from coroutine code, optionally wrapping a cause.

    Args:
        message: description; when None and ``exception`` is given,
            ``str(exception)`` is used instead.
        exception: the underlying exception, exposed as ``.exception``.
        code: optional error code (see BaseError).
        errors: keyword-only list of related errors, exposed as ``.errors``
            (defaults to an empty list).
    """

    def __init__(self, message, exception=None, code=None, *args, **kwargs):
        # Pop 'errors' before delegating so it is never forwarded to
        # Exception.__init__, which raises TypeError on keyword arguments.
        errors = kwargs.pop('errors', None)
        if message is None and exception is not None:
            message = str(exception)
        super(CoroutineError, self).__init__(message, code, *args, **kwargs)
        self._exception = exception
        self._errors = [] if errors is None else errors

    @property
    def exception(self):
        """The wrapped exception, or None."""
        return self._exception

    @property
    def errors(self):
        """List of related errors passed via the ``errors`` keyword."""
        return self._errors
class ConfigurationError(BaseError):
    """Raised for invalid configuration.

    Used, for example, for malformed dotted import paths and unsupported
    endpoint names.
    """

    _code = 1000  # default error code for configuration problems
class ClientError(CoroutineError):
    """Raised by metric clients/consumers for request or API failures."""

    _code = 2000  # default error code for client-side failures
def verify_argument(func):
    """Decorator that short-circuits to None when the first argument is falsy.

    The wrapped function is only called when ``value`` is truthy; otherwise
    the wrapper returns None without calling it. ``functools.wraps`` keeps
    the wrapped function's name and docstring.
    """
    import functools

    @functools.wraps(func)
    def decorator(value, *args, **kwargs):
        if not value:
            return None
        return func(value, *args, **kwargs)
    return decorator
def is_primitive(value):
primitives = \
(complex, int, float, long, bool, str, basestring, unicode, tuple, list)
return isinstance(value, primitives)
@verify_argument
def unicode_to_str(value):
    """Encode ``value`` to UTF-8 bytes; falsy input yields None (via verify_argument)."""
    encoded = value.encode('utf-8')
    return encoded
def setdefault(source, key, default):
    """Ensure ``source[key]`` holds a truthy value, falling back to ``default``.

    Unlike ``dict.setdefault``, an existing falsy value (None, '', 0, {})
    is replaced by ``default`` too. When ``source`` is not a dict, a new
    dict is created.

    Returns:
        The dict that was updated. This is the freshly created one when
        ``source`` was not a dict; previously that dict was silently
        discarded.
    """
    if not isinstance(source, dict):
        source = {}
    source[key] = source.get(key) or default
    return source
@verify_argument
def complex_types(value):
    """Convert a value to something JSON-friendly (used as a ``default=`` hook).

    Falsy values short-circuit to None via ``verify_argument``. Unicode text
    is encoded to UTF-8 bytes and primitives are returned unchanged.
    Dates, datetimes and times become ISO-8601 strings, and anything else
    falls back to ``str(value)``.
    """
    if isinstance(value, unicode):
        return unicode_to_str(value)
    if is_primitive(value):
        return value
    # datetime.datetime is a subclass of datetime.date, so date covers both.
    temporal = (datetime.date, datetime.time)
    return value.isoformat() if isinstance(value, temporal) else str(value)
def import_by_path(dotted_path):
    """Import and return the attribute named by ``'package.module.Name'``.

    Args:
        dotted_path: module path and attribute name joined by a final dot.

    Returns:
        The attribute (class, function, ...) looked up on the module.

    Raises:
        ConfigurationError: if the path is malformed (no dot, empty module
            or attribute part, not a string), the module cannot be
            imported, or the module has no such attribute.
    """
    try:
        module_path, class_name = dotted_path.rsplit('.', 1)
    except (ValueError, AttributeError):
        # ValueError: no dot in the path; AttributeError: not a string.
        raise ConfigurationError(
            '%s doesn\'t look like a module path' % (dotted_path,))
    if not module_path or not class_name:
        # '.Name' would make import_module raise ValueError, and 'module.'
        # names no attribute at all.
        raise ConfigurationError(
            '%s doesn\'t look like a module path' % (dotted_path,))
    try:
        module = import_module(module_path)
    except ImportError as e:
        raise ConfigurationError(
            'Error importing module %s: %s' % (module_path, e))
    try:
        return getattr(module, class_name)
    except AttributeError:
        raise ConfigurationError(
            'Module "%s" does not define a "%s" attribute/class' % (
                module_path, class_name))
def dispatch(response, callback=None, raising=False):
    """Hand ``response`` back to the caller of a coroutine.

    Behaviour, in order:

    * If ``raising`` is true and ``response`` is an exception, it is raised.
    * If a ``callback`` is given, it is called with ``response``.
    * Otherwise ``gen.Return(response)`` is raised, so an enclosing
      ``gen.coroutine`` resolves to ``response``.
    """
    must_raise = raising and isinstance(response, Exception)
    if must_raise:
        raise response
    if callback is not None:
        callback(response)
        return
    raise gen.Return(response)
# Adapted from It's Dangerous (itsdangerous):
# https://pythonhosted.org/itsdangerous/
def want_bytes(s, encoding='utf-8', errors='strict'):
    """Return ``s`` encoded to bytes if it is text, otherwise unchanged.

    Args:
        s: text or bytes (any non-text value is passed through as-is).
        encoding: codec used to encode text.
        errors: codec error handling scheme (e.g. 'strict', 'ignore').
    """
    # type(u'') is ``unicode`` on Python 2 and ``str`` on Python 3, so this
    # check does not depend on the Python-2-only ``unicode`` builtin.
    if isinstance(s, type(u'')):
        s = s.encode(encoding, errors)
    return s
import time

from tornado import gen
from tornado import ioloop
from tornado.httpclient import AsyncHTTPClient

from mixpanel import AsyncMixpanelClient
# Make AsyncHTTPClient() instantiate Tornado's curl-based client class.
_CURL_HTTP_CLIENT = 'tornado.curl_httpclient.CurlAsyncHTTPClient'
AsyncHTTPClient.configure(_CURL_HTTP_CLIENT)
@gen.coroutine
def run():
client = AsyncMixpanelClient('==token==', True)
raw_input('Press (enter) to continue...')
try:
username = int(time.time())
for i in xrange(10):
print i,
yield client.track(username, 'item_%s' % i, {'i': i})
time.sleep(1)
print '\n', '.' * 80
r = yield client.consumer.flush()
print r
except Exception, e:
print e
ioloop.IOLoop.current().stop()
def _main():
    """Schedule the demo coroutine, then run the IOLoop until run() stops it."""
    # run() executes synchronously up to its first yield; the rest is
    # driven by the loop started below.
    run()
    ioloop.IOLoop.instance().start()


if __name__ == '__main__':
    _main()
#!/usr/bin/env python2.7
# -*- coding: utf-8 -*-
# Copyright (c) 2014 Asumi Kamikaze Inc.
# Licensed under the MIT License.
# Author: Alejandro M. Bernardis
# Email: alejandro (dot) bernardis (at) asumikamikaze (dot) com
# Created: 12/Nov/2014 00:49
# Ref:
# https://github.com/jessepollak/mixpanel-python-async/
import time
import copy
import base64
from b7 import TEXT_SUPPORTED, ClientError, ConfigurationError, setdefault, \
want_bytes, complex_types, dispatch, import_by_path
from collections import deque
from datetime import datetime, timedelta
from functools import wraps
from tornado import gen
from tornado.httpclient import HTTPRequest, HTTPResponse, AsyncHTTPClient
from urllib import urlencode
try:
import simplejson as json
except ImportError:
import json
# Public names exported by ``from <this module> import *``. The previous
# tuple listed 'MixpanelConsumer' twice and omitted the base classes.
__all__ = (
    'MetricConsumer',
    'MixpanelConsumer',
    'MixpanelBufferedConsumer',
    'MetricClient',
    'AsyncMixpanelClient',
)
# Logical names of the Mixpanel APIs this module talks to; used as keys of
# MIXPANEL_ENDPOINTS and of the consumers' per-endpoint queues.
MIXPANEL_EVENTS, MIXPANEL_PEOPLE, MIXPANEL_IMPORTS = 'events', 'people', 'imports'

# Default URL for each logical API.
MIXPANEL_ENDPOINTS = {
    MIXPANEL_EVENTS: 'https://api.mixpanel.com/track',
    MIXPANEL_PEOPLE: 'https://api.mixpanel.com/engage',
    MIXPANEL_IMPORTS: 'https://api.mixpanel.com/import',
}

# Immutable set of valid endpoint keys, used to validate endpoint arguments.
MIXPANEL_ENDPOINTS_KEYS = frozenset(MIXPANEL_ENDPOINTS)
def is_endpoint(method):
    """Decorator validating the ``endpoint`` argument of a consumer method.

    The endpoint is read from the ``endpoint`` keyword when present,
    otherwise from the first positional argument after ``self``. It must be
    one of MIXPANEL_ENDPOINTS_KEYS.

    Raises:
        ClientError: if the endpoint is not a known key.
    """
    @wraps(method)
    def wrapper(self, *args, **kwargs):
        # Look the endpoint up lazily: the old kwargs.get('endpoint', args[0])
        # evaluated args[0] eagerly and raised IndexError for keyword calls.
        if 'endpoint' in kwargs:
            endpoint = kwargs['endpoint']
        elif args:
            endpoint = args[0]
        else:
            # No endpoint supplied at all: let the method report the missing
            # argument itself (TypeError).
            return method(self, *args, **kwargs)
        if endpoint not in MIXPANEL_ENDPOINTS_KEYS:
            raise ClientError(
                'No such endpoints "%s". Valid endpoints are one of %s.'
                % (endpoint, MIXPANEL_ENDPOINTS_KEYS))
        return method(self, *args, **kwargs)
    return wrapper
def is_buffered(method):
    """Decorator that only allows calls on instances with a truthy ``_buffered``.

    Raises:
        ClientError: when the instance's ``_buffered`` attribute is missing
            or falsy.
    """
    # NOTE(review): no consumer class in this module sets ``_buffered``, so
    # any method decorated with this would always raise; confirm intent.
    @wraps(method)
    def guarded(self, *args, **kwargs):
        buffered = getattr(self, '_buffered', False)
        if buffered:
            return method(self, *args, **kwargs)
        raise ClientError('Async buffered consumer not supported')
    return guarded
class MetricConsumer(object):
    """Interface for metric consumers.

    Every member raises NotImplementedError; concrete consumers override
    the ones they support.
    """

    @property
    def errors(self):
        """Errors recorded by the last operation (subclass-defined)."""
        raise NotImplementedError

    def fetch(self, *args, **kwargs):
        """Perform a request immediately."""
        raise NotImplementedError

    def flush(self, *args, **kwargs):
        """Send any queued data."""
        raise NotImplementedError

    def send(self, *args, **kwargs):
        """Submit a message (possibly queued)."""
        raise NotImplementedError
class MixpanelConsumer(MetricConsumer):
    """Unbuffered consumer: every ``send``/``fetch`` performs one HTTP POST.

    Args:
        endpoints: optional mapping of endpoint key ('events', 'people',
            'imports') to URL. Entries override the matching defaults in
            MIXPANEL_ENDPOINTS; keys that are not given keep their default.
        **settings: other settings (e.g. the 'token' that AsyncMixpanelClient
            forwards) are accepted and ignored here.

    Raises:
        ConfigurationError: if ``endpoints`` contains an unknown key.
    """

    def __init__(self, endpoints=None, **settings):
        # 'endpoints' is a named parameter, so it can never appear inside
        # **settings. The old code read settings['endpoints'] and therefore
        # always discarded caller-supplied URLs.
        custom = endpoints or {}
        for item in custom:
            if item not in MIXPANEL_ENDPOINTS_KEYS:
                raise ConfigurationError('Endpoint not supported: %s' % item)
        # Start from the defaults so a partial override still leaves every
        # endpoint usable; empty URLs are ignored.
        self._endpoints = dict(MIXPANEL_ENDPOINTS)
        self._endpoints.update(
            (key, url) for key, url in custom.items() if url)

    def _sanitize_message(self, message):
        """Serialize ``message`` to compact JSON (deques become lists)."""
        if isinstance(message, deque):
            message = list(message)
        return json.dumps(message, default=complex_types, separators=(',', ':'))

    def _sanitize_response(self, response):
        """Decode an HTTPResponse body and raise ClientError on API failure.

        Non-HTTPResponse values are returned unchanged.
        """
        if not isinstance(response, HTTPResponse):
            return response
        try:
            body = json.loads(response.body)
        except (TypeError, ValueError):
            raise ClientError('Mixpanel API Error: invalid JSON response')
        if not isinstance(body, dict):
            # Presumably a non-verbose bare 1/0 reply; normalise it to the
            # verbose {"status": ...} shape checked below.
            body = {'status': body}
        if response.code != 200 or body.get('status', 0) != 1:
            raise ClientError(
                'Mixpanel API Error: %s' % body.get('error', '-'))
        return body

    def _prepare(self, message, api_key=None):
        """Build the url-encoded POST body for ``message``.

        Containers are serialized to JSON and text is used as-is. The result
        is base64-encoded into the ``data`` field alongside verbose=1 and
        ip=0, plus ``api_key`` when given.

        Raises:
            ClientError: if ``message`` is neither a container nor text.
        """
        if isinstance(message, (deque, list, tuple, dict)):
            message = self._sanitize_message(message)
        elif not isinstance(message, TEXT_SUPPORTED):
            raise ClientError('Invalid message, must be a string')
        message = base64.b64encode(want_bytes(message))
        values = {'data': message, 'verbose': 1, 'ip': 0}
        if api_key is not None:
            values['api_key'] = api_key
        return urlencode(values, True).encode('utf-8')

    @is_endpoint
    @gen.coroutine
    def fetch(self, endpoint, message, headers=None, callback=None, **kwargs):
        """POST ``message`` to ``endpoint`` and deliver the decoded body.

        The decoded body goes to ``callback`` if given (previously the
        callback was silently ignored); otherwise it is the coroutine's
        result. ``**kwargs`` are forwarded to ``_prepare`` (e.g. api_key).
        """
        # Pass everything through the HTTPRequest constructor instead of
        # assigning attributes afterwards, so headers=None is handled by
        # Tornado itself.
        request = HTTPRequest(
            self._endpoints[endpoint],
            method='POST',
            headers=headers,
            body=self._prepare(message, **kwargs))
        response = yield AsyncHTTPClient().fetch(request)
        dispatch(self._sanitize_response(response), callback)

    @is_endpoint
    @gen.coroutine
    def send(self, endpoint, message, callback=None, **kwargs):
        """Send ``message`` immediately (no buffering); see ``fetch``."""
        response = yield self.fetch(endpoint, message, **kwargs)
        dispatch(response, callback)
class MixpanelBufferedConsumer(MixpanelConsumer):
    """Consumer that queues messages per endpoint and posts them in batches.

    All endpoints are flushed on the first ``send`` when ``flush_first`` is
    true, and then whenever more than ``flush_after`` seconds have passed
    since the last full flush. A single endpoint is flushed as soon as its
    queue reaches ``max_size`` messages.

    Args:
        endpoints: optional endpoint URL overrides, forwarded to
            MixpanelConsumer.
        flush_after: seconds between time-based flushes; a falsy value
            (0 or None) flushes every endpoint on every ``send``.
        flush_first: flush on the first ``send`` instead of waiting
            ``flush_after`` seconds.
        max_size: per-endpoint queue length that triggers a flush; values
            above 50 are capped at 50.
        **kwargs: forwarded to MixpanelConsumer.
    """

    # Values returned by _should_flush.
    ALL = 'all'
    ENDPOINT = 'endpoint'

    def __init__(self, endpoints=None, flush_after=60, flush_first=True,
                 max_size=50, **kwargs):
        # Serialized messages waiting to be sent, one deque per endpoint.
        self._current = {key: deque() for key in MIXPANEL_ENDPOINTS_KEYS}
        # No longer used by _prepare_flush (each flush snapshots its batch
        # into a local list so concurrent flushes cannot clobber each
        # other); kept so existing attribute access keeps working.
        self._buffers = {key: deque() for key in MIXPANEL_ENDPOINTS_KEYS}
        self._flush_first = flush_first
        # ``or 0`` makes flush_after=None mean "always flush" instead of
        # raising TypeError inside timedelta.
        self._flush_after = timedelta(seconds=flush_after or 0)
        # None means "the next send flushes every endpoint". flush_first was
        # previously stored but never consulted.
        if flush_first or not flush_after:
            self._last_flushed = None
        else:
            self._last_flushed = datetime.utcnow()
        # ClientError list from the last flush, or None when it succeeded.
        self._errors = None
        self._max_size = min(max_size, 50)
        super(MixpanelBufferedConsumer, self).__init__(endpoints, **kwargs)

    @property
    def errors(self):
        """Errors from the last flush, or None if it fully succeeded."""
        return self._errors

    def _should_flush(self, endpoint=None):
        """Return ALL, ENDPOINT or False.

        ALL: a time-based flush of every endpoint is due.
        ENDPOINT: ``endpoint``'s queue has reached max_size.
        False: nothing needs flushing yet.
        """
        if self._last_flushed is None or not self._flush_after:
            return self.ALL
        if datetime.utcnow() - self._last_flushed > self._flush_after:
            return self.ALL
        if endpoint and len(self._current[endpoint]) >= self._max_size:
            return self.ENDPOINT
        return False

    @gen.coroutine
    def _prepare_flush(self, endpoint=None, **kwargs):
        """Send queued messages for ``endpoint`` (or all endpoints).

        Messages of an endpoint whose request fails are put back at the
        front of its queue, in their original order. Resolves to True when
        every request succeeded; otherwise False, with the failures stored
        in ``errors``.
        """
        errors = []
        if endpoint in MIXPANEL_ENDPOINTS_KEYS:
            endpoints = [endpoint]
        else:
            endpoints = MIXPANEL_ENDPOINTS_KEYS
            # Record the full flush before yielding. Previously this was
            # never updated, so once stale, every send flushed everything.
            self._last_flushed = datetime.utcnow()
        for name in endpoints:
            current = self._current[name]
            if not current:
                continue
            # Snapshot the batch; messages queued while the request is in
            # flight stay in ``current`` for the next flush.
            batch = list(current)
            current.clear()
            try:
                message = '[{0}]'.format(','.join(batch))
                yield self.fetch(name, message, **kwargs)
            except Exception as e:
                # extendleft() prepends one item at a time, so feed it the
                # batch reversed to restore the original order.
                current.extendleft(reversed(batch))
                errors.append(ClientError(name, e))
        response = not errors
        self._errors = None if response else errors
        raise gen.Return(response)

    @gen.coroutine
    def flush(self, callback=None, **kwargs):
        """Flush queued messages (pass ``endpoint=`` to flush only one).

        Delivers True on full success, False otherwise (see ``errors``).
        """
        response = yield self._prepare_flush(**kwargs)
        dispatch(response, callback)

    @is_endpoint
    @gen.coroutine
    def send(self, endpoint, message, callback=None, **kwargs):
        """Queue ``message`` for ``endpoint`` and flush if a flush is due.

        Delivers False when the message was only queued, otherwise the
        flush result.
        """
        self._current[endpoint].append(self._sanitize_message(message))
        try:
            should = self._should_flush(endpoint)
        except Exception:
            # Deliberately best-effort: a failing check just defers the
            # flush.
            should = False
        if should == self.ALL:
            response = yield self._prepare_flush(**kwargs)
        elif should == self.ENDPOINT:
            response = yield self._prepare_flush(endpoint, **kwargs)
        else:
            response = False
        dispatch(response, callback)
class MetricClient(object):
    """Interface for metric clients; concrete clients override every member.

    Configuration example::

        settings:
          buffered: True
          adapter:
            engine: 'mixpanel'
            token: ''
            flush_after: 60
            flush_first: True
            max_size: 50
            endpoints:
              key: 'value'
    """

    @property
    def consumer(self):
        """The consumer that delivers this client's messages."""
        raise NotImplementedError

    def track(self, *args, **kwargs):
        """Record an event."""
        raise NotImplementedError

    def import_data(self, *args, **kwargs):
        """Import an event with an explicit timestamp."""
        raise NotImplementedError
class AsyncMixpanelClient(MetricClient):
    """Tornado coroutine client for Mixpanel events, people and imports.

    Args:
        buffered: use MixpanelBufferedConsumer (batched requests) instead of
            MixpanelConsumer (one request per call).
        **settings: 'token' (project token; replaced by the placeholder
            'mixpanel_token' when missing or empty) plus consumer options,
            all forwarded to the consumer constructor.

    Every coroutine method delivers its result to ``callback`` when one is
    given, otherwise as the coroutine's return value.
    """

    VERSION = '1.0.0'
    VERSION_NAME = 'python/tornado'

    def __init__(self, buffered=False, **settings):
        setdefault(settings, 'token', 'mixpanel_token')
        self._token = settings['token']
        consumer_class = MixpanelBufferedConsumer if buffered \
            else MixpanelConsumer
        self._consumer = consumer_class(**settings)

    @property
    def consumer(self):
        """The consumer used to deliver messages."""
        return self._consumer

    def _sanitize_argument(self, value, default):
        """Return ``default`` when ``value`` is None, else ``value``."""
        return default if value is None else value

    def _build_event(self, distinct_id, event_name, timestamp, properties,
                     meta):
        """Build the event payload shared by ``track`` and ``import_data``.

        ``timestamp`` is truncated to whole seconds. ``properties`` are
        merged over the library defaults, and ``meta`` is merged into the
        top-level payload.
        """
        prop = {
            'token': self._token,
            'distinct_id': distinct_id,
            'time': int(timestamp),
            'mp_lib': self.VERSION_NAME,
            '$lib_version': self.VERSION,
        }
        prop.update(self._sanitize_argument(properties, {}))
        data = {'event': event_name, 'properties': prop}
        data.update(self._sanitize_argument(meta, {}))
        return data

    @gen.coroutine
    def import_data(self, api_key, distinct_id, event_name, timestamp,
                    properties=None, meta=None, callback=None):
        """Send an event with an explicit ``timestamp`` to the import endpoint."""
        data = self._build_event(distinct_id, event_name, timestamp,
                                 properties, meta)
        # Use the 'imports' endpoint (MIXPANEL_IMPORTS), which is the one
        # api_key is meant for; this previously went to 'events' (/track).
        response = yield self._consumer.send(MIXPANEL_IMPORTS, data,
                                             api_key=api_key)
        dispatch(response, callback)

    @gen.coroutine
    def track(self, distinct_id, event_name, properties=None, meta=None,
              callback=None):
        """Send an event stamped with the current time (seconds)."""
        data = self._build_event(distinct_id, event_name, time.time(),
                                 properties, meta)
        response = yield self._consumer.send(MIXPANEL_EVENTS, data)
        dispatch(response, callback)

    @gen.coroutine
    def alias(self, alias_id, original, meta=None, callback=None):
        """Send a ``$create_alias`` event, bypassing any consumer buffer."""
        data = {
            'event': '$create_alias',
            'properties': {
                'distinct_id': original,
                'alias': alias_id,
                'token': self._token
            }
        }
        data.update(self._sanitize_argument(meta, {}))
        # fetch() posts immediately, even with a buffered consumer.
        response = yield self._consumer.fetch(MIXPANEL_EVENTS, data)
        dispatch(response, callback)

    @gen.coroutine
    def people(self, distinct_id, message, meta=None, callback=None):
        """Send a people (engage) update; ``$time`` is in milliseconds."""
        data = {
            '$token': self._token,
            '$distinct_id': distinct_id,
            '$time': int(time.time() * 1000)
        }
        data.update(message, **self._sanitize_argument(meta, {}))
        response = yield self._consumer.send(MIXPANEL_PEOPLE, data)
        dispatch(response, callback)

    def people_set(self, distinct_id, properties, meta=None, callback=None):
        return self.people(distinct_id, {'$set': properties}, meta, callback)

    def people_set_once(self, distinct_id, properties, meta=None,
                        callback=None):
        return self.people(distinct_id, {'$set_once': properties}, meta,
                           callback)

    def people_add(self, distinct_id, properties, meta=None, callback=None):
        return self.people(distinct_id, {'$add': properties}, meta, callback)

    def people_append(self, distinct_id, properties, meta=None,
                      callback=None):
        return self.people(distinct_id, {'$append': properties}, meta,
                           callback)

    def people_union(self, distinct_id, properties, meta=None, callback=None):
        return self.people(distinct_id, {'$union': properties}, meta, callback)

    def people_unset(self, distinct_id, properties, meta=None, callback=None):
        return self.people(distinct_id, {'$unset': properties}, meta, callback)

    def people_delete(self, distinct_id, meta=None, callback=None):
        return self.people(distinct_id, {'$delete': ''}, meta, callback)

    def people_track_charge(self, distinct_id, amount, properties=None,
                            meta=None, callback=None):
        """Append a ``$transactions`` entry with ``$amount`` = ``amount``."""
        # Copy so adding '$amount' does not mutate the caller's dict.
        charge = dict(self._sanitize_argument(properties, {}))
        charge['$amount'] = amount
        return self.people_append(distinct_id, {'$transactions': charge},
                                  meta, callback)

    def people_clear_charges(self, distinct_id, meta=None, callback=None):
        return self.people_unset(distinct_id, ["$transactions"], meta,
                                 callback)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment