@tobz
Created September 18, 2021 20:59
//! Design specification for our yet-to-be-named SPSC disk buffer implementation:
//!
//! We provide a single writer/single reader interface to an underlying set of files that
//! conceptually represent a ring buffer. Unlike a typical ring buffer, we block writes when the
//! total size of all unread records reaches the configured limit. It may be possible to alter the
//! design in the future such that we can provide a "drop oldest" operation mode, but that is
//! out-of-scope for version 1 of this design.
//!
//! Design constraints / invariants:
//! - buffer can be a maximum of 2TB in total size
//! - data files do not exceed 128MB
//! - files are allocated upfront (each file's full 128MB is allocated when created)
//! - all headers (ledger, data file) are written in network byte order (big endian) when integers
//! are involved
//!
//! At a high level, written records end up in one of many underlying data files, while the
//! ledger state -- number of records, writer and reader positions, etc. -- is stored in a
//! separate file. Data files follow a "last process to touch it" ownership model: the
//! writer always creates new files, and the reader deletes files once they have been fully read.
//!
//! Internally, data files consist of a simplified structure that is optimized for the ring buffer
//! use case. Records are packed together with a minimalistic layout:
//!
//! buffer-data-001.dat:
//! [data file record count - unsigned 64-bit integer]
//! [data file size - unsigned 64-bit integer]
//! [record ID - unsigned 64-bit integer]
//! [record length - unsigned 64-bit integer]
//! [record data - N bytes]
//!
//! The record ID/length/data superblocks repeat until adding another would exceed the
//! configured data file size limit, in which case a new data file is started. A record cannot
//! exceed the maximum record size for a single data file; attempting to buffer such a record will result in an error.
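//!
//! For illustration, a data file holding a single 11-byte record ("hello world") with a
//! (hypothetical) record ID of 1 would be laid out as:
//!
//! offset  0..8   data file record count = 1
//! offset  8..16  data file size         = 27 (record header + record data)
//! offset 16..24  record ID              = 1
//! offset 24..32  record length          = 11
//! offset 32..43  record data            = "hello world"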
//!
//! Likewise, the bookkeeping file consists of a simplified structure that is optimized for being
//! shared via a memory-mapped file interface between the writer and reader:
//!
//! buffer.db:
//! [total record count - unsigned 64-bit integer]
//! [total buffer size - unsigned 64-bit integer]
//! [next record ID - unsigned 64-bit integer]
//! [writer current data file ID - unsigned 16-bit integer]
//! [reader current data file ID - unsigned 16-bit integer]
//! [reader last record ID - unsigned 64-bit integer]
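//!
//! These six fields are packed back-to-back, so the ledger file is a fixed
//! 8 + 8 + 8 + 2 + 2 + 8 = 36 bytes (see `LEDGER_FILE_SIZE` below).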
//!
//! As this buffer is meant to emulate a ring buffer, most of the bookkeeping revolves around the
//! writer and reader being able to quickly figure out where they left off. Record and data file
//! IDs simply roll over when they reach the maximum of their data type, and are incremented
//! monotonically rather than being reused when an ID within the 0 - N range is retired.
//!
//! TODO: think through whether or not we can use total file size to ensure that we never try to
//! open more than 16,384 files (2TB max buffer size / 128MB max data file size) total, so that we
//! can avoid needing an array/bitmap/etc tracking which files are in use.
use std::{
    convert::TryInto,
    fs::OpenOptions,
    io,
    path::{Path, PathBuf},
    sync::{
        atomic::{AtomicU16, AtomicU64, Ordering},
        Arc,
    },
};
use memmap2::{MmapMut, MmapOptions};
use parking_lot::{Mutex, MutexGuard};
// Fixed size, in bytes, of the memory-mapped ledger file.
const LEDGER_FILE_SIZE: usize = 36;
// Size, in bytes, that each data file is preallocated to when created.
const DATA_FILE_SIZE: usize = 64 * 1024 * 1024;
// Size, in bytes, of the data file header (record count + data file size).
const DATA_FILE_HEADER_SIZE: usize = 16;
// Bytes in a data file usable for record headers and record data, after the file header.
const DATA_FILE_RECORD_DATA_MAX_USABLE: usize = DATA_FILE_SIZE - DATA_FILE_HEADER_SIZE;
// Size, in bytes, of each record header (record ID + record length).
const DATA_FILE_RECORD_HEADER_SIZE: usize = 16;
// Largest record payload that can fit in a single data file.
const DATA_FILE_MAX_RECORD_SIZE: usize = DATA_FILE_SIZE - DATA_FILE_HEADER_SIZE - DATA_FILE_RECORD_HEADER_SIZE;
#[derive(Debug, Default)]
struct LedgerState {
// Total number of records persisted in this buffer.
total_records: AtomicU64,
// Total size of all data files used by this buffer.
total_buffer_size: AtomicU64,
// Next record ID to use when writing a record.
next_record_id: AtomicU64,
// The current data file ID being written to.
writer_current_data_file_id: AtomicU16,
// The current data file ID being read from.
reader_current_data_file_id: AtomicU16,
// The last record ID read by the reader.
reader_last_record_id: AtomicU64,
}
impl LedgerState {
pub fn allocate_record_id(&self) -> u64 {
self.next_record_id.fetch_add(1, Ordering::Acquire)
}
/// Gets the current write file ID.
pub fn current_writer_file_id(&self) -> u16 {
self.writer_current_data_file_id.load(Ordering::Acquire)
}
/// Increments the current writer file ID.
pub fn increment_writer_file_id(&self) {
self.writer_current_data_file_id.fetch_add(1, Ordering::AcqRel);
}
pub fn serialize_to(&self, dst: &mut [u8]) {
// CLARITY TODO: This is very ugly, and fragile due to field offsets. It'd be nice if we
// had a macro or something to make this a little more programmatic/repeatable/machine
// checkable. Given that we only have three structs which we serialize in this fashion,
// though, that could be overkill.
//
// PERFORMANCE TODO: This is a nice, safe variant of pushing the state into the data file,
// but I'm not sure if doing a pointer-level `memcpy` action would be meaningfully faster.
let total_records = self.total_records.load(Ordering::SeqCst).to_be_bytes();
let total_buffer_size = self.total_buffer_size.load(Ordering::SeqCst).to_be_bytes();
let next_record_id = self.next_record_id.load(Ordering::SeqCst).to_be_bytes();
let writer_current_data_file_id = self.writer_current_data_file_id.load(Ordering::SeqCst).to_be_bytes();
let reader_current_data_file_id = self.reader_current_data_file_id.load(Ordering::SeqCst).to_be_bytes();
let reader_last_record_id = self.reader_last_record_id.load(Ordering::SeqCst).to_be_bytes();
let mut src = Vec::new();
src.extend_from_slice(&total_records[..]);
src.extend_from_slice(&total_buffer_size[..]);
src.extend_from_slice(&next_record_id[..]);
src.extend_from_slice(&writer_current_data_file_id[..]);
src.extend_from_slice(&reader_current_data_file_id[..]);
src.extend_from_slice(&reader_last_record_id[..]);
debug_assert!(dst.len() == LEDGER_FILE_SIZE);
debug_assert!(src.len() == LEDGER_FILE_SIZE);
dst.copy_from_slice(&src[..]);
}
pub fn deserialize_from(&mut self, src: &[u8]) {
// CLARITY TODO: This is very ugly, and fragile due to field offsets. It'd be nice if we
// had a macro or something to make this a little more programmatic/repeatable/machine
// checkable. Given that we only have three structs which we serialize in this fashion,
// though, that could be overkill.
debug_assert!(src.len() == LEDGER_FILE_SIZE);
self.total_records = src[..8].try_into()
.map(u64::from_be_bytes)
.map(AtomicU64::new)
.expect("should have had 8 bytes");
self.total_buffer_size = src[8..16].try_into()
.map(u64::from_be_bytes)
.map(AtomicU64::new)
.expect("should have had 8 bytes");
self.next_record_id = src[16..24].try_into()
.map(u64::from_be_bytes)
.map(AtomicU64::new)
.expect("should have had 8 bytes");
self.writer_current_data_file_id = src[24..26].try_into()
.map(u16::from_be_bytes)
.map(AtomicU16::new)
.expect("should have had 2 bytes");
self.reader_current_data_file_id = src[26..28].try_into()
.map(u16::from_be_bytes)
.map(AtomicU16::new)
.expect("should have had 2 bytes");
self.reader_last_record_id = src[28..36].try_into()
.map(u64::from_be_bytes)
.map(AtomicU64::new)
.expect("should have had 8 bytes");
}
}
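// A minimal round-trip sketch (hypothetical, not part of the original design spec) showing
// that `LedgerState::serialize_to` and `deserialize_from` are inverses; the field values
// below are arbitrary and chosen purely for illustration.
#[cfg(test)]
mod ledger_state_tests {
    use super::*;

    #[test]
    fn ledger_state_round_trip() {
        let state = LedgerState {
            total_records: AtomicU64::new(42),
            total_buffer_size: AtomicU64::new(4096),
            next_record_id: AtomicU64::new(43),
            writer_current_data_file_id: AtomicU16::new(7),
            reader_current_data_file_id: AtomicU16::new(3),
            reader_last_record_id: AtomicU64::new(40),
        };

        // Serialize into a ledger-sized buffer, then decode it back out.
        let mut buf = [0u8; LEDGER_FILE_SIZE];
        state.serialize_to(&mut buf[..]);

        let mut decoded = LedgerState::default();
        decoded.deserialize_from(&buf[..]);
        assert_eq!(decoded.total_records.load(Ordering::SeqCst), 42);
        assert_eq!(decoded.writer_current_data_file_id.load(Ordering::SeqCst), 7);
        assert_eq!(decoded.reader_last_record_id.load(Ordering::SeqCst), 40);
    }
}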
#[derive(Debug)]
struct Ledger {
// Path to the data directory.
data_dir: PathBuf,
// Handle to the memory-mapped ledger file.
ledger_mmap: Mutex<MmapMut>,
// Ledger state.
state: LedgerState,
}
impl Ledger {
pub fn allocate_record_id(&self) -> u64 {
self.state.next_record_id.fetch_add(1, Ordering::Acquire)
}
pub fn get_writer_current_data_file_path(&self) -> PathBuf {
self.get_data_file_path(self.state.current_writer_file_id())
}
pub fn increment_writer_file_id(&self) {
self.state.increment_writer_file_id();
}
pub fn get_reader_current_data_file_path(&self) -> PathBuf {
self.get_data_file_path(self.state.reader_current_data_file_id.load(Ordering::Acquire))
}
pub fn get_data_file_path(&self, file_id: u16) -> PathBuf {
self.data_dir.join(format!("buffer-data-{}.dat", file_id))
}
pub fn state(&self) -> &LedgerState {
&self.state
}
pub fn data_dir(&self) -> &PathBuf {
&self.data_dir
}
fn load_inner(&mut self) -> io::Result<()> {
// TODO: this theoretically doesn't need to return a Result right now, let alone an
// io::Result, but at some point we should likely be adding checksums and doing other
// checks, so loading our state from disk would become a fallible operation
// INVARIANT: We always create the ledger file with a size of LEDGER_FILE_SIZE, which never
// changes over time. We can be sure that the slice we take to read the ledger state will
// be exactly LEDGER_FILE_SIZE bytes.
let ledger_mmap = self.ledger_mmap.lock();
let ledger_region = &ledger_mmap[..];
debug_assert_eq!(ledger_region.len(), LEDGER_FILE_SIZE);
self.state.deserialize_from(ledger_region);
Ok(())
}
pub fn track_write(&self, bytes_written: u64) {
self.state.total_records.fetch_add(1, Ordering::Release);
self.state.total_buffer_size.fetch_add(bytes_written, Ordering::Release);
}
pub fn flush(&self) -> io::Result<()> {
// INVARIANT: We always create the ledger file with a size of LEDGER_FILE_SIZE, which never
// changes over time. We can be sure that the slice we take to write the ledger state will
// be exactly LEDGER_FILE_SIZE bytes.
let mut ledger_mmap = self.ledger_mmap.lock();
let ledger_region = &mut ledger_mmap[..];
debug_assert_eq!(ledger_region.len(), LEDGER_FILE_SIZE);
self.state.serialize_to(ledger_region);
ledger_mmap.flush()
}
pub fn load_or_create<P>(data_dir: P) -> io::Result<Ledger>
where
P: AsRef<Path>,
{
let ledger_path = data_dir.as_ref().join("buffer.db");
let ledger_handle = OpenOptions::new().read(true).write(true).create(true).open(&ledger_path)?;
let _ = ledger_handle.set_len(LEDGER_FILE_SIZE as u64)?;
let ledger_mmap = unsafe { MmapOptions::new().map_mut(&ledger_handle)? };
let mut ledger = Ledger {
data_dir: data_dir.as_ref().to_owned(),
ledger_mmap: Mutex::new(ledger_mmap),
state: LedgerState::default(),
};
let _ = ledger.load_inner()?;
Ok(ledger)
}
}
#[derive(Clone, Default)]
struct DataFileState {
data_file_records: u64,
data_file_size: u64,
}
impl DataFileState {
pub fn track_write(&mut self, bytes_written: u64) {
self.data_file_records += 1;
self.data_file_size += bytes_written;
}
pub fn can_write(&self, record_len: u64) -> bool {
// INVARIANT: The data file size can never be greater than DATA_FILE_RECORD_DATA_MAX_USABLE,
// and so we know that subtracting `current_size` from DATA_FILE_RECORD_DATA_MAX_USABLE will
// never underflow.
let record_data_max_usable = DATA_FILE_RECORD_DATA_MAX_USABLE as u64;
let current_size = self.data_file_size;
assert!(record_data_max_usable >= current_size);
// INVARIANT: The writer doesn't allow records that are bigger than the maximum data file size,
// less the data file header size and record header size. These limits are far, far below
// the maximum size of a u64, so `bytes_to_write` + DATA_FILE_RECORD_HEADER_SIZE should
// never overflow a u64.
let free_space = record_data_max_usable - current_size;
free_space >= record_len + DATA_FILE_RECORD_HEADER_SIZE as u64
}
pub fn reset(&mut self) {
self.data_file_records = 0;
self.data_file_size = 0;
}
pub fn serialize_to(&self, dst: &mut [u8]) {
// CLARITY TODO: This is very ugly, and fragile due to field offsets. It'd be nice if we
// had a macro or something to make this a little more programmatic/repeatable/machine
// checkable. Given that we only have three structs which we serialize in this fashion,
// though, that could be overkill.
//
// PERFORMANCE TODO: This is a nice, safe variant of pushing the state into the data file,
// but I'm not sure if doing a pointer-level `memcpy` action would be meaningfully faster.
let data_file_records = self.data_file_records.to_be_bytes();
let data_file_size = self.data_file_size.to_be_bytes();
let mut src = Vec::new();
src.extend_from_slice(&data_file_records[..]);
src.extend_from_slice(&data_file_size[..]);
debug_assert_eq!(dst.len(), DATA_FILE_HEADER_SIZE);
debug_assert_eq!(src.len(), DATA_FILE_HEADER_SIZE);
dst.copy_from_slice(&src[..]);
}
pub fn deserialize_from(&mut self, src: &[u8]) {
// CLARITY TODO: This is very ugly, and fragile due to field offsets. It'd be nice if we
// had a macro or something to make this a little more programmatic/repeatable/machine
// checkable. Given that we only have three structs which we serialize in this fashion,
// though, that could be overkill.
debug_assert_eq!(src.len(), DATA_FILE_HEADER_SIZE);
self.data_file_records = src[..8].try_into()
.map(u64::from_be_bytes)
.expect("should have had 8 bytes");
self.data_file_size = src[8..16].try_into()
.map(u64::from_be_bytes)
.expect("should have had 8 bytes");
}
}
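// A small, hypothetical check (not part of the original design spec) illustrating the
// `can_write` arithmetic: an empty data file can accept a record of exactly
// DATA_FILE_MAX_RECORD_SIZE bytes, and must reject anything larger.
#[cfg(test)]
mod data_file_state_tests {
    use super::*;

    #[test]
    fn can_write_respects_max_record_size() {
        let state = DataFileState::default();
        assert!(state.can_write(DATA_FILE_MAX_RECORD_SIZE as u64));
        assert!(!state.can_write((DATA_FILE_MAX_RECORD_SIZE + 1) as u64));
    }
}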
#[repr(C)]
struct RecordHeader {
// ID of this record.
record_id: u64,
// Length of this record in bytes.
record_length: u64,
}
impl RecordHeader {
pub fn serialize_to(&self, dst: &mut [u8]) {
// CLARITY TODO: This is very ugly, and fragile due to field offsets. It'd be nice if we
// had a macro or something to make this a little more programmatic/repeatable/machine
// checkable. Given that we only have three structs which we serialize in this fashion,
// though, that could be overkill.
//
// PERFORMANCE TODO: This is a nice, safe variant of pushing the header into the data file,
// but I'm not sure if doing a pointer-level `memcpy` action would be meaningfully faster.
let record_id = self.record_id.to_be_bytes();
let record_length = self.record_length.to_be_bytes();
let mut src = Vec::new();
src.extend_from_slice(&record_id[..]);
src.extend_from_slice(&record_length[..]);
debug_assert_eq!(dst.len(), DATA_FILE_RECORD_HEADER_SIZE);
debug_assert_eq!(src.len(), DATA_FILE_RECORD_HEADER_SIZE);
dst.copy_from_slice(&src[..]);
}
pub fn deserialize_from(&mut self, src: &[u8]) {
// CLARITY TODO: This is very ugly, and fragile due to field offsets. It'd be nice if we
// had a macro or something to make this a little more programmatic/repeatable/machine
// checkable. Given that we only have three structs which we serialize in this fashion,
// though, that could be overkill.
debug_assert_eq!(src.len(), DATA_FILE_RECORD_HEADER_SIZE);
self.record_id = src[..8].try_into()
.map(u64::from_be_bytes)
.expect("should have had 8 bytes");
self.record_length = src[8..16].try_into()
.map(u64::from_be_bytes)
.expect("should have had 8 bytes");
}
}
struct WriteState {
ledger: Arc<Ledger>,
data_file: Option<MmapMut>,
data_file_state: DataFileState,
}
impl From<Arc<Ledger>> for WriteState {
fn from(ledger: Arc<Ledger>) -> Self {
WriteState {
ledger,
data_file: None,
data_file_state: DataFileState::default(),
}
}
}
struct WriteGuard<'a> {
inner: MutexGuard<'a, WriteState>,
}
impl<'a> WriteGuard<'a> {
pub fn write<R>(&mut self, record: R) -> io::Result<()>
where
R: AsRef<[u8]>,
{
let buf = record.as_ref();
let buf_len = buf.len();
// Check that the record isn't bigger than the maximum record size. We'll do a more
// specific check related to whatever current data file we have open is, and what it has for
// remaining size, to ensure that we open a new data file if need be.
if buf_len > DATA_FILE_MAX_RECORD_SIZE {
return Err(io::Error::new(io::ErrorKind::Other, "record too large"))
}
// Ensure we have enough free space in the current file to write the record, or close the
// current file and open a new one.
//
// TODO: Add support for splitting a record across multiple files, since otherwise we'll
// end up losing space at the end when we're simply a few bytes short of being able to fit.
//
// We can technically already determine, based on the record length, whether or not the
// current data file could possibly contain the record data itself, although the hard part
// to figure out is if we rewrite the same record header in the next file, and whether or
// not we count a record that spans multiple data files as a record in both, in terms of
// `total_records`.
//
// We could likely do something like... if record length is N, and N + current data file
// offset is > max file size, then we know we're spilling over to another file, and we
// write a normal record block to the next data file, with the same record ID, but subtract
// what we've already written from record length for that 2nd, or 3rd, or nth record block.
// This way we know we're still reading the same record, and the record length always tells
// us how much is remaining, so we can keep carrying over and figure out if we need to read
// from _another_ data file.
//
// Where this potentially gets _very_ tricky is that if we already had the maximum number of
// files minus one, let's say, so we do our first partial write into the current (last
// possible, mind you) data file, and then need to spill over to another one... we'd
// technically be blocked waiting for the reader to read that very first data file and
// remove it, so we were able to recreate it.
//
// Technically, we _could_ do something like figuring out how many possible data files we
// could instantaneously allocate based on the reader's progress, and use that to figure out
// the maximum record we could possibly write at that moment, and block if we didn't have
// the capacity to do so. The math itself is simple because it's just a little extra
// arithmetic to consider the remaining data files that we can write, but it is extra logic
// that might become a little too thorny.
//
// The other option is that we simply keep things as-is, and don't allow writing a record
// bigger than a single data file and that we don't bother trying to spill over to a new
// data file, simply preferring to open a new data file. This isn't the most efficient use
// of space but it _is_ simpler.
//
// ASYNC TODO: This would be an asynchronous wait point to return from if we didn't have
// enough capacity for this write. Specifically, we'd be waiting here because the next file we
// want to write to already exists and is pending being read by the reader.
let _ = self.ensure_capacity(buf_len)?;
let record_id = self.inner.ledger.allocate_record_id();
// INVARIANT: The data file size is limited to a maximum of DATA_FILE_SIZE, which is far
// below even 2^32, so casting `data_file_size` to usize is safe and will not truncate,
// even on 32-bit platforms. Additionally, adding DATA_FILE_HEADER_SIZE to it will not
// overflow a usize.
let write_offset = self.inner.data_file_state.data_file_size as usize + DATA_FILE_HEADER_SIZE;
let header_start = write_offset;
let header_end = header_start + DATA_FILE_RECORD_HEADER_SIZE;
let data_start = header_end;
let data_end = data_start + buf_len;
// Write our record header and data.
let data_file_mmap = self.inner.data_file.as_mut().expect("mmapped data file should exist");
let record_header = RecordHeader { record_id, record_length: buf.len() as u64 };
let record_region = &mut data_file_mmap[header_start..header_end];
record_header.serialize_to(record_region);
let data_region = &mut data_file_mmap[data_start..data_end];
data_region.copy_from_slice(buf);
// Update the metadata now that we've written the record.
let bytes_written = (buf_len + DATA_FILE_RECORD_HEADER_SIZE) as u64;
self.inner.ledger.track_write(bytes_written);
self.inner.data_file_state.track_write(bytes_written);
Ok(())
}
fn ensure_capacity(&mut self, bytes_to_write: usize) -> io::Result<()> {
// If we have an open data file, check to see if it has enough room for us to write into.
// If not, we make sure it's flushed to disk, bump our writer file ID, and let the "no open
// file" logic below take over.
if self.inner.data_file.is_some() {
if self.inner.data_file_state.can_write(bytes_to_write as u64) {
return Ok(())
} else {
// Not enough space to write the record, so we need to close out the current
// data file and open a new one. Flush all of our current metadata and the
// data file itself before opening a new one.
let _ = self.flush()?;
self.inner.data_file = None;
self.inner.ledger.increment_writer_file_id();
}
}
// We currently have no open data file, or we didn't have enough space in the
// previously-opened one; either way, create a new one and set it up.
//
// TODO: How do we correctly detect, when the reader says it's reading the same file ID
// as we're about to create here, whether we're just picking up where we left off with a
// not-yet-full data file _or_ we've looped around with file IDs and are actually trying
// to write to a file that is full but not yet read?
//
// I suspect what it will come down to is reading the data file state initially and seeing
// if this file is brand new (0 records, 0 size) or not.
//
// ASYNC TODO: This would be an asynchronous wait point to return from if we didn't have
// enough capacity for this write. Specifically, we'd be waiting here because the file we
// want to write to already exists and is pending being read by the reader.
let data_file_path = self.inner.ledger.get_writer_current_data_file_path();
let data_file_handle = OpenOptions::new().read(true).write(true).create(true).open(&data_file_path)?;
let _ = data_file_handle.set_len(DATA_FILE_SIZE as u64)?;
let data_file_mmap = unsafe { MmapOptions::new().map_mut(&data_file_handle)? };
self.inner.data_file = Some(data_file_mmap);
self.inner.data_file_state.reset();
Ok(())
}
pub fn flush(&mut self) -> io::Result<()> {
// INVARIANT: We always create data files with a size of DATA_FILE_SIZE, which we assert to
// always be larger than DATA_FILE_HEADER_SIZE, so our `state_region` slice will always be
// valid and of DATA_FILE_HEADER_SIZE bytes long.
let state = self.inner.data_file_state.clone();
if let Some(data_file_mmap) = self.inner.data_file.as_mut() {
let state_region = &mut data_file_mmap[..DATA_FILE_HEADER_SIZE];
state.serialize_to(state_region);
let _ = data_file_mmap.flush()?;
}
self.inner.ledger.flush()
}
pub fn commit(mut self) -> io::Result<()> {
self.flush()
}
}
struct Writer {
ledger: Arc<Ledger>,
state: Mutex<WriteState>,
}
impl Writer {
pub fn new(ledger: Arc<Ledger>) -> Self {
let state = WriteState::from(Arc::clone(&ledger));
Writer {
ledger,
state: Mutex::new(state),
}
}
pub fn transaction(&self) -> WriteGuard<'_> {
WriteGuard {
inner: self.state.lock(),
}
}
}
struct Reader {
ledger: Arc<Ledger>,
data_file: Option<MmapMut>
}
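// A minimal, hypothetical sketch of the read path (the original gist leaves the reader
// unimplemented): decode the record sitting at a given byte offset within the reader's
// currently-mapped data file. A real implementation would also advance across data files,
// delete fully-read files, and update `reader_last_record_id` in the ledger.
#[allow(dead_code)]
impl Reader {
    fn read_record_at(&self, offset: usize) -> Option<(RecordHeader, &[u8])> {
        let data_file_mmap = self.data_file.as_ref()?;
        // Decode the record header (ID + length) at `offset`.
        let header_start = offset;
        let header_end = header_start + DATA_FILE_RECORD_HEADER_SIZE;
        let mut header = RecordHeader { record_id: 0, record_length: 0 };
        header.deserialize_from(&data_file_mmap[header_start..header_end]);
        // The record data immediately follows its header.
        let data_start = header_end;
        let data_end = data_start + header.record_length as usize;
        Some((header, &data_file_mmap[data_start..data_end]))
    }
}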
struct Buffer;
impl Buffer {
pub fn new<P>(data_dir: P) -> io::Result<(Writer, Reader)>
where
P: AsRef<Path>,
{
// Establishing invariants about the size of various structures within the ledger and data
// files, to make life easier when implementing and ensuring operations are valid.
assert!(DATA_FILE_SIZE > DATA_FILE_HEADER_SIZE);
assert!(DATA_FILE_RECORD_DATA_MAX_USABLE < DATA_FILE_SIZE);
let ledger = Arc::new(Ledger::load_or_create(data_dir)?);
let writer = Writer::new(Arc::clone(&ledger));
let reader = Reader {
ledger,
data_file: None,
};
Ok((writer, reader))
}
}
fn main() {
let (writer, _reader) = Buffer::new("/tmp/mmap-testing")
.expect("failed to open buffer");
// The payload ends up being ~4.4KB (400 copies of an 11-byte slice).
let mut payload = Vec::new();
for _ in 0..400 {
payload.extend_from_slice(b"hello world");
}
for i in 0..20_000 {
let mut tx = writer.transaction();
tx.write(&payload[..]).expect("failed to write record");
tx.commit().expect("failed to commit transaction");
if i % 1000 == 0 {
println!("at txn {}", i);
}
}
}