Last active
          August 29, 2015 14:10 
        
      - 
      
- 
        Save alejandrobernardis/fd5a99d365452b4202d0 to your computer and use it in GitHub Desktop. 
    Mixpanel Async Client (tornado web server)
  
        
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  | #!/usr/bin/env python2.7 | |
| # -*- coding: utf-8 -*- | |
| # Copyright (c) 2014 Asumi Kamikaze Inc. | |
| # Licensed under the MIT License. | |
| # Author: Alejandro M. Bernardis | |
| # Email: alejandro (dot) bernardis (at) asumikamikaze (dot) com | |
| # Created: 20/Oct/2014 11:59 | |
| import datetime | |
| from importlib import import_module | |
| from tornado import gen | |
| TEXT_SUPPORTED = basestring | |
| class BaseError(Exception): | |
| _code = -1 | |
| def __init__(self, message, code=None, *args, **kwargs): | |
| super(BaseError, self).__init__(message, *args, **kwargs) | |
| try: | |
| self.code = int(code) | |
| except: | |
| self.code = self._code | |
| def __unicode__(self): | |
| return unicode(self.message) | |
| def __str__(self): | |
| return self.__unicode__().encode() | |
| def __nonzero__(self): | |
| return False | |
| class CoroutineError(BaseError): | |
| def __init__(self, message, exception=None, code=None, *args, **kwargs): | |
| if message is None and exception is not None: | |
| message = str(exception) | |
| super(CoroutineError, self).__init__(message, code, *args, **kwargs) | |
| self._exception = exception | |
| self._errors = kwargs.get('errors', []) | |
| @property | |
| def exception(self): | |
| return self._exception | |
| @property | |
| def errors(self): | |
| return self._errors | |
| class ConfigurationError(BaseError): | |
| _code = 1000 | |
| class ClientError(CoroutineError): | |
| _code = 2000 | |
| def verify_argument(func): | |
| def decorator(value, *args, **kwargs): | |
| if not value: | |
| return None | |
| return func(value, *args, **kwargs) | |
| return decorator | |
| def is_primitive(value): | |
| primitives = \ | |
| (complex, int, float, long, bool, str, basestring, unicode, tuple, list) | |
| return isinstance(value, primitives) | |
| @verify_argument | |
| def unicode_to_str(value): | |
| return value.encode('utf-8') | |
| def setdefault(source, key, default): | |
| if not isinstance(source, dict): | |
| source = {} | |
| source[key] = source.get(key, None) or default | |
| @verify_argument | |
| def complex_types(value): | |
| if isinstance(value, unicode): | |
| return unicode_to_str(value) | |
| elif is_primitive(value): | |
| return value | |
| elif isinstance(value, (datetime.datetime, datetime.date, datetime.time)): | |
| return value.isoformat() | |
| return str(value) | |
| def import_by_path(dotted_path): | |
| try: | |
| module_path, class_name = dotted_path.rsplit('.', 1) | |
| except ValueError: | |
| raise ConfigurationError( | |
| '%s doesn\'t look like a module path' % dotted_path) | |
| try: | |
| module = import_module(module_path) | |
| except ImportError, e: | |
| raise ConfigurationError( | |
| 'Error importing module %s: %s' % (module_path, e)) | |
| try: | |
| attr = getattr(module, class_name) | |
| except AttributeError: | |
| raise ConfigurationError( | |
| 'Module "%s" does not define a "%s" attribute/class' % ( | |
| module_path, class_name)) | |
| return attr | |
| def dispatch(response, callback=None, raising=False): | |
| if raising and isinstance(response, Exception): | |
| raise response | |
| if callback is None: | |
| raise gen.Return(response) | |
| callback(response) | |
| # It's Dangerous | |
| # https://pythonhosted.org/itsdangerous/ | |
| def want_bytes(s, encoding='utf-8', errors='strict'): | |
| if isinstance(s, unicode): | |
| s = s.encode(encoding, errors) | |
| return s | 
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  | from mixpanel import AsyncMixpanelClient | |
| from tornado import ioloop | |
| from tornado.httpclient import AsyncHTTPClient | |
| AsyncHTTPClient.configure('tornado.curl_httpclient.CurlAsyncHTTPClient') | |
| @gen.coroutine | |
| def run(): | |
| client = AsyncMixpanelClient('==token==', True) | |
| raw_input('Press (enter) to continue...') | |
| try: | |
| username = int(time.time()) | |
| for i in xrange(10): | |
| print i, | |
| yield client.track(username, 'item_%s' % i, {'i': i}) | |
| time.sleep(1) | |
| print '\n', '.' * 80 | |
| r = yield client.consumer.flush() | |
| print r | |
| except Exception, e: | |
| print e | |
| ioloop.IOLoop.current().stop() | |
| if __name__ == '__main__': | |
| run() | |
| ioloop.IOLoop.instance().start() | 
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  | #!/usr/bin/env python2.7 | |
| # -*- coding: utf-8 -*- | |
| # Copyright (c) 2014 Asumi Kamikaze Inc. | |
| # Licensed under the MIT License. | |
| # Author: Alejandro M. Bernardis | |
| # Email: alejandro (dot) bernardis (at) asumikamikaze (dot) com | |
| # Created: 12/Nov/2014 00:49 | |
| # Ref: | |
| # https://github.com/jessepollak/mixpanel-python-async/ | |
| import time | |
| import copy | |
| import base64 | |
| from b7 import TEXT_SUPPORTED, ClientError, ConfigurationError, setdefault, \ | |
| want_bytes, complex_types, dispatch, import_by_path | |
| from collections import deque | |
| from datetime import datetime, timedelta | |
| from functools import wraps | |
| from tornado import gen | |
| from tornado.httpclient import HTTPRequest, HTTPResponse, AsyncHTTPClient | |
| from urllib import urlencode | |
| try: | |
| import simplejson as json | |
| except ImportError: | |
| import json | |
| __all__ = ( | |
| 'MixpanelConsumer', | |
| 'MixpanelBufferedConsumer', | |
| 'MixpanelConsumer', | |
| 'AsyncMixpanelClient' | |
| ) | |
| MIXPANEL_EVENTS = 'events' | |
| MIXPANEL_PEOPLE = 'people' | |
| MIXPANEL_IMPORTS = 'imports' | |
| MIXPANEL_ENDPOINTS = { | |
| MIXPANEL_EVENTS: 'https://api.mixpanel.com/track', | |
| MIXPANEL_PEOPLE: 'https://api.mixpanel.com/engage', | |
| MIXPANEL_IMPORTS: 'https://api.mixpanel.com/import' | |
| } | |
| MIXPANEL_ENDPOINTS_KEYS = frozenset(MIXPANEL_ENDPOINTS.keys()) | |
| def is_endpoint(method): | |
| @wraps(method) | |
| def wrapper(self, *args, **kwargs): | |
| endpoint = kwargs.get('endpoint', args[0]) | |
| if endpoint not in MIXPANEL_ENDPOINTS_KEYS: | |
| raise ClientError( | |
| 'No such endpoints "%s". Valid endpoints are one of %s.' | |
| % (endpoint, MIXPANEL_ENDPOINTS_KEYS)) | |
| return method(self, *args, **kwargs) | |
| return wrapper | |
| def is_buffered(method): | |
| @wraps(method) | |
| def wrapper(self, *args, **kwargs): | |
| if not getattr(self, '_buffered', False): | |
| raise ClientError('Async buffered consumer not supported') | |
| return method(self, *args, **kwargs) | |
| return wrapper | |
| class MetricConsumer(object): | |
| @property | |
| def errors(self): | |
| raise NotImplementedError() | |
| def fetch(self, *args, **kwargs): | |
| raise NotImplementedError() | |
| def flush(self, *args, **kwargs): | |
| raise NotImplementedError() | |
| def send(self, *args, **kwargs): | |
| raise NotImplementedError() | |
| class MixpanelConsumer(MetricConsumer): | |
| def __init__(self, endpoints=None, **settings): | |
| for item in settings.get('endpoints', endpoints or {}).keys(): | |
| if item not in MIXPANEL_ENDPOINTS_KEYS: | |
| raise ConfigurationError('Endpoint not supported: %s' % item) | |
| setdefault(settings, 'endpoints', MIXPANEL_ENDPOINTS) | |
| self._endpoints = settings['endpoints'] | |
| def _sanitize_message(self, message): | |
| if isinstance(message, deque): | |
| message = list(message) | |
| return json.dumps(message, default=complex_types, separators=(',', ':')) | |
| def _sanitize_response(self, response): | |
| if isinstance(response, HTTPResponse): | |
| body = json.loads(response.body) | |
| if response.code != 200 or body.get('status', 0) != 1: | |
| raise ClientError( | |
| 'Mixpanel API Error: %s' % body.get('error', '-')) | |
| response = body | |
| return response | |
| def _prepare(self, message, api_key=None): | |
| if isinstance(message, (deque, list, tuple, dict)): | |
| message = self._sanitize_message(message) | |
| elif not isinstance(message, TEXT_SUPPORTED): | |
| raise ClientError('Invalid message, must be a string') | |
| message = base64.b64encode(want_bytes(message)) | |
| values = {'data': message, 'verbose': 1, 'ip': 0} | |
| if api_key is not None: | |
| values['api_key'] = api_key | |
| return urlencode(values, True).encode('utf-8') | |
| @is_endpoint | |
| @gen.coroutine | |
| def fetch(self, endpoint, message, headers=None, callback=None, **kwargs): | |
| request = HTTPRequest(self._endpoints[endpoint]) | |
| request.method = 'POST' | |
| request.headers = headers | |
| request.body = self._prepare(message, **kwargs) | |
| response = yield AsyncHTTPClient().fetch(request) | |
| response = self._sanitize_response(response) | |
| raise gen.Return(response) | |
| @is_endpoint | |
| @gen.coroutine | |
| def send(self, endpoint, message, callback=None, **kwargs): | |
| response = yield self.fetch(endpoint, message, **kwargs) | |
| dispatch(response, callback) | |
| class MixpanelBufferedConsumer(MixpanelConsumer): | |
| ALL = 'all' | |
| ENDPOINT = 'endpoint' | |
| def __init__(self, endpoints=None, flush_after=60, flush_first=True, | |
| max_size=50, **kwargs): | |
| self._current = {key: deque() for key in MIXPANEL_ENDPOINTS_KEYS} | |
| self._buffers = {key: deque() for key in MIXPANEL_ENDPOINTS_KEYS} | |
| self._flush_first = flush_first | |
| self._flush_after = timedelta(0, flush_after) | |
| self._last_flushed = None if not flush_after else datetime.utcnow() | |
| self._errors = None | |
| self._max_size = min(max_size, 50) | |
| super(MixpanelBufferedConsumer, self).__init__(endpoints, **kwargs) | |
| @property | |
| def errors(self): | |
| return self._errors | |
| def _should_flush(self, endpoint=None): | |
| full = endpoint and len(self._current[endpoint]) >= self._max_size | |
| stale = self._last_flushed is None | |
| if not stale and self._flush_after: | |
| stale = datetime.utcnow() - self._last_flushed > self._flush_after | |
| if stale: | |
| return self.ALL | |
| if full: | |
| return self.ENDPOINT | |
| return False | |
| @gen.coroutine | |
| def _prepare_flush(self, endpoint=None, **kwargs): | |
| errors = [] | |
| endpoints = [endpoint] if endpoint in MIXPANEL_ENDPOINTS_KEYS \ | |
| else MIXPANEL_ENDPOINTS_KEYS | |
| for endpoint in endpoints: | |
| current = self._current[endpoint] | |
| if not len(current): | |
| continue | |
| buffering = self._buffers[endpoint] | |
| buffering.extend(current) | |
| current.clear() | |
| try: | |
| message = '[{0}]'.format(','.join(buffering)) | |
| yield self.fetch(endpoint, message, **kwargs) | |
| except Exception, e: | |
| current.extendleft(buffering) | |
| errors.append(ClientError(endpoint, e)) | |
| buffering.clear() | |
| response = len(errors) == 0 | |
| self._errors = None if response else errors | |
| raise gen.Return(response) | |
| @gen.coroutine | |
| def flush(self, callback=None, **kwargs): | |
| response = yield self._prepare_flush(**kwargs) | |
| dispatch(response, callback) | |
| @is_endpoint | |
| @gen.coroutine | |
| def send(self, endpoint, message, callback=None, **kwargs): | |
| response = False | |
| current = self._current[endpoint] | |
| current.append(self._sanitize_message(message)) | |
| try: | |
| should = self._should_flush(endpoint) | |
| except: | |
| should = False | |
| if should == self.ALL: | |
| response = yield self._prepare_flush(**kwargs) | |
| elif should == self.ENDPOINT: | |
| response = yield self._prepare_flush(endpoint, **kwargs) | |
| dispatch(response, callback) | |
| class MetricClient(object): | |
| """ | |
| Configuration example: | |
| ~~~~~~~~~~~~~~~~~~~~~~ | |
| settings: | |
| buffered: True | |
| adapter: | |
| engine: 'mixpanel' | |
| token: '' | |
| flush_after: 60 | |
| flush_first: True | |
| max_size: 50 | |
| endpoints: | |
| key: 'value' | |
| """ | |
| @property | |
| def consumer(self): | |
| raise NotImplementedError() | |
| def track(self, *args, **kwargs): | |
| raise NotImplementedError() | |
| def import_data(self, *args, **kwargs): | |
| raise NotImplementedError() | |
| class AsyncMixpanelClient(MetricClient): | |
| VERSION = '1.0.0' | |
| VERSION_NAME = 'python/tornado' | |
| def __init__(self, buffered=False, **settings): | |
| setdefault(settings, 'token', 'mixpanel_token') | |
| self._token = settings['token'] | |
| if not buffered: | |
| self._consumer = MixpanelConsumer(**settings) | |
| else: | |
| self._consumer = MixpanelBufferedConsumer(**settings) | |
| @property | |
| def consumer(self): | |
| return self._consumer | |
| def _sanitize_argument(self, value, default): | |
| return default if value is None else value | |
| @gen.coroutine | |
| def import_data(self, api_key, distinct_id, event_name, timestamp, | |
| properties=None, meta=None, callback=None): | |
| prop = { | |
| 'token': self._token, | |
| 'distinct_id': distinct_id, | |
| 'time': int(timestamp), | |
| 'mp_lib': self.VERSION_NAME, | |
| '$lib_version': self.VERSION, | |
| } | |
| prop.update(self._sanitize_argument(properties, {})) | |
| data = {'event': event_name, 'properties': prop} | |
| data.update(self._sanitize_argument(meta, {})) | |
| response = yield self._consumer.send(MIXPANEL_EVENTS, data, | |
| api_key=api_key) | |
| dispatch(response, callback) | |
| @gen.coroutine | |
| def track(self, distinct_id, event_name, properties=None, meta=None, | |
| callback=None): | |
| prop = { | |
| 'token': self._token, | |
| 'distinct_id': distinct_id, | |
| 'time': int(time.time()), | |
| 'mp_lib': self.VERSION_NAME, | |
| '$lib_version': self.VERSION, | |
| } | |
| prop.update(self._sanitize_argument(properties, {})) | |
| data = {'event': event_name, 'properties': prop} | |
| data.update(self._sanitize_argument(meta, {})) | |
| response = yield self._consumer.send(MIXPANEL_EVENTS, data) | |
| dispatch(response, callback) | |
| @gen.coroutine | |
| def alias(self, alias_id, original, meta=None, callback=None): | |
| data = { | |
| 'event': '$create_alias', | |
| 'properties': { | |
| 'distinct_id': original, | |
| 'alias': alias_id, | |
| 'token': self._token | |
| } | |
| } | |
| data.update(self._sanitize_argument(meta, {})) | |
| response = yield self._consumer.fetch(MIXPANEL_EVENTS, data) | |
| dispatch(response, callback) | |
| @gen.coroutine | |
| def people(self, distinct_id, message, meta=None, callback=None): | |
| data = { | |
| '$token': self._token, | |
| '$distinct_id': distinct_id, | |
| '$time': int(time.time() * 1000) | |
| } | |
| data.update(message, **self._sanitize_argument(meta, {})) | |
| response = yield self._consumer.send(MIXPANEL_PEOPLE, data) | |
| dispatch(response, callback) | |
| def people_set(self, distinct_id, properties, meta=None, callback=None): | |
| return self.people(distinct_id, {'$set': properties}, meta, callback) | |
| def people_set_once(self, distinct_id, properties, meta=None, | |
| callback=None): | |
| return self.people(distinct_id, {'$set_once': properties}, meta, | |
| callback) | |
| def people_add(self, distinct_id, properties, meta=None, callback=None): | |
| return self.people(distinct_id, {'$add': properties}, meta, callback) | |
| def people_append(self, distinct_id, properties, meta=None, callback=None): | |
| return self.people(distinct_id, {'$append': properties}, meta, callback) | |
| def people_union(self, distinct_id, properties, meta=None, callback=None): | |
| return self.people(distinct_id, {'$union': properties}, meta, callback) | |
| def people_unset(self, distinct_id, properties, meta=None, callback=None): | |
| return self.people(distinct_id, {'$unset': properties}, meta, callback) | |
| def people_delete(self, distinct_id, meta=None, callback=None): | |
| return self.people(distinct_id, {'$delete': ''}, meta, callback) | |
| def people_track_charge(self, distinct_id, amount, properties=None, | |
| meta=None, callback=None): | |
| properties = self._sanitize_argument(properties, {}) | |
| properties['$amount'] = amount | |
| return self.people_append(distinct_id, {'$transactions': properties}, | |
| meta, callback) | |
| def people_clear_charges(self, distinct_id, meta=None, callback=None): | |
| return self.people_unset(distinct_id, ["$transactions"], meta, callback) | |
  
    Sign up for free
    to join this conversation on GitHub.
    Already have an account?
    Sign in to comment