Created
March 23, 2023 10:12
-
-
Save iemelyanov/a4b7caeae6ce0c69b35d577870466d9f to your computer and use it in GitHub Desktop.
Rust async server
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use libc::{EPOLLIN, EPOLL_CTL_ADD, EPOLL_CTL_DEL, EPOLL_CTL_MOD}; | |
use std::collections::HashMap; | |
use std::io; | |
use std::io::ErrorKind; | |
use std::net::TcpListener; | |
use std::os::unix::io::AsRawFd; | |
use std::ptr; | |
/// Calls the named `libc` function with the given arguments and turns its
/// C-style status into an `io::Result`.
///
/// A return value of `-1` is mapped to `Err(io::Error::last_os_error())`; any
/// other value is handed back unchanged as `Ok(value)`.
macro_rules! syscall {
    ($fn: ident ( $($arg: expr),* $(,)* ) ) => {{
        // The call crosses the FFI boundary, so it has to be `unsafe`; the
        // validity of the arguments is the responsibility of the call site.
        match unsafe { libc::$fn($($arg, )*) } {
            -1 => Err(io::Error::last_os_error()),
            ret => Ok(ret),
        }
    }};
}
/// Upper bound on the number of events fetched by a single `epoll_wait` call;
/// also the length of the event buffer allocated in `main`.
const MAX_EVENTS: i32 = 128;

/// Capacity, in bytes, of the receive buffer used by `read`.
const RECV_BUF_CAP: usize = 1024;
type CallbackFn = fn(selector: &mut Selector, fd: i32) -> io::Result<()>; | |
struct Selector { | |
fd: i32, | |
callbacks: HashMap<i32, CallbackFn>, | |
} | |
impl Selector { | |
fn create() -> io::Result<(Self)> { | |
let fd = syscall!(epoll_create1(0))?; | |
let mut events: Vec<libc::epoll_event> = Vec::with_capacity(MAX_EVENTS as usize); | |
for _ in 0..MAX_EVENTS { | |
events.push(libc::epoll_event { events: 0, u64: 0 }); | |
} | |
let new_loop = Selector { | |
fd: fd, | |
callbacks: HashMap::default(), | |
}; | |
Ok(new_loop) | |
} | |
fn register(&mut self, event_fd: i32, callback: CallbackFn, intereset: i32) -> io::Result<()> { | |
let mut event = libc::epoll_event { | |
events: intereset as u32, | |
u64: event_fd as u64, | |
}; | |
syscall!(epoll_ctl(self.fd, EPOLL_CTL_ADD, event_fd, &mut event))?; | |
self.callbacks.insert(event_fd, callback); | |
Ok(()) | |
} | |
fn unregister(&mut self, event_fd: i32) -> io::Result<()> { | |
if self.callbacks.contains_key(&event_fd) { | |
let mut event = libc::epoll_event { | |
events: 0, | |
u64: event_fd as u64, | |
}; | |
syscall!(epoll_ctl(self.fd, EPOLL_CTL_DEL, event_fd, &mut event))?; | |
self.callbacks.remove(&event_fd); | |
} | |
Ok(()) | |
} | |
fn modify(&mut self, event_fd: i32, callback: CallbackFn, intereset: i32) -> io::Result<()> { | |
let mut event = libc::epoll_event { | |
events: intereset as u32, | |
u64: event_fd as u64, | |
}; | |
syscall!(epoll_ctl(self.fd, EPOLL_CTL_MOD, event_fd, &mut event))?; | |
self.callbacks.insert(event_fd, callback); | |
Ok(()) | |
} | |
fn select(&mut self, events: &mut [libc::epoll_event]) -> io::Result<usize> { | |
let num_fds = syscall!(epoll_wait(self.fd, events.as_mut_ptr(), MAX_EVENTS, -1))?; | |
Ok(num_fds as usize) | |
} | |
fn callbacks(&self, event: &libc::epoll_event) -> Option<&CallbackFn> { | |
self.callbacks.get(&(event.u64 as i32)) | |
} | |
} | |
fn write(selector: &mut Selector, fd: i32) -> io::Result<()> { | |
let resp = "HTTP/1.1 200 OK\r\nContent-Type: text/html; charset=utf-8\r\nContent-Length: 13\r\n\r\nHello World\r\n"; | |
syscall!(send(fd, resp.as_ptr() as *mut libc::c_void, resp.len(), 0))?; | |
selector.modify(fd, read, libc::EPOLLIN)?; | |
Ok(()) | |
} | |
fn read(selector: &mut Selector, fd: i32) -> io::Result<()> { | |
let mut buf = [0 as u8; RECV_BUF_CAP]; | |
let mut buf_size = 0; | |
loop { | |
let buf_ptr = &mut buf[buf_size] as *mut u8 as *mut libc::c_void; | |
match syscall!(recv(fd, buf_ptr, RECV_BUF_CAP - buf_size, 0)) { | |
Ok(n) => { | |
if n == 0 { | |
selector.unregister(fd)?; | |
syscall!(close(fd))?; | |
return Ok(()); | |
} | |
buf_size += n as usize; | |
} | |
Err(e) => match e.kind() { | |
ErrorKind::ConnectionReset => { | |
selector.unregister(fd)?; | |
syscall!(close(fd))?; | |
return Ok(()); | |
} | |
ErrorKind::WouldBlock => { | |
selector.modify(fd, write, libc::EPOLLOUT)?; | |
return Ok(()); | |
} | |
_ => { | |
syscall!(close(fd))?; | |
return Err(e); | |
} | |
}, | |
} | |
} | |
} | |
fn accept(selector: &mut Selector, fd: i32) -> io::Result<()> { | |
let conn_socket_fd = syscall!(accept(fd, ptr::null_mut(), ptr::null_mut()))?; | |
let flags = syscall!(fcntl(conn_socket_fd, libc::F_GETFL, 0))?; | |
syscall!(fcntl( | |
conn_socket_fd, | |
libc::F_SETFL, | |
flags | libc::O_NONBLOCK | |
))?; | |
selector.register(conn_socket_fd, read, EPOLL_CTL_ADD)?; | |
Ok(()) | |
} | |
fn main() -> io::Result<()> { | |
println!("Start server on 127.0.0.1:8080"); | |
let listener = TcpListener::bind("127.0.0.1:8080")?; | |
listener.set_nonblocking(true)?; | |
let socket_fd = listener.as_raw_fd(); | |
let mut selector = Selector::create()?; | |
selector.register(socket_fd, accept, EPOLLIN)?; | |
let mut events: Vec<libc::epoll_event> = Vec::with_capacity(MAX_EVENTS as usize); | |
for _ in 0..MAX_EVENTS as usize { | |
events.push(libc::epoll_event { events: 0, u64: 0 }); | |
} | |
loop { | |
let num_events = selector.select(&mut events)?; | |
for i in 0..num_events { | |
if let Some(event) = events.get(i) { | |
if let Some(callback) = selector.callbacks(event) { | |
callback(&mut selector, event.u64 as i32)?; | |
} | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment