Last active
July 18, 2017 14:10
-
-
Save edvardm/05d888b7cc9343de34169ca82c7f18a8 to your computer and use it in GitHub Desktop.
Simple buffer for running arbitrary operations in batches
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging
from datetime import datetime
from typing import Callable, Generic, List, TypeVar
T = TypeVar('T')
# Signature of the callback that receives each flushed batch of items.
Inserter = Callable[[List[T]], None]


class Buffer(Generic[T]):
    """Buffer items until either max_size or max_time has been reached.

    The flush conditions are evaluated only inside ``add``; there is no
    background timer, so an idle buffer keeps its items until the next
    ``add`` or an explicit ``flush``.

    params:
        inserter: function that takes a list of items and actually does
            something with them. It is called automatically with currently
            buffered items when conditions for flushing are met.
        max_size: number of buffered items at which ``add`` triggers a flush.
        max_time: seconds since the previous flush (or since construction)
            after which the next ``add`` triggers a flush.
        log_level: level for the 'Buffer' logger; either a level name in any
            letter case (e.g. 'debug') or a numeric ``logging`` level.
    """

    def __init__(self, inserter: Inserter[T], max_size: int = 100, max_time: int = 300, log_level='INFO') -> None:
        self.inserter = inserter
        self.max_size = max_size
        self.max_time = max_time
        self.items = []  # type: List[T]

        # The logger is looked up by a fixed name, so the level set here is
        # shared by every Buffer instance in the process.
        self.logger = logging.getLogger('Buffer')
        # Only level names need normalising; numeric levels pass straight through.
        level = log_level.upper() if isinstance(log_level, str) else log_level
        self.logger.setLevel(level)

        # initialize to current date so that we don't need to handle Nones
        # separately
        self.last_flushed = self._now()
        self.logger.debug("Buffer created with max_size %d, max_time %ds", max_size, max_time)

    def add(self, item: T) -> None:
        """Append ``item`` and flush if the size or time limit is reached."""
        self.items.append(item)
        if self._needs_flush():
            self.logger.debug("Max time/size exceeded, flushing")
            self.flush()

    def _needs_flush(self) -> bool:
        """Return True when max_size items are buffered or max_time seconds have passed."""
        if len(self.items) >= self.max_size:
            return True
        elapsed = self._now() - self.last_flushed
        return elapsed.total_seconds() >= self.max_time

    @staticmethod
    def _now() -> datetime:
        """Return the current local time as a naive datetime."""
        # NOTE: naive wall-clock time; clock adjustments (including DST
        # changes) shift the measured interval between flushes.
        return datetime.now()

    def flush(self) -> None:
        """Pass all buffered items to ``inserter`` and start a new batch.

        Nothing is passed to ``inserter`` when the buffer is empty. If
        ``inserter`` raises, the exception propagates and the items stay
        buffered.
        """
        if self.items:
            self.logger.debug("Flushing %d items", len(self.items))
            self.inserter(self.items)
            # Rebind instead of clearing in place: the inserter may have kept
            # a reference to the list it was handed.
            self.items = []
        else:
            self.logger.debug("Nothing to flush")
        self.last_flushed = self._now()
# Usage: | |
# | |
# buf = Buffer(inserter) | |
# | |
# for i in iterable: | |
# buf.add(i) | |
# buf.flush() # flush in the end to ensure remaining items are processed | |
# | |
# `inserter` should be a function that takes a single argument, a list, and does whatever you want to do
# with the list. Buffer passes the list to `inserter` once it holds `max_size` items or, checked on each
# `add()`, once at least `max_time` seconds have passed since the previous flush.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment