Skip to content

Instantly share code, notes, and snippets.

@signalpillar
Created December 28, 2015 21:44
Show Gist options
  • Select an option

  • Save signalpillar/134d1ff43d97de361190 to your computer and use it in GitHub Desktop.

Select an option

Save signalpillar/134d1ff43d97de361190 to your computer and use it in GitHub Desktop.
def call_on_full_list_buffer(fn, buffer_size):
    """Return a ready-to-use buffering coroutine built around ``fn``.

    The returned generator accepts batches of items through ``.send(batch)``
    and hands them to ``fn`` in lists of ``buffer_size`` items each time that
    many have been accumulated.

    Args:
        fn: Callable invoked with one list per full buffer.
        buffer_size: Number of items that make up a full buffer.

    Returns:
        The generator, already advanced to its first ``yield`` so the caller
        can call ``.send()`` on it right away.
    """
    coroutine = _call_on_full_list_buffer(fn, buffer_size)
    # A fresh generator rejects non-None values until it reaches its first
    # `yield`; advance it once here so callers do not have to.
    next(coroutine)
    return coroutine
def _call_on_full_list_buffer(fn, buffer_size):
    """Generator coroutine that regroups sent batches into fixed-size lists.

    Each value received through ``.send(batch)`` is an iterable of items.
    Items are accumulated across batches, and every time ``buffer_size`` items
    are available ``fn`` is called with a list of exactly that many items, in
    arrival order. A single large batch may therefore trigger several calls
    to ``fn`` before ``.send()`` returns.

    Items left over when the generator is closed (fewer than ``buffer_size``)
    are never passed to ``fn``.

    Args:
        fn: Callable invoked with each full buffer (a fresh list every time).
        buffer_size: Number of items per call to ``fn``; must be at least 1.

    Yields:
        ``None`` whenever it is ready to receive the next batch.

    Raises:
        ValueError: On the first advance of the generator, if ``buffer_size``
            is smaller than 1 (such a size could never make progress).
    """
    if buffer_size < 1:
        raise ValueError(
            "buffer_size must be a positive integer, got %r" % (buffer_size,)
        )

    buffer = []
    while True:
        received = yield
        if received is None:
            # A plain `next()` on the generator sends None; treat it as
            # "nothing to add" instead of failing.
            continue

        # Take a private copy: the caller's sequence is never mutated, and
        # any iterable (tuple, generator, ...) is accepted.
        pending = list(received)
        start = 0
        # Walk the batch by offset rather than re-slicing the remainder, so
        # a large batch costs linear rather than quadratic time.
        while len(buffer) + len(pending) - start >= buffer_size:
            end = start + buffer_size - len(buffer)
            buffer.extend(pending[start:end])
            fn(buffer)
            # Drop our reference to the list just handed out, as `fn`
            # usually has side effects and may keep or modify it.
            buffer = []
            start = end
        # Whatever is left does not fill a buffer yet; keep it for later.
        buffer.extend(pending[start:])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment