Last active
August 24, 2021 20:23
-
-
Save tobz/75584cd2134fd0cc1f6999168a51e201 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Strategy for partitioning events. | |
pub trait Partitioner { | |
type Item: ByteSizeOf; | |
type Key: Clone + Eq + Hash; | |
fn partition(&self, item: &Self::Item) -> Self::Key; | |
} | |
/// An in-progress batch for `PartitionBatch`. | |
/// | |
/// Handles enforcing batch size limits (total size and total number of events) as well as | |
/// coalsescing event finalizers for the overall batch. | |
pub struct PartitionInFlightBatch<P> | |
where | |
P: Partitioner, | |
{ | |
items: Vec<P::Item>, | |
finalizers: EventFinalizers, | |
total_size: usize, | |
size: BatchSize<()>, | |
_partitioner: PhantomData<P> | |
} | |
impl<P> PartitionInFlightBatch<P> | |
where | |
P: Partitioner, | |
P::Item: ByteSizeOf, | |
{ | |
pub fn new(size: BatchSize<()>) -> Self { | |
Self { | |
items: Vec::new(), | |
finalizers: EventFinalizers::default(), | |
total_size: 0, | |
size, | |
_partitioner: PhantomData, | |
} | |
} | |
pub fn is_full(&self) -> bool { | |
self.items.len() == self.size.events | |
} | |
pub fn push(&mut self, item: EncodedEvent<P::Item>) -> PushResult<EncodedEvent<P::Item>> { | |
// Don't overrun our batch size in bytes. | |
let item_size = item.item.allocated_bytes(); | |
if self.total_size + item_size > self.size.bytes { | |
return PushResult::Overflow(item) | |
} | |
// Don't overrun our batch size in events. | |
let item_limit = self.size.events; | |
let current_items = self.items.len(); | |
if current_items == item_limit { | |
return PushResult::Overflow(item) | |
} | |
// Add the item to the batch, and do the necessary accounting. | |
let EncodedEvent { item, finalizers } = item; | |
self.items.push(item); | |
self.finalizers.merge(finalizers); | |
self.total_size += item_size; | |
PushResult::Ok(current_items + 1 == item_limit) | |
} | |
pub fn finish(self, key: P::Key) -> PartitionFinishedBatch<P> { | |
PartitionFinishedBatch { | |
key, | |
items: self.items, | |
total_size: self.total_size, | |
finalizers: self.finalizers, | |
_partitioner: PhantomData, | |
} | |
} | |
} | |
/// A complete partition batch. | |
pub struct PartitionFinishedBatch<P> | |
where | |
P: Partitioner, | |
{ | |
key: P::Key, | |
items: Vec<P::Item>, | |
total_size: usize, | |
finalizers: EventFinalizers, | |
_partitioner: PhantomData<P> | |
} | |
/// Batteries-included partitioning batcher. | |
/// | |
/// Provides simple batching of eventgs based on user-defined partitioning. In addition, batching | |
/// can be coinfigured in both time and space. Finalization of events is provided as a first-class citizen. | |
pub struct PartitionBatcher<P> | |
where | |
P: Partitioner, | |
{ | |
partitioner: P, | |
settings: BatchSettings<()>, | |
timeout_queue: DelayQueue<P::Key>, | |
batches: HashMap<P::Key, PartitionInFlightBatch<P>>, | |
} | |
impl<P> PartitionBatcher<P> | |
where | |
P: Partitioner, | |
{ | |
pub fn new(partitioner: P, settings: BatchSettings<()>) -> Self { | |
PartitionBatcher { | |
partitioner, | |
settings, | |
timeout_queue: DelayQueue::new(), | |
batches: HashMap::new(), | |
} | |
} | |
pub fn push(&mut self, event: EncodedEvent<P::Item>) -> PushResult<EncodedEvent<P::Item>> { | |
let pk = self.partitioner.partition(&event.item); | |
let mut batch = self.batches.entry(pk).or_insert_with(|| PartitionInFlightBatch::new(self.settings.size.clone())); | |
batch.push(event) | |
} | |
pub async fn get_ready_batches(&mut self) -> Vec<PartitionFinishedBatch<P>> { | |
let mut batches = Vec::new(); | |
// Check to see if any batches are full and need to be flushed out. | |
let mut ready_partitions = self.batches.iter() | |
.filter_map(|(pk, b)| if b.is_full() { Some(pk.clone()) } else { None }) | |
.collect::<Vec<_>>(); | |
// Check to see if any batches have expired, indicating a need for them to be flushed. We | |
// explicitly use the `poll!` macro to poll the delay queue, which holds all batch | |
// expirations. We do this so that we don't actually wait until the next batch has expired, | |
// which might block the task from accepting more items. However, this differs from | |
// `FutureExt::now_and_never` in that `poll!` ensures this task context is properly attached | |
// so that the next batch expiration wakes us up. | |
while let Poll::Ready(Some(Ok(pk))) = poll!(self.timeout_queue.next()) { | |
let pk = pk.into_inner(); | |
ready_partitions.push(pk); | |
} | |
for pk in ready_partitions { | |
let batch = self.batches.remove(&pk).expect("batch must always exist"); | |
batches.push(batch.finish(pk)); | |
} | |
batches | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment