Skip to content

Instantly share code, notes, and snippets.

@alejandrobernardis
Created November 15, 2014 11:22
Show Gist options
  • Save alejandrobernardis/dfde7b0f7a8d9c711834 to your computer and use it in GitHub Desktop.
Save alejandrobernardis/dfde7b0f7a8d9c711834 to your computer and use it in GitHub Desktop.
MP Buffer
from random import randint
from collections import deque
MIXPANEL_EVENTS = 'events'
MIXPANEL_PEOPLE = 'people'
MIXPANEL_IMPORTS = 'imports'
MIXPANEL_ENDPOINTS = {
MIXPANEL_EVENTS: 'https://api.mixpanel.com/track',
MIXPANEL_PEOPLE: 'https://api.mixpanel.com/engage',
MIXPANEL_IMPORTS: 'https://api.mixpanel.com/import'
}
MIXPANEL_ENDPOINTS_KEYS = frozenset(MIXPANEL_ENDPOINTS.keys())
_current = {key: deque() for key in MIXPANEL_ENDPOINTS_KEYS}
_buffers = {key: deque() for key in MIXPANEL_ENDPOINTS_KEYS}
def flush(endpoint=None, max_size=10):
response = None
endpoints = [endpoint] if endpoint in MIXPANEL_ENDPOINTS_KEYS \
else MIXPANEL_ENDPOINTS_KEYS
for endpoint in endpoints:
current = _current[endpoint]
current_buffer = _buffers[endpoint]
current_buffer.extend(current)
current.clear()
if not len(current_buffer):
print endpoint, 'len(0) -> continue'
continue
try:
buf = list(current_buffer)
buf_rollback = []
while buf:
batch = buf[:max_size]
message = '[{0}]'.format(','.join(batch))
print endpoint,
try:
if randint(0,1) == 1:
raise ValueError('')
print 'fetch(%s) -> message(%s)' % (len(batch), message)
except:
print 'exception(%s) -> message(%s)' % (len(batch), message)
buf_rollback.extend(batch)
buf = buf[max_size:]
print 'rollback(%s)' % len(buf_rollback), buf_rollback
except:
current.extend(current_buffer)
current_buffer.clear()
if __name__ == '__main__':
for i in xrange(100000):
_current[MIXPANEL_EVENTS].append('item(%s)' % i)
flush(max_size=100)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment