Skip to content

Instantly share code, notes, and snippets.

@xqq
Created January 13, 2025 06:05
Show Gist options
  • Save xqq/b84dec89d8dd7a22e197d37b6a7e8e9f to your computer and use it in GitHub Desktop.
Save xqq/b84dec89d8dd7a22e197d37b6a7e8e9f to your computer and use it in GitHub Desktop.
use async_condvar_fair::{Condvar, BatonExt};
use bytes::{BufMut, Bytes, BytesMut};
use std::collections::VecDeque;
use std::ops::DerefMut;
use std::sync::Arc;
use tokio::sync::Mutex;
/// Create an unbounded channel with the specified minimum buffering size, in bytes.
pub fn buffered_bytes_channel(threshold_size: usize) -> (Sender, Receiver) {
let shared = Arc::new((Mutex::new(Default::default()), Condvar::new()));
(
Sender {
local: Default::default(),
shared: Arc::clone(&shared),
threshold_size: threshold_size,
},
Receiver {
local: Default::default(),
shared,
},
)
}
type Shared<T> = Arc<(Mutex<VecDeque<T>>, Condvar)>;
// ###### SENDER ###############################################################
/// The sending side of the channel.
///
/// `Sender` is `Send`, `Sync`, and `Clone`able.
///
/// When `Sender` is dropped, any locally buffered messages are flushed to the receiver.
/// Note that this _might_ block if waiting on the shared lock.
pub struct Sender {
local: Vec<Bytes>,
shared: Shared<Bytes>,
threshold_size: usize,
}
impl Clone for Sender {
fn clone(&self) -> Self {
Self {
local: Default::default(),
shared: Arc::clone(&self.shared),
threshold_size: self.threshold_size,
}
}
}
impl Sender {
/// Buffer a message to be sent.
/// If the local buffer reaches the threshold, an attempt to send the buffered messages to the
/// receiver occurs.
///
/// This is guaranteed not to block.
pub fn send(&mut self, bytes: Bytes) {
// empty Bytes means an EOF flag, mark it
let chunk_is_empty = bytes.is_empty();
// push onto buffer if not EOF flag
if !chunk_is_empty {
self.local.push(bytes)
}
// if [not EOF flag] && [accumulated buffer size is less than threshold]
// return and wait for next call
if !chunk_is_empty && Self::total_byte_length(&self.local) < self.threshold_size {
return; // haven't reached threshold
}
// Merge several Bytes element into one
if self.local.len() > 1 { Self::merge_vec_bytes(&mut self.local) };
// if the item is an EOF flag, push back into local
if chunk_is_empty {
self.local.push(Bytes::new());
}
let (mutex, condvar) = self.shared.as_ref();
if let Ok(mut queue) = mutex.try_lock() {
// if we manage to get a lock, extend the queue with our buffered items
queue.extend(self.local.drain(..));
// notify that the queue has been updated
condvar.notify_one();
}
}
fn total_byte_length(vec: &Vec<Bytes>) -> usize {
vec.iter()
.map(|b| b.len())
.sum()
}
fn merge_vec_bytes(vec: &mut Vec<Bytes>) {
let mut big_buffer = BytesMut::with_capacity(Self::total_byte_length(vec));
for buf in vec.drain(..) {
big_buffer.put(buf);
}
vec.push(big_buffer.freeze());
}
/// Flush any locally buffered items to the receiver.
///
pub async fn flush(&mut self) {
if self.local.is_empty() {
return; // no buffered items, can exit
}
// Merge several Bytes element into one
if self.local.len() > 1 { Self::merge_vec_bytes(&mut self.local) };
let (mutex, condvar) = self.shared.as_ref();
// we have to wait until we get the queue
mutex.lock().await.extend(self.local.drain(..));
// notify that the queue has been updated
condvar.notify_one();
}
}
impl Drop for Sender {
fn drop(&mut self) {
self.shared.1.notify_one();
}
}
// ###### RECEIVER #############################################################
/// The receiving end of the channel.
///
/// There can be only _one_ receiver.
pub struct Receiver {
local: VecDeque<Bytes>,
shared: Shared<Bytes>,
}
impl Receiver {
/// Receive a message, return `None` is there are no more messages _or senders_.
///
/// Receive will block until a message becomes available or all associated [`Sender`]s are
/// dropped.
pub async fn recv(&mut self) -> Option<Bytes> {
// only check for messages if local buffer is empty
while self.local.is_empty() {
if Arc::strong_count(&self.shared) == 1 {
// there are no more senders, take the shared queue and break
// if both queues are empty, breaking will return None and signals
// end of channel
std::mem::swap(&mut self.local, self.shared.0.lock().await.deref_mut());
break;
} else {
let (mutex, condvar) = self.shared.as_ref();
// obtain a lock on the shared queue
let mut guard = mutex.lock().await;
let mut baton = None;
while guard.is_empty() {
baton.dispose();
// if queue is empty, unlock it and wait for a sender to notify
// that it has been updated
let got = condvar.wait_baton((guard, mutex)).await;
guard = got.0;
baton = got.1;
}
// our local buffer is empty, so we can swap the shared one for it
std::mem::swap(&mut self.local, &mut guard);
}
}
// pull from the local buffer
self.local.pop_front()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment