Last active
November 23, 2024 06:09
-
-
Save kawasin73/d0994eb048afb56bd8819966dc1329ea to your computer and use it in GitHub Desktop.
cmp_ringbuffer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[package] | |
name = "cmp_ringbuffer" | |
version = "0.1.0" | |
edition = "2021" | |
[dependencies] | |
libc = "*" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::ffi::CString; | |
use std::fs::File; | |
use std::io::Read; | |
use std::ops::BitXorAssign; | |
use std::ptr::copy_nonoverlapping; | |
use std::sync::atomic::AtomicBool; | |
use std::sync::atomic::AtomicUsize; | |
use std::sync::atomic::Ordering; | |
use std::sync::Arc; | |
use std::sync::Barrier; | |
use std::time::Instant; | |
/// Size in bytes of one payload stored in a ring-buffer slot (3 MiB).
const DATA_LEN: usize = 3 << 20;
/// Number of payloads each producer thread publishes.
const N_DATA_PER_PRODUCER: usize = 1_000;
/// Number of producer threads.
const N_PRODUCERS: usize = 3;
/// Number of slots in the ring buffer: ten per producer.
const RB_SIZE: usize = 10 * N_PRODUCERS;
fn main() { | |
let barrier = Arc::new(Barrier::new(N_PRODUCERS + 1)); | |
let rb = Arc::new(RingBuffer::new()); | |
for i in 0..N_PRODUCERS { | |
let barrier = barrier.clone(); | |
let rb = rb.clone(); | |
std::thread::spawn(move || { | |
println!("producer {i}"); | |
producer(rb, barrier); | |
println!("producer {i} finished"); | |
}); | |
} | |
barrier.wait(); | |
println!("start"); | |
let start = Instant::now(); | |
consumer(rb.clone()); | |
let elapsed = start.elapsed(); | |
println!("finished: {:?}", elapsed); | |
} | |
fn producer(rb: Arc<RingBuffer>, barrier: Arc<Barrier>) { | |
let mut src_data = vec![0; DATA_LEN]; | |
File::open("/dev/urandom") | |
.unwrap() | |
.read_exact(&mut src_data) | |
.unwrap(); | |
barrier.wait(); | |
for _ in 0..N_DATA_PER_PRODUCER { | |
rb.allocate_sem.wait(); | |
let idx = loop { | |
let before_idx = rb.head.load(Ordering::Relaxed); | |
let after_idx = (before_idx + 1) % rb.buffer.len(); | |
if rb | |
.head | |
.compare_exchange(before_idx, after_idx, Ordering::Acquire, Ordering::Relaxed) | |
.is_ok() | |
{ | |
break before_idx; | |
} | |
}; | |
let dst = &rb.buffer[idx]; | |
unsafe { | |
let dst_ptr = dst.data.as_ptr() as *mut u8; | |
copy_nonoverlapping(src_data.as_ptr(), dst_ptr, DATA_LEN); | |
} | |
dst.flag.store(true, Ordering::SeqCst); | |
rb.ready_sem.post(); | |
} | |
} | |
fn consumer(rb: Arc<RingBuffer>) { | |
for i in 0..(N_DATA_PER_PRODUCER * N_PRODUCERS) { | |
let idx = i % rb.buffer.len(); | |
let src = &rb.buffer[idx]; | |
loop { | |
if src.flag.load(Ordering::SeqCst) { | |
break; | |
} | |
rb.ready_sem.wait(); | |
} | |
let mut sum = 0; | |
for i in 0..DATA_LEN / 8 { | |
let v = u64::from_ne_bytes( | |
src.data[i * 8..(i + 1) * 8] | |
.try_into() | |
.expect("try_into failed"), | |
); | |
sum.bitxor_assign(v); | |
} | |
std::hint::black_box(sum); | |
src.flag.store(false, Ordering::SeqCst); | |
rb.allocate_sem.post(); | |
} | |
} | |
struct Data { | |
flag: AtomicBool, | |
data: [u8; DATA_LEN], | |
} | |
impl Default for Data { | |
fn default() -> Self { | |
Self { | |
flag: AtomicBool::new(false), | |
data: [0; DATA_LEN], | |
} | |
} | |
} | |
struct RingBuffer { | |
buffer: Vec<Data>, | |
head: AtomicUsize, | |
allocate_sem: Semaphore, | |
ready_sem: Semaphore, | |
} | |
impl RingBuffer { | |
fn new() -> Self { | |
let mut buffer = Vec::with_capacity(RB_SIZE); | |
for _ in 0..RB_SIZE { | |
buffer.push(Data::default()); | |
} | |
let allocate_sem = Semaphore::new("allocate_sem", 0); | |
for _ in 0..buffer.len() { | |
allocate_sem.post(); | |
} | |
let ready_sem = Semaphore::new("ready_sem", 0); | |
Self { | |
buffer, | |
head: AtomicUsize::new(0), | |
allocate_sem, | |
ready_sem, | |
} | |
} | |
} | |
struct Semaphore { | |
sem_name: CString, | |
sem: *mut libc::sem_t, | |
} | |
unsafe impl Send for Semaphore {} | |
unsafe impl Sync for Semaphore {} | |
impl Semaphore { | |
fn new(name: &str, value: u32) -> Self { | |
let sem_name = CString::new(name).expect("CString::new failed"); | |
let sem = unsafe { libc::sem_open(sem_name.as_ptr(), libc::O_CREAT, 0o644, value) }; | |
if sem == libc::SEM_FAILED { | |
println!("sem_open failed: {:?}", std::io::Error::last_os_error()); | |
panic!("Failed to open semaphore"); | |
} | |
Self { sem_name, sem } | |
} | |
fn wait(&self) { | |
unsafe { | |
libc::sem_wait(self.sem); | |
} | |
} | |
fn post(&self) { | |
unsafe { | |
libc::sem_post(self.sem); | |
} | |
} | |
} | |
impl Drop for Semaphore { | |
fn drop(&mut self) { | |
unsafe { | |
if libc::sem_close(self.sem) != 0 { | |
eprintln!("Failed to close semaphore"); | |
} | |
} | |
unsafe { | |
if libc::sem_unlink(self.sem_name.as_ptr()) != 0 { | |
eprintln!("Failed to unlink semaphore"); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment