Last active
July 25, 2023 23:29
-
-
Save sug0/45a84593f3b6a2e4d7077a17969e481e to your computer and use it in GitHub Desktop.
Lock-free Rust log entries with concurrent readers and writers
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#![allow(dead_code)] | |
use std::cell::UnsafeCell; | |
use std::mem::MaybeUninit; | |
use std::ops::Drop; | |
use std::sync::atomic::{AtomicUsize, Ordering}; | |
use std::sync::Arc; | |
use arc_swap::ArcSwap; | |
pub type Entry = String; | |
pub struct Log { | |
head: ArcSwap<LogInner>, | |
} | |
pub struct Checkpoint { | |
slot_counter: usize, | |
log: LogInnerBorrow, | |
} | |
impl Checkpoint { | |
#[inline] | |
pub fn iter(&self) -> impl Iterator<Item = &Entry> { | |
let log = self.log.log(); | |
let max_index = std::cmp::min(self.slot_counter, log.slots.len()); | |
let slots = &log.slots[..max_index]; | |
slots.iter().map(|slot| slot.read()) | |
} | |
} | |
enum LogInnerBorrow { | |
Full(Arc<LogInner>), | |
Temp(arc_swap::Guard<Arc<LogInner>>), | |
} | |
impl LogInnerBorrow { | |
#[inline] | |
fn log(&self) -> &LogInner { | |
match self { | |
LogInnerBorrow::Full(log) => log, | |
LogInnerBorrow::Temp(log) => log, | |
} | |
} | |
#[inline] | |
fn load_counter(&self) -> usize { | |
self.log().slot_counter.load(Ordering::Acquire) | |
} | |
#[inline] | |
fn allocate_slot(&self) -> Option<&LogSlot> { | |
let log = self.log(); | |
let index = log.slot_counter.fetch_add(1, Ordering::AcqRel); | |
log.slots.get(index) | |
} | |
} | |
// can create another layer of indirection, log inner inner, | |
// since we only need to drop up to `slot_counter` elements. | |
// currently, we're running the destructor on all elements | |
struct LogInner { | |
slot_counter: AtomicUsize, | |
slots: Box<[LogSlot]>, | |
} | |
#[derive(Debug)] | |
pub struct LogSlot { | |
inner: UnsafeCell<LogSlotInner>, | |
} | |
impl LogSlot { | |
const fn empty() -> Self { | |
Self { | |
inner: UnsafeCell::new(LogSlotInner { | |
initialized: false, | |
entry: MaybeUninit::uninit(), | |
}), | |
} | |
} | |
fn write(&self, entry: Entry) { | |
let slot = unsafe { &mut *self.inner.get() }; | |
debug_assert!(!slot.initialized, "log slot has already been populated"); | |
slot.initialized = true; | |
slot.entry.write(entry); | |
} | |
fn read(&self) -> &Entry { | |
let slot = unsafe { &*self.inner.get() }; | |
debug_assert!(slot.initialized, "log slot has not been populated"); | |
unsafe { slot.entry.assume_init_ref() } | |
} | |
} | |
unsafe impl Send for LogSlot {} | |
unsafe impl Sync for LogSlot {} | |
#[derive(Debug)] | |
struct LogSlotInner { | |
initialized: bool, | |
entry: MaybeUninit<Entry>, | |
} | |
impl Drop for LogSlotInner { | |
fn drop(&mut self) { | |
if self.initialized { | |
unsafe { self.entry.assume_init_drop() } | |
} | |
} | |
} | |
impl Log { | |
/// Create a new [`Log`] that can hold `cap` elements | |
/// before having to rotate. | |
pub fn with_capacity(cap: usize) -> Self { | |
Self { | |
head: ArcSwap::new(Arc::new(LogInner { | |
slot_counter: AtomicUsize::new(0), | |
slots: { | |
let mut slots = Vec::with_capacity(cap); | |
for _ in 0..cap { | |
slots.push(LogSlot::empty()); | |
} | |
slots.into_boxed_slice() | |
}, | |
})), | |
} | |
} | |
pub fn insert(&self, entry: Entry) { | |
let log = self.borrow_fast(); | |
if let Some(slot) = log.allocate_slot() { | |
slot.write(entry); | |
} | |
// TODO: implement log rotation | |
} | |
pub fn checkpoint(&self, short_lived: bool) -> Checkpoint { | |
let log = if short_lived { | |
self.borrow_fast() | |
} else { | |
self.borrow_slow() | |
}; | |
let slot_counter = log.load_counter(); | |
Checkpoint { log, slot_counter } | |
} | |
#[inline] | |
fn borrow_fast(&self) -> LogInnerBorrow { | |
LogInnerBorrow::Temp(self.head.load()) | |
} | |
#[inline] | |
fn borrow_slow(&self) -> LogInnerBorrow { | |
LogInnerBorrow::Full(self.head.load_full()) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
mod log; | |
fn main() { | |
let log = log::Log::with_capacity(4); | |
log.insert("ganda".to_owned()); | |
log.insert("cena".to_owned()); | |
let ck_0 = log.checkpoint(true); | |
log.insert("mano".to_owned()); | |
let ck_1 = log.checkpoint(true); | |
log.insert("de".to_owned()); | |
log.insert("broa".to_owned()); | |
log.insert("velho".to_owned()); | |
let ck_2 = log.checkpoint(true); | |
let checkpoints = [ck_0, ck_1, ck_2]; | |
for (i, ck) in checkpoints.into_iter().enumerate() { | |
println!("======="); | |
for entry in ck.iter() { | |
println!("ck_{i}:{entry:?}"); | |
} | |
} | |
println!("======="); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment