// Imports for the sketch. The crate paths below are assumptions about the surrounding
// codebase (this gist targets crate-internal types); adjust them to wherever
// `ByteSizeOf`, `EncodedEvent`, `EventFinalizers`, `PushResult`, `BatchSettings`, and
// `BatchSize` actually live in the host crate.
use std::{collections::HashMap, hash::Hash, marker::PhantomData, task::Poll};

use futures::{poll, StreamExt};
use tokio_util::time::DelayQueue;

use crate::{
    event::{EncodedEvent, EventFinalizers},
    sinks::util::{BatchSettings, BatchSize, PushResult},
    ByteSizeOf,
};

/// Strategy for partitioning events.
pub trait Partitioner {
    type Item: ByteSizeOf;
    type Key: Clone + Eq + Hash;

    fn partition(&self, item: &Self::Item) -> Self::Key;
}
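
// Example: a minimal, hypothetical `Partitioner` implementation. `LogRecord` and
// `StreamNamePartitioner` are illustrative names only, not types from the original
// sketch; the partitioner keys each record by its stream name, so records for the
// same stream land in the same batch.
pub struct LogRecord {
    pub stream: String,
    pub payload: Vec<u8>,
}

impl ByteSizeOf for LogRecord {
    fn allocated_bytes(&self) -> usize {
        self.stream.len() + self.payload.len()
    }
}

pub struct StreamNamePartitioner;

impl Partitioner for StreamNamePartitioner {
    type Item = LogRecord;
    type Key = String;

    fn partition(&self, item: &Self::Item) -> Self::Key {
        item.stream.clone()
    }
}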
/// An in-progress batch for `PartitionBatcher`.
///
/// Handles enforcing batch size limits (total size and total number of events) as well as
/// coalescing event finalizers for the overall batch.
pub struct PartitionInFlightBatch<P>
where
    P: Partitioner,
{
    items: Vec<P::Item>,
    finalizers: EventFinalizers,
    total_size: usize,
    size: BatchSize<()>,
    _partitioner: PhantomData<P>,
}
impl<P> PartitionInFlightBatch<P>
where
    P: Partitioner,
    P::Item: ByteSizeOf,
{
    pub fn new(size: BatchSize<()>) -> Self {
        Self {
            items: Vec::new(),
            finalizers: EventFinalizers::default(),
            total_size: 0,
            size,
            _partitioner: PhantomData,
        }
    }

    pub fn is_full(&self) -> bool {
        self.items.len() == self.size.events
    }
    pub fn push(&mut self, item: EncodedEvent<P::Item>) -> PushResult<EncodedEvent<P::Item>> {
        // Don't overrun our batch size in bytes.
        let item_size = item.item.allocated_bytes();
        if self.total_size + item_size > self.size.bytes {
            return PushResult::Overflow(item);
        }

        // Don't overrun our batch size in events.
        let item_limit = self.size.events;
        let current_items = self.items.len();
        if current_items == item_limit {
            return PushResult::Overflow(item);
        }

        // Add the item to the batch, and do the necessary accounting.
        let EncodedEvent { item, finalizers } = item;
        self.items.push(item);
        self.finalizers.merge(finalizers);
        self.total_size += item_size;

        PushResult::Ok(current_items + 1 == item_limit)
    }
    pub fn finish(self, key: P::Key) -> PartitionFinishedBatch<P> {
        PartitionFinishedBatch {
            key,
            items: self.items,
            total_size: self.total_size,
            finalizers: self.finalizers,
            _partitioner: PhantomData,
        }
    }
}
/// A complete partition batch.
pub struct PartitionFinishedBatch<P>
where
    P: Partitioner,
{
    key: P::Key,
    items: Vec<P::Item>,
    total_size: usize,
    finalizers: EventFinalizers,
    _partitioner: PhantomData<P>,
}
/// Batteries-included partitioning batcher.
///
/// Provides simple batching of events based on user-defined partitioning. In addition,
/// batching can be configured in both time and space. Finalization of events is provided
/// as a first-class citizen.
pub struct PartitionBatcher<P>
where
    P: Partitioner,
{
    partitioner: P,
    settings: BatchSettings<()>,
    timeout_queue: DelayQueue<P::Key>,
    batches: HashMap<P::Key, PartitionInFlightBatch<P>>,
}
impl<P> PartitionBatcher<P>
where
    P: Partitioner,
{
    pub fn new(partitioner: P, settings: BatchSettings<()>) -> Self {
        PartitionBatcher {
            partitioner,
            settings,
            timeout_queue: DelayQueue::new(),
            batches: HashMap::new(),
        }
    }

    pub fn push(&mut self, event: EncodedEvent<P::Item>) -> PushResult<EncodedEvent<P::Item>> {
        let pk = self.partitioner.partition(&event.item);

        // Register an expiration for a newly-created batch so that `get_ready_batches`
        // can flush it once the batch timeout elapses.
        if !self.batches.contains_key(&pk) {
            self.timeout_queue.insert(pk.clone(), self.settings.timeout);
        }

        let size = self.settings.size.clone();
        let batch = self
            .batches
            .entry(pk)
            .or_insert_with(|| PartitionInFlightBatch::new(size));
        batch.push(event)
    }
    pub async fn get_ready_batches(&mut self) -> Vec<PartitionFinishedBatch<P>> {
        let mut batches = Vec::new();

        // Check to see if any batches are full and need to be flushed out.
        let mut ready_partitions = self
            .batches
            .iter()
            .filter_map(|(pk, b)| if b.is_full() { Some(pk.clone()) } else { None })
            .collect::<Vec<_>>();

        // Check to see if any batches have expired, indicating a need for them to be flushed. We
        // explicitly use the `poll!` macro to poll the delay queue, which holds all batch
        // expirations. We do this so that we don't actually wait until the next batch has expired,
        // which might block the task from accepting more items. However, this differs from
        // `FutureExt::now_or_never` in that `poll!` ensures this task context is properly attached
        // so that the next batch expiration wakes us up.
        while let Poll::Ready(Some(Ok(pk))) = poll!(self.timeout_queue.next()) {
            let pk = pk.into_inner();
            ready_partitions.push(pk);
        }

        for pk in ready_partitions {
            // A batch may already have been flushed as full before its expiration fired,
            // so a missing entry here is not an error.
            if let Some(batch) = self.batches.remove(&pk) {
                batches.push(batch.finish(pk));
            }
        }

        batches
    }
}
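
// A sketch of how a caller might drive the batcher, using the hypothetical
// `StreamNamePartitioner` / `LogRecord` types above. `flush` is a stand-in for
// whatever a real sink does with a finished batch, and `BatchSettings` construction
// is elided since its builder is crate-specific.
async fn flush(batch: PartitionFinishedBatch<StreamNamePartitioner>) {
    // Hand the batch off to the downstream service; stubbed out for illustration.
    drop(batch);
}

pub async fn drive_batcher(
    mut batcher: PartitionBatcher<StreamNamePartitioner>,
    events: Vec<EncodedEvent<LogRecord>>,
) {
    for event in events {
        if let PushResult::Overflow(event) = batcher.push(event) {
            // The partition's batch can't fit this event; a real sink would flush
            // that batch immediately and retry the event rather than dropping it.
            drop(event);
        }

        // Drain any batches that have filled up or timed out.
        for batch in batcher.get_ready_batches().await {
            flush(batch).await;
        }
    }
}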