Skip to content

Instantly share code, notes, and snippets.

@yoshuawuyts
Forked from Noah-Kennedy/kqueue.rs
Created August 28, 2024 01:28
Show Gist options
  • Save yoshuawuyts/c74b0b344f62133664f36d8192367b97 to your computer and use it in GitHub Desktop.
Save yoshuawuyts/c74b0b344f62133664f36d8192367b97 to your computer and use it in GitHub Desktop.
kqueue example
#![cfg(target_os = "macos")]
use rustix::event::kqueue;
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::os::fd::AsRawFd;
use std::time::Duration;
fn main() {
// kickoff a simple echo server we can hit for demo purposes
std::thread::spawn(run_echo_server);
// create the kqueue instance
let kqueue_fd = kqueue::kqueue().unwrap();
// set up the client
let mut client = TcpStream::connect("127.0.0.1:8080").unwrap();
client.set_nonblocking(true).unwrap();
// we have not written anything yet, this should get EWOULDBLOCK
assert_eq!(
std::io::ErrorKind::WouldBlock,
client.read(&mut [0; 32]).unwrap_err().kind()
);
// send some data over the wire, and wait 1 second for the server to hopefully echo it back
client.write(b"hello, world!").unwrap();
std::thread::sleep(Duration::from_secs(1));
let mut events = Vec::with_capacity(1);
// Register the client for interest in read events, and don't wait for events to come in.
//
// Safety: we won't polling this after the TcpStream referred to closes, and we delete the
// event too.
//
// Though the rustix docs say that the kqueue must be closed first, this isn't technically true.
// You could delete the event as well, and failing to do so isn't actually catastrophic - the
// worst case is more spurious wakes.
let mut n_events = unsafe {
kqueue::kevent(
&kqueue_fd,
&[kqueue::Event::new(
// add a notification for read-readiness
kqueue::EventFilter::Read(client.as_raw_fd()),
// add a level-triggered event
//
// add EV_CLEAR too to make it edge-triggered, which is generally what you want in
// practice, but that is a discussion for another time
kqueue::EventFlags::ADD,
// 7 seems like a nice number to assert we get back!
7,
)],
&mut events,
// pass in no timeout, and wait indefinitely for events
None,
)
.unwrap()
};
// we loop due to spurious events being a possibility, polling may need to be retried
loop {
if n_events == 1 {
// verify that the event has the user data we specified, this is just to show udata in
// action
assert_eq!(7, events[0].udata());
let mut buffer = [0; 32];
match client.read(&mut buffer) {
Ok(n) => {
assert_eq!(b"hello, world!", &buffer[..n]);
break;
}
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
n_events =
// safety: we are not modifying the list, just polling
unsafe { kqueue::kevent(&kqueue_fd, &[], &mut events, None).unwrap() };
}
Err(e) => {
panic!("Unexpected error {e:?}");
}
}
}
}
// cleanup by removing the event watch
unsafe {
kqueue::kevent(
&kqueue_fd,
&[kqueue::Event::new(
kqueue::EventFilter::Read(client.as_raw_fd()),
// remove the event
kqueue::EventFlags::DELETE,
7,
)],
// we are not waiting on events this time, no need to pass in a real buffer
&mut Vec::new(),
// dont block
Some(Duration::ZERO),
)
.unwrap()
};
}
fn run_echo_server() {
let listener = TcpListener::bind("127.0.0.1:8080").unwrap();
loop {
let conn = listener.accept().unwrap();
std::thread::spawn(move || handle_new_echo_server_connection(conn.0));
}
}
fn handle_new_echo_server_connection(conn: TcpStream) {
std::io::copy(&mut &conn, &mut &conn).unwrap();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment