Skip to content

Instantly share code, notes, and snippets.

@kawasin73
Last active November 23, 2024 06:09
Show Gist options
  • Save kawasin73/d0994eb048afb56bd8819966dc1329ea to your computer and use it in GitHub Desktop.
Save kawasin73/d0994eb048afb56bd8819966dc1329ea to your computer and use it in GitHub Desktop.
cmp_ringbuffer
[package]
name = "cmp_ringbuffer"
version = "0.1.0"
edition = "2021"
[dependencies]
libc = "*"
use std::ffi::CString;
use std::fs::File;
use std::io::Read;
use std::ops::BitXorAssign;
use std::ptr::copy_nonoverlapping;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Barrier;
use std::time::Instant;
// 3MB
const DATA_LEN: usize = 3 * 1024 * 1024;
const N_DATA_PER_PRODUCER: usize = 1000;
const N_PRODUCERS: usize = 3;
const RB_SIZE: usize = N_PRODUCERS * 10;
fn main() {
let barrier = Arc::new(Barrier::new(N_PRODUCERS + 1));
let rb = Arc::new(RingBuffer::new());
for i in 0..N_PRODUCERS {
let barrier = barrier.clone();
let rb = rb.clone();
std::thread::spawn(move || {
println!("producer {i}");
producer(rb, barrier);
println!("producer {i} finished");
});
}
barrier.wait();
println!("start");
let start = Instant::now();
consumer(rb.clone());
let elapsed = start.elapsed();
println!("finished: {:?}", elapsed);
}
fn producer(rb: Arc<RingBuffer>, barrier: Arc<Barrier>) {
let mut src_data = vec![0; DATA_LEN];
File::open("/dev/urandom")
.unwrap()
.read_exact(&mut src_data)
.unwrap();
barrier.wait();
for _ in 0..N_DATA_PER_PRODUCER {
rb.allocate_sem.wait();
let idx = loop {
let before_idx = rb.head.load(Ordering::Relaxed);
let after_idx = (before_idx + 1) % rb.buffer.len();
if rb
.head
.compare_exchange(before_idx, after_idx, Ordering::Acquire, Ordering::Relaxed)
.is_ok()
{
break before_idx;
}
};
let dst = &rb.buffer[idx];
unsafe {
let dst_ptr = dst.data.as_ptr() as *mut u8;
copy_nonoverlapping(src_data.as_ptr(), dst_ptr, DATA_LEN);
}
dst.flag.store(true, Ordering::SeqCst);
rb.ready_sem.post();
}
}
fn consumer(rb: Arc<RingBuffer>) {
for i in 0..(N_DATA_PER_PRODUCER * N_PRODUCERS) {
let idx = i % rb.buffer.len();
let src = &rb.buffer[idx];
loop {
if src.flag.load(Ordering::SeqCst) {
break;
}
rb.ready_sem.wait();
}
let mut sum = 0;
for i in 0..DATA_LEN / 8 {
let v = u64::from_ne_bytes(
src.data[i * 8..(i + 1) * 8]
.try_into()
.expect("try_into failed"),
);
sum.bitxor_assign(v);
}
std::hint::black_box(sum);
src.flag.store(false, Ordering::SeqCst);
rb.allocate_sem.post();
}
}
struct Data {
flag: AtomicBool,
data: [u8; DATA_LEN],
}
impl Default for Data {
fn default() -> Self {
Self {
flag: AtomicBool::new(false),
data: [0; DATA_LEN],
}
}
}
struct RingBuffer {
buffer: Vec<Data>,
head: AtomicUsize,
allocate_sem: Semaphore,
ready_sem: Semaphore,
}
impl RingBuffer {
fn new() -> Self {
let mut buffer = Vec::with_capacity(RB_SIZE);
for _ in 0..RB_SIZE {
buffer.push(Data::default());
}
let allocate_sem = Semaphore::new("allocate_sem", 0);
for _ in 0..buffer.len() {
allocate_sem.post();
}
let ready_sem = Semaphore::new("ready_sem", 0);
Self {
buffer,
head: AtomicUsize::new(0),
allocate_sem,
ready_sem,
}
}
}
struct Semaphore {
sem_name: CString,
sem: *mut libc::sem_t,
}
unsafe impl Send for Semaphore {}
unsafe impl Sync for Semaphore {}
impl Semaphore {
fn new(name: &str, value: u32) -> Self {
let sem_name = CString::new(name).expect("CString::new failed");
let sem = unsafe { libc::sem_open(sem_name.as_ptr(), libc::O_CREAT, 0o644, value) };
if sem == libc::SEM_FAILED {
println!("sem_open failed: {:?}", std::io::Error::last_os_error());
panic!("Failed to open semaphore");
}
Self { sem_name, sem }
}
fn wait(&self) {
unsafe {
libc::sem_wait(self.sem);
}
}
fn post(&self) {
unsafe {
libc::sem_post(self.sem);
}
}
}
impl Drop for Semaphore {
fn drop(&mut self) {
unsafe {
if libc::sem_close(self.sem) != 0 {
eprintln!("Failed to close semaphore");
}
}
unsafe {
if libc::sem_unlink(self.sem_name.as_ptr()) != 0 {
eprintln!("Failed to unlink semaphore");
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment