Skip to content

Instantly share code, notes, and snippets.

@jonathanstrong
Created May 6, 2021 19:05
Show Gist options
  • Save jonathanstrong/0d8aaef2fbbb62d2ddcd291f62a03114 to your computer and use it in GitHub Desktop.
Save jonathanstrong/0d8aaef2fbbb62d2ddcd291f62a03114 to your computer and use it in GitHub Desktop.
Failed attempt at designing an event loop around rio's interface
[package]
name = "rio-event-loop-attempt"
version = "0.1.0"
edition = "2018"
[dependencies]
rio = "0.9.4"
slab = "0.4.3"
crossbeam-channel = "0.5"
libc = "0.2.68"
use std::io::{self, prelude::*};
use std::collections::VecDeque;
use std::thread::{self, JoinHandle};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::path::PathBuf;
use std::time::*;
use std::fs::{self, File};
use std::os::unix::fs::OpenOptionsExt;
use slab::Slab;
use rio::Completion;
use crossbeam_channel::{Sender, Receiver, bounded};
const ALIGNMENT: usize = 512;
/// Bookkeeping for one write that `listener` has submitted to the ring but
/// whose completion has not been processed yet.
///
/// The `'a` lifetime comes from the `rio::Completion` values stored in
/// `completions`.
pub struct PendingWrite<'a> {
// Identifier copied from the originating `Job`.
job_id: usize,
// Key returned by `buffers.insert(..)` in `listener`; locates the write buffer in the slab.
buffer_index: usize,
// Key returned by `files.insert(..)` in `listener`; locates the open `File` in the slab.
file_index: usize,
// Completions returned by `ring.write_at(..)`; `listener` stores exactly one per write.
completions: Vec<Completion<'a, usize>>,
// Path of the file being written, copied from the originating `Job`.
target: PathBuf,
// Channel endpoint moved over from the originating `Job`.
callback: Receiver<Result<usize, String>>,
// `Instant` captured at the top of the loop iteration in which the write was submitted.
submitted: Instant,
}
/// A write request delivered to the `listener` thread over its `Receiver<Job>`.
pub struct Job {
// Caller-chosen identifier; `listener` copies it into the resulting `PendingWrite`.
job_id: usize,
// Path that `listener` opens with `create_new(true)`, so the open fails if it already exists.
target: PathBuf,
// NOTE(review): this is the receiving half of a channel although the name suggests
// the listener should report the result through it — confirm whether a `Sender`
// was intended.
callback: Receiver<Result<usize, String>>,
}
/// Spawns a thread that polls `rx` for `Job`s and submits one `ALIGNMENT`-byte
/// write per job through a `rio` ring, recording each submission as a
/// `PendingWrite`.
///
/// The thread loops until `term` is observed as `true`. Nothing in this
/// function waits on the stored completions, removes entries from
/// `pending_writes`, or returns buffers to `unused_buffers`.
///
/// As written this does not compile: the value returned by `ring.write_at`
/// borrows `files` and `buffers`, and it is kept alive in `pending_writes`
/// across loop iterations, so the `insert` calls on the next iteration are
/// rejected with E0502.
///
/// # Panics
///
/// The spawned thread panics if `rio::new()` fails, if the system clock is
/// before the Unix epoch, or if `target` cannot be opened.
pub fn listener(
rx: Receiver<Job>,
term: Arc<AtomicBool>,
) -> JoinHandle<()> {
thread::spawn(move || {
// The ring through which all writes are submitted.
let ring = rio::new().unwrap();
// Slabs own the open files and the buffers; entries are referred to by index.
let mut files: Slab<File> = Slab::with_capacity(32);
let mut buffers: Slab<Vec<u8>> = Slab::with_capacity(32);
// Pool of buffers available for reuse; nothing in this function pushes to it.
let mut unused_buffers: Vec<Vec<u8>> = Vec::new();
let mut pending_writes: VecDeque<PendingWrite<'_>> = Default::default();
// NOTE(review): the loop body never blocks or sleeps, so this spins while
// no jobs are available — confirm that busy-polling is intended.
while ! term.load(Ordering::Relaxed) {
let loop_time = Instant::now();
// Non-blocking poll: at most one job is taken per iteration.
match rx.try_recv() {
Ok(Job { job_id, target, callback }) => {
// Reuse a pooled buffer if one exists, then reset it to ALIGNMENT zero bytes.
let mut buf: Vec<u8> = unused_buffers.pop().unwrap_or_else(|| Vec::new());
buf.clear();
buf.resize(ALIGNMENT, 0);
// Payload: nanoseconds since the Unix epoch as 16 little-endian bytes at the
// start of the buffer; the remaining bytes stay zero.
let now: u128 = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_nanos();
// NOTE(review): `write` may report a short write; the 16 bytes fit in the
// ALIGNMENT-byte slice here, but `write_all` would state the intent.
(&mut buf[..]).write(&now.to_le_bytes()[..]).unwrap();
// Mutable borrow of `buffers` — first location reported by E0502.
let buffer_index = buffers.insert(buf);
// NOTE(review): O_DIRECT usually requires the buffer address, not only its
// length, to be suitably aligned; `Vec<u8>` gives no such guarantee — confirm.
let file = fs::OpenOptions::new()
.write(true)
.create_new(true)
.custom_flags(libc::O_DIRECT)
.open(&target)
.unwrap(); // in real program, handle this error
// Mutable borrow of `files` — second location reported by E0502.
let file_index = files.insert(file);
// Submit the write at offset 0. The returned completion holds shared borrows of
// `files` and `buffers`, which is what conflicts with the inserts above.
let completions = vec![ ring.write_at(&files[file_index], &buffers[buffer_index], 0) ];
// Storing the completion here extends those borrows beyond this iteration.
pending_writes.push_back(PendingWrite {
job_id,
buffer_index,
file_index,
completions,
target,
callback,
submitted: loop_time,
});
}
// Every `try_recv` error is ignored here.
// NOTE(review): if a disconnected channel is among those errors, the loop keeps
// spinning until `term` is set — confirm this is acceptable.
_ => {}
}
}
})
}
error[E0502]: cannot borrow `buffers` as mutable because it is also borrowed as immutable
--> src/lib.rs:54:40
|
54 | let buffer_index = buffers.insert(buf);
| ^^^^^^^^^^^^^^^^^^^ mutable borrow occurs here
...
65 | let completions = vec![ ring.write_at(&files[file_index], &buffers[buffer_index], 0) ];
| ------- immutable borrow occurs here
66 |
67 | pending_writes.push_back(PendingWrite {
| -------------- immutable borrow later used here
error[E0502]: cannot borrow `files` as mutable because it is also borrowed as immutable
--> src/lib.rs:63:38
|
63 | let file_index = files.insert(file);
| ^^^^^^^^^^^^^^^^^^ mutable borrow occurs here
64 |
65 | let completions = vec![ ring.write_at(&files[file_index], &buffers[buffer_index], 0) ];
| ----- immutable borrow occurs here
66 |
67 | pending_writes.push_back(PendingWrite {
| -------------- immutable borrow later used here
error: aborting due to 2 previous errors; 2 warnings emitted
For more information about this error, try `rustc --explain E0502`.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment