Skip to content

Instantly share code, notes, and snippets.

@tobz
Created August 31, 2021 21:53
Show Gist options
  • Save tobz/39f882373fbd5193f246bef2f2963a66 to your computer and use it in GitHub Desktop.
Save tobz/39f882373fbd5193f246bef2f2963a66 to your computer and use it in GitHub Desktop.
struct Writer {
db: Arc<Database>,
reader_wakeup: Arc<AtomicWaker>,
writer_wakeup: Arc<Mutex<Vec<Waker>>>,
total_size: Arc<AtomicUsize>,
}
struct Reader {
db: Arc<Database>,
reader_wakeup: Arc<AtomicWaker>,
writer_wakeup: Arc<Mutex<Vec<AtomicWaker>>>,
temporary_buffer: Vec<Item>,
}
impl Sink for Writer {
fn poll_ready(...) {
if total_size > max_size {
self.writer_wakeup.lock().push(cx.waker().clone());
Pending
} else {
Ready
}
}
fn start_send(...) {
// add to batch
}
fn poll_flush(...) {
self.db.write_and_flush();
self.read_wakeup.wake();
}
}
impl Stream for Reader {
fn poll_next(...) {
self.reader_wakeup.register(cx.waker());
if time_to_delete_items_weve_received {
// delete items we've already seen, and notify writers they can write again
for wake in self.writer_wakeup.lock().drain(..) {
wake.wake();
}
}
if self.temporary_buffer.is_empty() {
block_in_place(|| self.temporary_buffer.extend(self.db.blocking_read()));
}
if let Some(item) = self.temporary_buffer.pop() {
return Ready(item)
} else {
Pending
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment