Created
          January 13, 2025 06:05 
        
      - 
      
- 
        Save xqq/b84dec89d8dd7a22e197d37b6a7e8e9f to your computer and use it in GitHub Desktop. 
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  | use async_condvar_fair::{Condvar, BatonExt}; | |
| use bytes::{BufMut, Bytes, BytesMut}; | |
| use std::collections::VecDeque; | |
| use std::ops::DerefMut; | |
| use std::sync::Arc; | |
| use tokio::sync::Mutex; | |
| /// Create an unbounded channel with the specified minimum buffering size, in bytes. | |
| pub fn buffered_bytes_channel(threshold_size: usize) -> (Sender, Receiver) { | |
| let shared = Arc::new((Mutex::new(Default::default()), Condvar::new())); | |
| ( | |
| Sender { | |
| local: Default::default(), | |
| shared: Arc::clone(&shared), | |
| threshold_size: threshold_size, | |
| }, | |
| Receiver { | |
| local: Default::default(), | |
| shared, | |
| }, | |
| ) | |
| } | |
| type Shared<T> = Arc<(Mutex<VecDeque<T>>, Condvar)>; | |
| // ###### SENDER ############################################################### | |
| /// The sending side of the channel. | |
| /// | |
| /// `Sender` is `Send`, `Sync`, and `Clone`able. | |
| /// | |
| /// When `Sender` is dropped, any locally buffered messages are flushed to the receiver. | |
| /// Note that this _might_ block if waiting on the shared lock. | |
| pub struct Sender { | |
| local: Vec<Bytes>, | |
| shared: Shared<Bytes>, | |
| threshold_size: usize, | |
| } | |
| impl Clone for Sender { | |
| fn clone(&self) -> Self { | |
| Self { | |
| local: Default::default(), | |
| shared: Arc::clone(&self.shared), | |
| threshold_size: self.threshold_size, | |
| } | |
| } | |
| } | |
| impl Sender { | |
| /// Buffer a message to be sent. | |
| /// If the local buffer reaches the threshold, an attempt to send the buffered messages to the | |
| /// receiver occurs. | |
| /// | |
| /// This is guaranteed not to block. | |
| pub fn send(&mut self, bytes: Bytes) { | |
| // empty Bytes means an EOF flag, mark it | |
| let chunk_is_empty = bytes.is_empty(); | |
| // push onto buffer if not EOF flag | |
| if !chunk_is_empty { | |
| self.local.push(bytes) | |
| } | |
| // if [not EOF flag] && [accumulated buffer size is less than threshold] | |
| // return and wait for next call | |
| if !chunk_is_empty && Self::total_byte_length(&self.local) < self.threshold_size { | |
| return; // haven't reached threshold | |
| } | |
| // Merge several Bytes element into one | |
| if self.local.len() > 1 { Self::merge_vec_bytes(&mut self.local) }; | |
| // if the item is an EOF flag, push back into local | |
| if chunk_is_empty { | |
| self.local.push(Bytes::new()); | |
| } | |
| let (mutex, condvar) = self.shared.as_ref(); | |
| if let Ok(mut queue) = mutex.try_lock() { | |
| // if we manage to get a lock, extend the queue with our buffered items | |
| queue.extend(self.local.drain(..)); | |
| // notify that the queue has been updated | |
| condvar.notify_one(); | |
| } | |
| } | |
| fn total_byte_length(vec: &Vec<Bytes>) -> usize { | |
| vec.iter() | |
| .map(|b| b.len()) | |
| .sum() | |
| } | |
| fn merge_vec_bytes(vec: &mut Vec<Bytes>) { | |
| let mut big_buffer = BytesMut::with_capacity(Self::total_byte_length(vec)); | |
| for buf in vec.drain(..) { | |
| big_buffer.put(buf); | |
| } | |
| vec.push(big_buffer.freeze()); | |
| } | |
| /// Flush any locally buffered items to the receiver. | |
| /// | |
| pub async fn flush(&mut self) { | |
| if self.local.is_empty() { | |
| return; // no buffered items, can exit | |
| } | |
| // Merge several Bytes element into one | |
| if self.local.len() > 1 { Self::merge_vec_bytes(&mut self.local) }; | |
| let (mutex, condvar) = self.shared.as_ref(); | |
| // we have to wait until we get the queue | |
| mutex.lock().await.extend(self.local.drain(..)); | |
| // notify that the queue has been updated | |
| condvar.notify_one(); | |
| } | |
| } | |
| impl Drop for Sender { | |
| fn drop(&mut self) { | |
| self.shared.1.notify_one(); | |
| } | |
| } | |
| // ###### RECEIVER ############################################################# | |
| /// The receiving end of the channel. | |
| /// | |
| /// There can be only _one_ receiver. | |
| pub struct Receiver { | |
| local: VecDeque<Bytes>, | |
| shared: Shared<Bytes>, | |
| } | |
| impl Receiver { | |
| /// Receive a message, return `None` is there are no more messages _or senders_. | |
| /// | |
| /// Receive will block until a message becomes available or all associated [`Sender`]s are | |
| /// dropped. | |
| pub async fn recv(&mut self) -> Option<Bytes> { | |
| // only check for messages if local buffer is empty | |
| while self.local.is_empty() { | |
| if Arc::strong_count(&self.shared) == 1 { | |
| // there are no more senders, take the shared queue and break | |
| // if both queues are empty, breaking will return None and signals | |
| // end of channel | |
| std::mem::swap(&mut self.local, self.shared.0.lock().await.deref_mut()); | |
| break; | |
| } else { | |
| let (mutex, condvar) = self.shared.as_ref(); | |
| // obtain a lock on the shared queue | |
| let mut guard = mutex.lock().await; | |
| let mut baton = None; | |
| while guard.is_empty() { | |
| baton.dispose(); | |
| // if queue is empty, unlock it and wait for a sender to notify | |
| // that it has been updated | |
| let got = condvar.wait_baton((guard, mutex)).await; | |
| guard = got.0; | |
| baton = got.1; | |
| } | |
| // our local buffer is empty, so we can swap the shared one for it | |
| std::mem::swap(&mut self.local, &mut guard); | |
| } | |
| } | |
| // pull from the local buffer | |
| self.local.pop_front() | |
| } | |
| } | 
  
    Sign up for free
    to join this conversation on GitHub.
    Already have an account?
    Sign in to comment