Skip to content

Instantly share code, notes, and snippets.

@gwbischof
Created June 26, 2019 15:07
Show Gist options
  • Save gwbischof/e153b0370d91f61f4717e10c01e8586d to your computer and use it in GitHub Desktop.
Save gwbischof/e153b0370d91f61f4717e10c01e8586d to your computer and use it in GitHub Desktop.
import itertools
def page_chunks(page, chunk_size, remainder):
    """
    Yield consecutive slices of one event page.

    The first slice covers indices ``[0, remainder)``; every following slice
    covers ``chunk_size`` indices. Slices are taken with plain
    ``[start:stop]`` indexing, so a slice reaching past the end of the page
    is simply shorter.

    Parameters
    ----------
    page : dict
        Event page holding 'descriptor', the per-event sequences 'seq_num',
        'time' and 'uid', and the sub-dicts 'data', 'timestamps' and
        'filled' that map a key to a per-event sequence.
    chunk_size : int
        Length of every slice after the first one.
    remainder : int
        Length of the first slice.

    Yields
    ------
    dict
        A page with the same layout as ``page``, restricted to one slice.
    """
    array_keys = ['seq_num', 'time', 'uid']
    page_size = len(page['uid'])  # number of events in the page

    # (start, stop) index pairs: the first slice, then regular slices.
    bounds = [(0, remainder)]
    bounds.extend((start, start + chunk_size)
                  for start in range(remainder, page_size, chunk_size))

    for start, stop in bounds:
        yield {'descriptor': page['descriptor'],
               **{key: page[key][start:stop] for key in array_keys},
               'data': {key: column[start:stop]
                        for key, column in page['data'].items()},
               'timestamps': {key: column[start:stop]
                              for key, column in page['timestamps'].items()},
               # Each section is sliced over its own keys, so a 'filled'
               # dict with fewer keys than 'data' does not raise KeyError.
               'filled': {key: column[start:stop]
                          for key, column in page['filled'].items()}}
def merge_event_pages(event_pages):
    """
    Concatenate several event pages into a single page.

    Parameters
    ----------
    event_pages : iterable of dict
        Pages holding 'descriptor', the per-event sequences 'seq_num',
        'time' and 'uid', and the sub-dicts 'data', 'timestamps' and
        'filled' that map a key to a per-event sequence.

    Returns
    -------
    dict
        If exactly one page is given, that same page object (not a copy).
        Otherwise a new page whose 'descriptor' is taken from the first page
        and whose sequences are the input sequences joined in order, as
        lists. The keys of each sub-dict are taken from the first page.

    Raises
    ------
    IndexError
        If ``event_pages`` is empty.
    """
    pages = list(event_pages)
    if len(pages) == 1:
        return pages[0]

    array_keys = ['seq_num', 'time', 'uid']
    first = pages[0]

    def concat(columns):
        # Join an iterable of sequences into one flat list.
        return list(itertools.chain.from_iterable(columns))

    def merge_section(name):
        # Merge one sub-dict over its own keys, so a 'timestamps' or 'filled'
        # dict with fewer keys than 'data' does not raise KeyError.
        return {key: concat(page[name][key] for page in pages)
                for key in first[name]}

    return {'descriptor': first['descriptor'],
            **{key: concat(page[key] for page in pages)
               for key in array_keys},
            'data': merge_section('data'),
            'timestamps': merge_section('timestamps'),
            'filled': merge_section('filled')}
def rechunk_event_pages(event_pages, chunk_size):
    """
    Regroup a stream of event pages into pages of ``chunk_size`` events.

    Incoming pages are cut into slices and the slices are accumulated until
    they add up to ``chunk_size`` events, at which point they are merged
    with ``merge_event_pages`` and yielded. Whatever is left once the input
    is exhausted is merged and yielded as a final, shorter page.

    Parameters
    ----------
    event_pages : iterable of dict
        Pages holding 'descriptor', the per-event sequences 'seq_num',
        'time' and 'uid', and the sub-dicts 'data', 'timestamps' and
        'filled' that map a key to a per-event sequence.
    chunk_size : int
        Number of events per yielded page; must be at least 1.

    Yields
    ------
    dict
        Event pages as returned by ``merge_event_pages``.

    Raises
    ------
    ValueError
        If ``chunk_size`` is smaller than 1. Because this is a generator,
        the error is raised when iteration starts.
    """
    # A zero step breaks range(); a negative size would make the slices
    # below silently drop events instead of failing.
    if chunk_size < 1:
        raise ValueError(
            'chunk_size must be a positive integer, got %r' % (chunk_size,))

    remainder = chunk_size  # events still missing from the page being built
    chunk_list = []         # slices collected for the page being built

    def page_chunks(page, chunk_size, remainder):
        # Yield slices of one page: the first is `remainder` long, the
        # following ones `chunk_size` long, the last is whatever is left.
        array_keys = ['seq_num', 'time', 'uid']
        page_size = len(page['uid'])  # number of events in the page

        bounds = [(0, remainder)]
        bounds.extend((start, start + chunk_size)
                      for start in range(remainder, page_size, chunk_size))

        for start, stop in bounds:
            yield {'descriptor': page['descriptor'],
                   **{key: page[key][start:stop] for key in array_keys},
                   'data': {key: column[start:stop]
                            for key, column in page['data'].items()},
                   'timestamps': {key: column[start:stop]
                                  for key, column
                                  in page['timestamps'].items()},
                   # Sliced over its own keys, so a 'filled' dict with fewer
                   # keys than 'data' does not raise KeyError.
                   'filled': {key: column[start:stop]
                              for key, column in page['filled'].items()}}

    for page in event_pages:
        # The helper receives the value `remainder` has at this point; the
        # updates made in the loop below do not affect slices of this page.
        for chunk in page_chunks(page, chunk_size, remainder):
            remainder -= len(chunk['uid'])
            chunk_list.append(chunk)
            if remainder == 0:
                # Enough events collected: emit a full page and start over.
                yield merge_event_pages(chunk_list)
                remainder = chunk_size
                chunk_list = []

    # Emit the trailing, partially filled page, if any.
    if chunk_list:
        yield merge_event_pages(chunk_list)
def event_page_gen(page_size, num_pages):
    """
    Generate ``num_pages`` synthetic event pages of ``page_size`` events.

    Every page has the descriptor 'DESCRIPTOR'. Each per-event sequence
    ('seq_num', 'time', 'uid') and each 'x', 'y', 'z' column of the 'data',
    'timestamps' and 'filled' sub-dicts is its own
    ``list(range(page_size))``.
    """
    column_names = ('x', 'y', 'z')
    per_event_fields = ('seq_num', 'time', 'uid')

    def fresh_column():
        # A new list on every call, so no two entries share one object.
        return list(range(page_size))

    for _ in range(num_pages):
        page = {'descriptor': 'DESCRIPTOR'}
        for field in per_event_fields:
            page[field] = fresh_column()
        for section in ('data', 'timestamps', 'filled'):
            page[section] = {name: fresh_column() for name in column_names}
        yield page
# Demo: build five pages of five events each, print them, then print the
# result of rechunking them into pages of five events.
pages = [*event_page_gen(5, 5)]
print(pages)
print([*rechunk_event_pages(pages, 5)])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment