Created
May 5, 2020 01:05
-
-
Save Sherlock-Holo/6877d3f9e2271fb290326b4f55b13ec8 to your computer and use it in GitHub Desktop.
io_uring test
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::collections::HashMap; | |
use std::fs::File; | |
use std::future::Future; | |
use std::io::Error; | |
use std::io::IoSliceMut; | |
use std::io::Result; | |
use std::marker::PhantomData; | |
use std::os::unix::io::AsRawFd; | |
use std::os::unix::io::RawFd; | |
use std::pin::Pin; | |
use std::sync::{Arc, Mutex, Once}; | |
use std::sync::atomic::{AtomicU64, Ordering}; | |
use std::task::{Context, Poll}; | |
use std::task::Waker; | |
use std::thread; | |
use anyhow::Context as _; | |
use io_uring::concurrent::{CompletionQueue, IoUring, SubmissionQueue}; | |
use io_uring::opcode::{AsyncCancel, Read}; | |
use io_uring::opcode::types::Target; | |
use once_cell::sync::OnceCell; | |
fn get_user_data() -> u64 { | |
static USER_DATA_GEN: AtomicU64 = AtomicU64::new(0); | |
USER_DATA_GEN.fetch_add(1, Ordering::Relaxed) | |
} | |
fn get_ring() -> &'static IoUring { | |
static RING: OnceCell<IoUring> = OnceCell::new(); | |
RING.get_or_init(|| { | |
io_uring::IoUring::new(4096).unwrap().concurrent() | |
}) | |
} | |
fn get_sq() -> SubmissionQueue<'static> { | |
get_ring().submission() | |
} | |
fn get_cq() -> CompletionQueue<'static> { | |
get_ring().completion() | |
} | |
fn get_result_map() -> &'static Mutex<HashMap<u64, IOUringResult>> { | |
static MAP: OnceCell<Mutex<HashMap<u64, IOUringResult>>> = OnceCell::new(); | |
MAP.get_or_init(|| { | |
Mutex::new(HashMap::new()) | |
}) | |
} | |
fn run_ring() { | |
let ring = get_ring(); | |
let completion_queue = get_cq(); | |
loop { | |
let size = ring.submit_and_wait(1).context("submit and wait failed").unwrap(); | |
let mut map_guard = get_result_map().lock().unwrap(); | |
while let Some(entry) = completion_queue.pop() { | |
let user_data = entry.user_data(); | |
let result = if entry.result() >= 0 { | |
Ok(entry.result() as usize) | |
} else { | |
Err(Error::last_os_error()) | |
}; | |
if let Some(share_result) = map_guard.remove(&user_data) { | |
let mut guard = share_result.lock().unwrap(); | |
guard.1.replace(result); | |
guard.0.take().unwrap().wake(); | |
} | |
} | |
} | |
} | |
type IOUringResult = Arc<Mutex<(Option<Waker>, Option<Result<usize>>)>>; | |
struct ReadFuture<'a> { | |
fd: RawFd, | |
_marker: PhantomData<&'a ()>, | |
buf: IoSliceMut<'a>, | |
user_data: u64, | |
is_submit: bool, | |
result: IOUringResult, | |
} | |
impl<'a> ReadFuture<'a> { | |
fn new(file: &'a File, buf: &'a mut [u8]) -> Self { | |
static RING_THREAD: Once = Once::new(); | |
RING_THREAD.call_once(|| { | |
thread::spawn(run_ring); | |
}); | |
let user_data = get_user_data(); | |
let result = Arc::new(Mutex::new((None, None))); | |
Self { | |
fd: file.as_raw_fd(), | |
_marker: Default::default(), | |
buf: IoSliceMut::new(buf), | |
user_data, | |
is_submit: false, | |
result, | |
} | |
} | |
} | |
impl Drop for ReadFuture<'_> { | |
fn drop(&mut self) { | |
if !self.is_submit { | |
return; | |
} | |
// means ring has wakeup this future | |
if self.result.lock().unwrap().0.is_none() { | |
return; | |
} | |
let mut cancel_entry = AsyncCancel::new(self.user_data).build(); | |
let submission_queue = get_sq(); | |
unsafe { | |
while let Err(entry) = submission_queue.push(cancel_entry) { | |
cancel_entry = entry; | |
} | |
} | |
} | |
} | |
impl Future for ReadFuture<'_> { | |
type Output = Result<usize>; | |
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | |
if self.is_submit { | |
if let Ok(mut guard) = self.result.try_lock() { | |
if let Some(result) = guard.1.take() { | |
return Poll::Ready(result); | |
} | |
} | |
return Poll::Pending; | |
} | |
let entry = Read::new(Target::Fd(self.fd), self.buf.as_mut_ptr(), self.buf.len() as _) | |
.build() | |
.user_data(self.user_data); | |
let submission_queue = get_sq(); | |
unsafe { | |
if submission_queue.push(entry).is_err() { | |
return Poll::Pending; | |
} | |
} | |
// Safety: event haven't register | |
self.result.try_lock().unwrap().0.replace(cx.waker().clone()); | |
get_result_map().lock().unwrap().insert(self.user_data, self.result.clone()); | |
self.is_submit = true; | |
Poll::Pending | |
} | |
} | |
trait UringRead { | |
fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a>; | |
} | |
impl UringRead for File { | |
fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a> { | |
ReadFuture::new(self, buf) | |
} | |
} | |
#[async_std::main] | |
async fn main() { | |
let file = File::open("/tmp/test.txt").unwrap(); | |
let mut buf = vec![0; 1024]; | |
let n = file.read(&mut buf).await.unwrap(); | |
println!("{}", String::from_utf8_lossy(&buf[..n])); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment