Created
          November 15, 2014 11:22 
        
      - 
      
- 
        Save alejandrobernardis/dfde7b0f7a8d9c711834 to your computer and use it in GitHub Desktop. 
    MP Buffer
  
        
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  | from random import randint | |
| from collections import deque | |
| MIXPANEL_EVENTS = 'events' | |
| MIXPANEL_PEOPLE = 'people' | |
| MIXPANEL_IMPORTS = 'imports' | |
| MIXPANEL_ENDPOINTS = { | |
| MIXPANEL_EVENTS: 'https://api.mixpanel.com/track', | |
| MIXPANEL_PEOPLE: 'https://api.mixpanel.com/engage', | |
| MIXPANEL_IMPORTS: 'https://api.mixpanel.com/import' | |
| } | |
| MIXPANEL_ENDPOINTS_KEYS = frozenset(MIXPANEL_ENDPOINTS.keys()) | |
| _current = {key: deque() for key in MIXPANEL_ENDPOINTS_KEYS} | |
| _buffers = {key: deque() for key in MIXPANEL_ENDPOINTS_KEYS} | |
| def flush(endpoint=None, max_size=10): | |
| response = None | |
| endpoints = [endpoint] if endpoint in MIXPANEL_ENDPOINTS_KEYS \ | |
| else MIXPANEL_ENDPOINTS_KEYS | |
| for endpoint in endpoints: | |
| current = _current[endpoint] | |
| current_buffer = _buffers[endpoint] | |
| current_buffer.extend(current) | |
| current.clear() | |
| if not len(current_buffer): | |
| print endpoint, 'len(0) -> continue' | |
| continue | |
| try: | |
| buf = list(current_buffer) | |
| buf_rollback = [] | |
| while buf: | |
| batch = buf[:max_size] | |
| message = '[{0}]'.format(','.join(batch)) | |
| print endpoint, | |
| try: | |
| if randint(0,1) == 1: | |
| raise ValueError('') | |
| print 'fetch(%s) -> message(%s)' % (len(batch), message) | |
| except: | |
| print 'exception(%s) -> message(%s)' % (len(batch), message) | |
| buf_rollback.extend(batch) | |
| buf = buf[max_size:] | |
| print 'rollback(%s)' % len(buf_rollback), buf_rollback | |
| except: | |
| current.extend(current_buffer) | |
| current_buffer.clear() | |
| if __name__ == '__main__': | |
| for i in xrange(100000): | |
| _current[MIXPANEL_EVENTS].append('item(%s)' % i) | |
| flush(max_size=100) | 
  
    Sign up for free
    to join this conversation on GitHub.
    Already have an account?
    Sign in to comment