Skip to content

Instantly share code, notes, and snippets.

@sug0
Last active July 25, 2023 23:29
Show Gist options
  • Save sug0/45a84593f3b6a2e4d7077a17969e481e to your computer and use it in GitHub Desktop.
Save sug0/45a84593f3b6a2e4d7077a17969e481e to your computer and use it in GitHub Desktop.
Lock-free Rust log entries with concurrent readers and writers
#![allow(dead_code)]
use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::ops::Drop;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use arc_swap::ArcSwap;
pub type Entry = String;
pub struct Log {
head: ArcSwap<LogInner>,
}
pub struct Checkpoint {
slot_counter: usize,
log: LogInnerBorrow,
}
impl Checkpoint {
#[inline]
pub fn iter(&self) -> impl Iterator<Item = &Entry> {
let log = self.log.log();
let max_index = std::cmp::min(self.slot_counter, log.slots.len());
let slots = &log.slots[..max_index];
slots.iter().map(|slot| slot.read())
}
}
enum LogInnerBorrow {
Full(Arc<LogInner>),
Temp(arc_swap::Guard<Arc<LogInner>>),
}
impl LogInnerBorrow {
#[inline]
fn log(&self) -> &LogInner {
match self {
LogInnerBorrow::Full(log) => log,
LogInnerBorrow::Temp(log) => log,
}
}
#[inline]
fn load_counter(&self) -> usize {
self.log().slot_counter.load(Ordering::Acquire)
}
#[inline]
fn allocate_slot(&self) -> Option<&LogSlot> {
let log = self.log();
let index = log.slot_counter.fetch_add(1, Ordering::AcqRel);
log.slots.get(index)
}
}
// can create another layer of indirection, log inner inner,
// since we only need to drop up to `slot_counter` elements.
// currently, we're running the destructor on all elements
struct LogInner {
slot_counter: AtomicUsize,
slots: Box<[LogSlot]>,
}
#[derive(Debug)]
pub struct LogSlot {
inner: UnsafeCell<LogSlotInner>,
}
impl LogSlot {
const fn empty() -> Self {
Self {
inner: UnsafeCell::new(LogSlotInner {
initialized: false,
entry: MaybeUninit::uninit(),
}),
}
}
fn write(&self, entry: Entry) {
let slot = unsafe { &mut *self.inner.get() };
debug_assert!(!slot.initialized, "log slot has already been populated");
slot.initialized = true;
slot.entry.write(entry);
}
fn read(&self) -> &Entry {
let slot = unsafe { &*self.inner.get() };
debug_assert!(slot.initialized, "log slot has not been populated");
unsafe { slot.entry.assume_init_ref() }
}
}
unsafe impl Send for LogSlot {}
unsafe impl Sync for LogSlot {}
#[derive(Debug)]
struct LogSlotInner {
initialized: bool,
entry: MaybeUninit<Entry>,
}
impl Drop for LogSlotInner {
fn drop(&mut self) {
if self.initialized {
unsafe { self.entry.assume_init_drop() }
}
}
}
impl Log {
/// Create a new [`Log`] that can hold `cap` elements
/// before having to rotate.
pub fn with_capacity(cap: usize) -> Self {
Self {
head: ArcSwap::new(Arc::new(LogInner {
slot_counter: AtomicUsize::new(0),
slots: {
let mut slots = Vec::with_capacity(cap);
for _ in 0..cap {
slots.push(LogSlot::empty());
}
slots.into_boxed_slice()
},
})),
}
}
pub fn insert(&self, entry: Entry) {
let log = self.borrow_fast();
if let Some(slot) = log.allocate_slot() {
slot.write(entry);
}
// TODO: implement log rotation
}
pub fn checkpoint(&self, short_lived: bool) -> Checkpoint {
let log = if short_lived {
self.borrow_fast()
} else {
self.borrow_slow()
};
let slot_counter = log.load_counter();
Checkpoint { log, slot_counter }
}
#[inline]
fn borrow_fast(&self) -> LogInnerBorrow {
LogInnerBorrow::Temp(self.head.load())
}
#[inline]
fn borrow_slow(&self) -> LogInnerBorrow {
LogInnerBorrow::Full(self.head.load_full())
}
}
mod log;
fn main() {
let log = log::Log::with_capacity(4);
log.insert("ganda".to_owned());
log.insert("cena".to_owned());
let ck_0 = log.checkpoint(true);
log.insert("mano".to_owned());
let ck_1 = log.checkpoint(true);
log.insert("de".to_owned());
log.insert("broa".to_owned());
log.insert("velho".to_owned());
let ck_2 = log.checkpoint(true);
let checkpoints = [ck_0, ck_1, ck_2];
for (i, ck) in checkpoints.into_iter().enumerate() {
println!("=======");
for entry in ck.iter() {
println!("ck_{i}:{entry:?}");
}
}
println!("=======");
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment