Created
May 15, 2023 10:16
-
-
Save edvardm/bea1cde8d48cb648b4a579154a2a8647 to your computer and use it in GitHub Desktop.
add items one at a time, buffering entries and flushing those automatically when there are enough entries
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import Any, TypeVar, Callable, Generator | |
from contextlib import contextmanager | |
T = TypeVar("T") | |
class BufferedOp: | |
"""Class for adding elements to a _buffer and flushing when full. | |
Note that last batch may is not usually flushed automatically as usually | |
the last batch is smaller than the _buffer size. Hence it's recommended to use | |
context manager `buffering()` below which automatically calls `_flush()` in the end | |
""" | |
def __init__(self, buffer_size: int, flusher: Callable[[list[T]], Any]): | |
self.buffer_size = buffer_size | |
self._buffer: list[T] = [] | |
self._flush = flusher | |
def add(self, element: T) -> None: | |
"""Add element to _buffer, automatically flushing _buffer when full.""" | |
self._buffer.append(element) | |
if len(self._buffer) >= self.buffer_size: | |
self.flush() | |
def flush(self) -> None: | |
"""Flush and reset _buffer""" | |
self._flush(self._buffer) | |
self._buffer = [] | |
@contextmanager | |
def buffering( | |
flusher: Callable[[list[T]], None], buffer_size: int | |
) -> Generator[BufferedOp, None, None]: | |
buf = BufferedOp(buffer_size, flusher) | |
try: | |
yield buf | |
finally: | |
buf.flush() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment