Skip to content

Instantly share code, notes, and snippets.

@cfsamson
Last active September 15, 2019 21:20
Show Gist options
  • Save cfsamson/2b4e728c6ceaf8c7943b4968a33b26b1 to your computer and use it in GitHub Desktop.
Save cfsamson/2b4e728c6ceaf8c7943b4968a33b26b1 to your computer and use it in GitHub Desktop.
Futures Code Samples
use std::io;
/// Entry point: passes a greeting to the platform-specific `syscall` wrapper
/// and panics if the wrapper reports an error.
fn main() {
    let greeting = "Hello world from syscall!\n".to_string();
    let outcome = syscall(greeting);
    outcome.unwrap();
}
// see: http://man7.org/linux/man-pages/man2/write.2.html
// Binding for the `write(2)` call exported by the platform C library.
// C prototype: `ssize_t write(int fd, const void *buf, size_t count);`
#[cfg(not(target_os = "windows"))]
#[link(name = "c")]
extern "C" {
    /// Writes up to `count` bytes starting at `buf` to the descriptor `fd`.
    ///
    /// Returns the number of bytes written, or -1 on failure. `fd` is a C
    /// `int` and the result a `ssize_t`, hence `i32` and `isize` here.
    fn write(fd: i32, buf: *const u8, count: usize) -> isize;
}
/// Writes `message` to standard output (descriptor 1) through the raw `write` binding.
///
/// `write` may accept fewer bytes than requested, so the call is repeated
/// until the whole message has been handed over.
///
/// # Errors
/// Returns the last OS error when `write` reports a failure, or an
/// `ErrorKind::WriteZero` error if it stops accepting bytes.
#[cfg(not(target_os = "windows"))]
fn syscall(message: String) -> io::Result<()> {
    let bytes = message.as_bytes();
    let mut written = 0;
    while written < bytes.len() {
        let rest = &bytes[written..];
        // SAFETY: `rest` borrows from `message`, which outlives the call, so the
        // pointer is valid for reads of `rest.len()` bytes.
        let res = unsafe { write(1, rest.as_ptr(), rest.len()) };
        if res < 0 {
            return Err(io::Error::last_os_error());
        }
        if res == 0 {
            // Guard against spinning forever if nothing is ever accepted.
            return Err(io::Error::new(
                io::ErrorKind::WriteZero,
                "write accepted no bytes",
            ));
        }
        written += res as usize;
    }
    Ok(())
}
// Win32 console bindings exported by kernel32.
//
// `HANDLE` is pointer-sized, so it is declared as `isize` (a 32-bit integer would
// truncate it on 64-bit Windows). The "system" ABI selects stdcall on 32-bit x86
// and the regular C convention elsewhere, which is what the Win32 API uses.
#[cfg(target_os = "windows")]
#[link(name = "kernel32")]
extern "system" {
    /// https://docs.microsoft.com/en-us/windows/console/getstdhandle
    ///
    /// Returns the requested standard handle, or -1 (`INVALID_HANDLE_VALUE`) on failure.
    fn GetStdHandle(nStdHandle: i32) -> isize;
    /// https://docs.microsoft.com/en-us/windows/console/writeconsole
    ///
    /// Writes `numberOfCharsToWrite` UTF-16 code units from `lpBuffer` to the console.
    /// Returns zero on failure; `lpReserved` must be null.
    fn WriteConsoleW(
        hConsoleOutput: isize,
        lpBuffer: *const u16,
        numberOfCharsToWrite: u32,
        lpNumberOfCharsWritten: *mut u32,
        lpReserved: *const std::ffi::c_void,
    ) -> i32;
}
/// Writes `message` to the console through the Win32 `WriteConsoleW` binding.
///
/// # Errors
/// Returns the last OS error if the handle lookup or the console write fails.
///
/// # Panics
/// Panics if the console reports a different number of written code units
/// than were requested.
#[cfg(target_os = "windows")]
fn syscall(message: String) -> io::Result<()> {
    // The wide-character console API expects UTF-16, so re-encode the UTF-8 input.
    let wide: Vec<u16> = message.encode_utf16().collect();
    let unit_count = wide.len() as u32;
    let mut units_written: u32 = 0;

    // -11 is the identifier passed to `GetStdHandle` for the output handle.
    let handle = unsafe { GetStdHandle(-11) };
    if handle == -1 {
        return Err(io::Error::last_os_error());
    }

    let succeeded = unsafe {
        WriteConsoleW(
            handle,
            wide.as_ptr(),
            unit_count,
            &mut units_written,
            std::ptr::null(),
        )
    };
    if succeeded == 0 {
        return Err(io::Error::last_os_error());
    }

    assert_eq!(units_written, unit_count);
    Ok(())
}
// ===== EPOLL, KQUEUE =====
use std::io;
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::os::unix::io::{IntoRawFd, AsRawFd};
use std::sync::mpsc::{channel, Receiver};
use std::thread;
mod minimio {
use std::ffi::c_void;
use std::io;
use std::os::unix::io::RawFd;
#[cfg(target_os = "macos")]
pub mod macos {
    use super::*;
    use std::os::raw::c_long;

    /// One kqueue event record.
    ///
    /// The field layout (including the two trailing `ext` words) is that of the C
    /// `struct kevent64_s`, so values of this type must go through `kevent64`,
    /// not the plain `kevent` call, whose records are shorter.
    // https://github.com/rust-lang/libc/blob/c8aa8ec72d631bc35099bcf5d634cf0a0b841be0/src/unix/bsd/apple/mod.rs#L497
    // https://github.com/rust-lang/libc/blob/c8aa8ec72d631bc35099bcf5d634cf0a0b841be0/src/unix/bsd/apple/mod.rs#L207
    #[derive(Debug, Clone, Default)]
    #[repr(C)]
    pub struct Kevent {
        pub ident: u64,
        pub filter: i16,
        pub flags: u16,
        pub fflags: u32,
        pub data: i64,
        pub udata: u64,
        pub ext: [u64; 2],
    }

    // https://github.com/rust-lang/libc/blob/c8aa8ec72d631bc35099bcf5d634cf0a0b841be0/src/unix/bsd/apple/mod.rs#L2447
    pub const EV_ADD: u16 = 0x1;
    pub const EVFILT_TIMER: i16 = -7;
    pub const EV_ENABLE: u16 = 0x4;
    pub const EV_ONESHOT: u16 = 0x10;
    pub const EVFILT_READ: i16 = -1;

    /// Mirror of the C `struct timespec` used for the kqueue timeout.
    #[repr(C)]
    struct Timespec {
        tv_sec: c_long,
        tv_nsec: c_long,
    }

    #[link(name = "c")]
    extern "C" {
        /// Returns: positive: file descriptor, negative: error
        fn kqueue() -> i32;
        /// Registers the changes in `changelist` and/or waits for events.
        /// Returns: number of events placed in `eventlist`, negative: error.
        /// A null `timeout` waits indefinitely.
        fn kevent64(
            kq: i32,
            changelist: *const Kevent,
            nchanges: i32,
            eventlist: *mut Kevent,
            nevents: i32,
            flags: u32,
            timeout: *const Timespec,
        ) -> i32;
    }

    /// Creates a new kernel event queue and returns its descriptor.
    ///
    /// # Errors
    /// Returns the last OS error if the queue cannot be created.
    pub fn m_kqueue() -> io::Result<i32> {
        // SAFETY: `kqueue` takes no arguments and only returns a descriptor or -1.
        let fd = unsafe { kqueue() };
        if fd < 0 {
            return Err(io::Error::last_os_error());
        }
        Ok(fd)
    }

    /// Applies `changelist` to the queue `kq` and collects pending events into `eventlist`.
    ///
    /// `timeout_ms` is the maximum wait in milliseconds; `0` keeps the previous
    /// behaviour of passing no timeout at all (wait until an event arrives, or
    /// return immediately when `eventlist` is empty).
    ///
    /// Returns how many entries of `eventlist` were filled in.
    ///
    /// # Errors
    /// Returns the last OS error if the kernel rejects the call.
    pub fn m_kevent(
        kq: RawFd,
        changelist: &[Kevent],
        eventlist: &mut [Kevent],
        timeout_ms: usize,
    ) -> io::Result<usize> {
        // The kernel expects a pointer to a timespec, not a millisecond count.
        let timeout = Timespec {
            tv_sec: (timeout_ms / 1000) as c_long,
            tv_nsec: ((timeout_ms % 1000) * 1_000_000) as c_long,
        };
        let timeout_ptr: *const Timespec = if timeout_ms == 0 {
            std::ptr::null()
        } else {
            &timeout
        };

        // SAFETY: both pointers come from live slices and are paired with their own
        // lengths; `timeout_ptr` is either null or points at `timeout`, which
        // outlives the call.
        let res = unsafe {
            kevent64(
                kq,
                changelist.as_ptr(),
                changelist.len() as i32,
                eventlist.as_mut_ptr(),
                eventlist.len() as i32,
                0,
                timeout_ptr,
            )
        };
        if res < 0 {
            return Err(io::Error::last_os_error());
        }
        Ok(res as usize)
    }
}
// Minimal epoll bindings. Nothing outside this module uses them yet, hence the
// blanket `dead_code` allowance.
#[cfg(target_os = "linux")]
#[allow(dead_code)]
mod linux {
    // The EPOLL* names are preprocessor macros in <sys/epoll.h>, not symbols the
    // linker can resolve, so their values are mirrored as constants.
    const EPOLL_CTL_ADD: i32 = 1;
    const EPOLLIN: i32 = 0x1;
    const EPOLLOUT: i32 = 0x4;
    const EPOLLET: i32 = (1u32 << 31) as i32;

    #[link(name = "c")]
    extern "C" {
        /// Returns: positive: file descriptor, negative: error
        fn epoll_create(size: i32) -> i32;
        /// Returns: zero on success, -1 on error
        fn epoll_ctl(epfd: i32, op: i32, fd: i32, epoll_event: *mut EpollEvent) -> i32;
        /// Returns: positive: number of file descriptors ready for the requested I/O, -1: Error
        ///
        /// `epoll_events` points at a buffer of at least `maxevents` records that
        /// the kernel fills in.
        fn epoll_wait(
            epfd: i32,
            epoll_events: *mut EpollEvent,
            maxevents: i32,
            timeout: i32,
        ) -> i32;
    }

    /// Mirror of the C `struct epoll_event`.
    ///
    /// `events` is a bitmask composed by OR'ing zero or more of the EPOLL* event
    /// types; `data` is the user value handed back with the event. The C struct
    /// is declared packed on x86-64 only.
    #[cfg_attr(target_arch = "x86_64", repr(C, packed))]
    #[cfg_attr(not(target_arch = "x86_64"), repr(C))]
    struct EpollEvent {
        events: u32,
        data: u64,
    }
}
}
use minimio::macos::*;
// Type that carries the HTTP example; `get` is defined on it as an associated function.
// NOTE(review): `recv` is not read anywhere in the visible code — confirm it is still needed.
struct Http {
// Receiving end of a channel carrying `u64` values.
recv: Receiver<u64>,
}
#[cfg(target_os = "macos")]
impl Http {
    /// Sends a GET request to a deliberately slow endpoint, registers the socket
    /// with kqueue and prints whatever can be read each time the polling thread
    /// reports an event.
    ///
    /// `timer` is currently unused. The polling thread loops without a break, so
    /// after handling an event this function goes back to waiting for the next one.
    ///
    /// # Errors
    /// Returns the I/O error if the request cannot be written in full, or if
    /// reading the response fails with anything other than `WouldBlock`.
    ///
    /// # Panics
    /// Panics if connecting, switching the socket to non-blocking mode, or a
    /// kqueue call fails.
    fn get(timer: i64) -> io::Result<()> {
        let _ = timer;

        let mut stream: TcpStream = TcpStream::connect("slowwly.robertomurray.co.uk:80")
            .expect("Couldn't connect to the server...");

        // The `\` at the end of each line drops the line break and all the whitespace
        // before the next character, so the newlines are written out by hand. That is
        // fine, since the HTTP protocol expects CRLF `\r\n` newlines and not just LF `\n`.
        let request = "GET /delay/4000/url/http://www.google.com HTTP/1.1\r\n\
                       Host: slowwly.robertomurray.co.uk\r\n\
                       Connection: close\r\n\
                       \r\n";
        // `write` may send only part of the buffer; `write_all` keeps going and
        // surfaces any error.
        stream.write_all(request.as_bytes())?;
        stream
            .set_nonblocking(true)
            .expect("set_nonblocking call failed");

        // `as_raw_fd` only borrows the descriptor; `stream` keeps owning the socket.
        let fd = stream.as_raw_fd();
        println!("{}", fd);

        let event = Kevent {
            ident: fd as u64,
            filter: EVFILT_READ,
            flags: EV_ADD | EV_ENABLE | EV_ONESHOT,
            fflags: 0,
            data: 0,
            udata: 0,
            ext: [0, 0],
        };
        let kqueue = m_kqueue().expect("kqueue error.");
        // Registration only: an empty event list means nothing is waited for here.
        m_kevent(kqueue, &[event], &mut [], 0).expect("kevent error");

        let (sender, recv) = channel::<u64>();
        thread::spawn(move || {
            let mut events = vec![Kevent::default()];
            loop {
                println!("before {:?}", events);
                match m_kevent(kqueue, &[], events.as_mut_slice(), 0) {
                    Ok(ready) if ready > 0 => {
                        println!("after {:?}", events);
                        // The first `ready` entries were overwritten with new events.
                        for event in events.iter().take(ready) {
                            sender.send(event.ident).unwrap();
                        }
                    }
                    Err(e) => panic!("{:?}", e),
                    _ => (),
                }
                println!("Looped");
            }
        });

        while let Ok(n) = recv.recv() {
            println!("Event {} finished.", n);
            let mut buff = String::new();
            match stream.read_to_string(&mut buff) {
                Ok(_) => {}
                // The socket is non-blocking: running out of data for now is
                // expected, so print what has arrived.
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
                Err(e) => return Err(e),
            }
            println!("{}", buff);
        }
        Ok(())
    }
}
/// Runs the HTTP example.
///
/// # Panics
/// Panics with the underlying error if the request fails.
fn main() {
    // `get` returns an `io::Result`; surfacing it keeps failures from passing silently.
    Http::get(2000).expect("HTTP request failed");
}
/// Registers one listener on the asynchronous bus, fires the matching event and
/// waits for the bus thread to acknowledge.
fn main() {
    let mut eb: AsyncEventBus<Events> = AsyncEventBus::new();
    eb.register(Events::FirstEvent, || {
        println!("I saw the event");
    });
    eb.emit(Events::FirstEvent);
    eb.run();
}
// Event identifiers the example registers and emits on the bus.
// The derives provide hashing and equality for the type.
#[derive(Hash, PartialEq, Eq)]
enum Events {
// The only event used by the example.
FirstEvent,
}
use std::hash::Hash;
use std::collections::HashMap;
use std::thread;
use std::sync::mpsc::{channel, Sender, Receiver};
/// Listener signature accepted by the asynchronous bus: an argument-less closure
/// that can be sent to another thread.
// `dyn` marks this as a trait-object type; the bare form is deprecated and is
// rejected outright by the 2021 edition.
type Callback = dyn Fn() + Send + 'static;
// Messages sent from an `AsyncEventBus` handle to the thread that owns the bus.
enum Msg<E> {
// Attach the boxed callback to the given event.
Register(E, Box<Callback>),
// Fire the given event.
Emit(E),
// Ask the bus thread to send back an acknowledgement (sent by `run`).
Finish,
}
struct AsyncEventBus<E>
where E: Hash {
sender: Sender<Msg<E>>,
reciever: Receiver<()>,
}
impl<E> AsyncEventBus<E>
where E: Hash + Send + PartialEq + Eq + 'static
{
fn new() -> Self {
let (sender2, recv2) = channel();
let (sender, recv) = channel();
thread::spawn(move || {
let mut bus = EventBus::<E>::new();
while let Ok(msg) = recv.recv() {
match msg {
Msg::Emit(event) => bus.emit(event),
Msg::Register(event, callback) => bus.register(event, callback),
Msg::Finish => sender2.send(()).unwrap(),
}
}
});
AsyncEventBus {
sender,
reciever: recv2,
}
}
fn register(&mut self, event: E, callback: impl Fn() + Send + 'static) {
self.sender.send(Msg::Register(event, Box::new(callback))).unwrap();
}
fn emit(&mut self, event: E) {
self.sender.send(Msg::Emit(event)).unwrap();
}
fn run(&mut self) {
self.sender.send(Msg::Finish).unwrap();
self.reciever.recv().unwrap();
println!("Finished");
}
}
/// A synchronous event bus: listeners are stored per event and called, in
/// registration order, whenever that event is emitted.
// The key bounds live on the `impl` that needs them rather than on the struct.
struct EventBus<T> {
    events: HashMap<T, Vec<Box<dyn Fn() + 'static>>>,
}

impl<T> EventBus<T>
where
    T: Hash + Eq,
{
    /// Creates a bus with no listeners.
    fn new() -> Self {
        EventBus {
            events: HashMap::new(),
        }
    }

    /// Adds `listener` to the list of callbacks run when `event` is emitted.
    fn register(&mut self, event: T, listener: impl Fn() + 'static) {
        self.events
            .entry(event)
            .or_default()
            .push(Box::new(listener));
    }

    /// Calls every listener registered for `event`; does nothing if there are none.
    fn emit(&self, event: T) {
        if let Some(listeners) = self.events.get(&event) {
            for listener in listeners {
                listener();
            }
        }
    }
}
/// Stands in for the JavaScript program a user would write and hand to the
/// runtime: it only schedules work and supplies callbacks.
fn javascript() {
    println!("Thread: {}. I want to read test.txt", current());
    Fs::read("test.txt", |result| {
        // In JavaScript the value would simply be used as a string; here the
        // variant has to be unpacked explicitly.
        let contents = result.to_string().unwrap();
        let char_count = contents.len();
        println!(
            "Thread: {}. First count: {} characters.",
            current(),
            char_count
        );
        println!("Thread: {}. I want to encrypt something.", current());
        Crypto::encrypt(char_count, |result| {
            let encrypted = result.to_int().unwrap();
            println!(
                "Thread: {}. \"Encrypted\" number is: {}",
                current(),
                encrypted
            );
        })
    });

    // Read the file a second time, then a third time from inside the callback
    println!(
        "Thread: {}. I want to read test.txt a second time",
        current()
    );
    Fs::read("test.txt", |result| {
        let char_count = result.to_string().unwrap().len();
        println!(
            "Thread: {}. Second count: {} characters.",
            current(),
            char_count
        );

        // This read is only scheduled once the second one has finished.
        println!(
            "Thread: {}. I want to read test.txt a third time and then print the text",
            current()
        );
        Fs::read("test.txt", |result| {
            let contents = result.to_string().unwrap();
            println!(
                "Thread: {}. The file contains the following text:\n\n\"{}\"\n",
                current(),
                contents
            );
        });
    });
}
/// Returns the name of the calling thread, or `"<unnamed>"` when the thread was
/// spawned without a name (instead of panicking).
fn current() -> String {
    thread::current().name().unwrap_or("<unnamed>").to_string()
}
/// Builds the runtime and lets it drive the `javascript` program to completion.
fn main() {
    Runtime::new().run(javascript);
}
// ===== THIS IS OUR "NODE LIBRARY" =====
use std::fmt;
use std::fs;
use std::io::Read;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread::{self, JoinHandle};
static mut RUNTIME: usize = 0;
type Callback = Box<Fn(Js)>;
struct Event {
task: Box<Fn() -> Js + Send + 'static>,
callback_id: usize,
kind: EventKind,
}
/// The kinds of work the runtime can hand to its thread pool.
enum EventKind {
    FileRead,
    Encrypt,
}

impl fmt::Display for EventKind {
    /// Writes the human-readable label used in the worker log lines.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let label = match self {
            EventKind::FileRead => "File read",
            EventKind::Encrypt => "Encrypt",
        };
        f.write_str(label)
    }
}
/// A loosely typed value, standing in for what a JavaScript callback would receive.
#[derive(Debug)]
enum Js {
    Undefined,
    String(String),
    Int(usize),
}

impl Js {
    /// Unwraps the string payload; `None` for every other variant.
    fn to_string(self) -> Option<String> {
        if let Js::String(text) = self {
            Some(text)
        } else {
            None
        }
    }

    /// Unwraps the integer payload; `None` for every other variant.
    fn to_int(self) -> Option<usize> {
        if let Js::Int(number) = self {
            Some(number)
        } else {
            None
        }
    }
}
// One worker of the runtime's thread pool.
#[derive(Debug)]
struct NodeThread {
// Join handle returned when the worker thread was spawned.
handle: JoinHandle<()>,
// Sending half of the channel on which the worker receives its `Event`s.
sender: Sender<Event>,
}
/// The event loop together with the worker pool it hands tasks to.
struct Runtime {
    /// The worker threads, indexed by thread id.
    thread_pool: Box<[NodeThread]>,
    /// Ids of the workers that are currently idle.
    available: Vec<usize>,
    /// Registered callbacks; a task refers to its callback by index.
    callback_queue: Vec<Callback>,
    /// Number of tasks that have been handed out but not yet completed.
    refs: usize,
    /// Receives `(thread id, callback id, result)` from workers that finished a task.
    status_reciever: Receiver<(usize, usize, Js)>,
}

impl Runtime {
    /// Spawns the four named worker threads and returns an idle runtime.
    fn new() -> Self {
        let (status_sender, status_reciever) = channel::<(usize, usize, Js)>();

        let workers: Vec<NodeThread> = (0..4usize)
            .map(|id| {
                let (sender, inbox) = channel::<Event>();
                let results = status_sender.clone();
                let handle = thread::Builder::new()
                    .name(id.to_string())
                    .spawn(move || {
                        // Runs until the sending half of `inbox` is dropped.
                        for event in inbox {
                            println!(
                                "Thread {}, recived a task of type: {}",
                                thread::current().name().unwrap(),
                                event.kind,
                            );
                            let outcome = (event.task)();
                            println!(
                                "Thread {}, finished running a task of type: {}.",
                                thread::current().name().unwrap(),
                                event.kind
                            );
                            results.send((id, event.callback_id, outcome)).unwrap();
                        }
                    })
                    .expect("Couldn't initialize thread pool.");
                NodeThread { handle, sender }
            })
            .collect();

        Runtime {
            thread_pool: workers.into_boxed_slice(),
            available: (0..4).collect(),
            callback_queue: Vec::new(),
            refs: 0,
            status_reciever,
        }
    }

    /// The event loop: runs `f` once, then dispatches completion callbacks until
    /// no scheduled task is outstanding.
    fn run(&mut self, f: impl Fn()) {
        // Publish this runtime's address so `Fs` and `Crypto` can reach it.
        let this: *mut Runtime = self;
        unsafe { RUNTIME = this as usize };

        // The "main" function of the user program goes first
        f();

        while self.refs > 0 {
            // An epoll/kqueue poll would go here, before the thread pool results
            let (thread_id, callback_id, data) = match self.status_reciever.try_recv() {
                Ok(finished) => finished,
                Err(_) => continue,
            };
            let callback = &self.callback_queue[callback_id];
            callback(data);
            self.refs -= 1;
            self.available.push(thread_id);
        }
    }

    /// Picks an idle worker, panicking when none is left.
    fn schedule(&mut self) -> usize {
        // We would normally queue this
        self.available
            .pop()
            .unwrap_or_else(|| panic!("Out of threads."))
    }

    /// Stores `cb`, wraps `task` in an `Event` and sends it to an idle worker.
    fn register_work(
        &mut self,
        task: impl Fn() -> Js + Send + 'static,
        kind: EventKind,
        cb: impl Fn(Js) + 'static,
    ) {
        // The callback's index is the queue length before it is appended.
        let callback_id = self.callback_queue.len();
        self.callback_queue.push(Box::new(cb));

        let event = Event {
            task: Box::new(task),
            callback_id,
            kind,
        };

        // Not a real scheduler: the most recently freed worker is taken first
        let worker = self.schedule();
        self.thread_pool[worker].sender.send(event).unwrap();
        self.refs += 1;
    }
}
// ===== THESE ARE PLUGINS CREATED IN C++ FOR THE NODE RUNTIME OR PARTS OF THE RUNTIME ITSELF =====
/// Stand-in for a CPU-heavy native module.
struct Crypto;

impl Crypto {
    /// Schedules a recursive Fibonacci computation of `n` on the thread pool
    /// and passes the result to `cb` as `Js::Int`.
    fn encrypt(n: usize, cb: impl Fn(Js) + 'static) {
        // The recursion is left unoptimised, so the cost grows quickly with `n`.
        fn fibonacchi(n: usize) -> usize {
            if n < 2 {
                n
            } else {
                fibonacchi(n - 1) + fibonacchi(n - 2)
            }
        }

        let work = move || Js::Int(fibonacchi(n));

        // `RUNTIME` holds the address stored by `Runtime::run`.
        let rt = unsafe { &mut *(RUNTIME as *mut Runtime) };
        rt.register_work(work, EventKind::Encrypt, cb);
    }
}
/// Stand-in for the runtime's file-system module.
struct Fs;

impl Fs {
    /// Schedules a read of the whole file at `path` on the thread pool and
    /// passes its contents to `cb` as `Js::String`.
    ///
    /// The task panics on its worker thread if the file cannot be opened or read.
    fn read(path: &'static str, cb: impl Fn(Js) + 'static) {
        let work = move || {
            // Pretend the file is large, so the order of execution can
            // actually be observed in the output.
            let pause = std::time::Duration::from_secs(2);
            thread::sleep(pause);

            let mut contents = String::new();
            let mut file = fs::File::open(&path).unwrap();
            file.read_to_string(&mut contents).unwrap();
            Js::String(contents)
        };

        // `RUNTIME` holds the address stored by `Runtime::run`.
        let rt = unsafe { &mut *(RUNTIME as *mut Runtime) };
        rt.register_work(work, EventKind::FileRead, cb);
    }
}
/// Registers one listener on the synchronous bus and fires the matching event.
fn main() {
    let mut eb: EventBus<Events> = EventBus::new();
    eb.register(Events::FirstEvent, || {
        println!("I saw the event");
    });
    eb.emit(Events::FirstEvent);
}
// Event identifiers the example registers and emits on the bus.
// The derives provide hashing and equality for the type.
#[derive(Hash, PartialEq, Eq)]
enum Events {
// The only event used by the example.
FirstEvent,
}
use std::hash::Hash;
use std::collections::HashMap;
/// A synchronous event bus whose listeners may borrow data for the lifetime `'a`.
///
/// Listeners are stored per event and called, in registration order, whenever
/// that event is emitted.
// The key bounds live on the `impl` that needs them rather than on the struct.
struct EventBus<'a, T> {
    events: HashMap<T, Vec<Box<dyn Fn() + 'a>>>,
}

impl<'a, T> EventBus<'a, T>
where
    T: Hash + Eq,
{
    /// Creates a bus with no listeners.
    fn new() -> Self {
        // The struct literal is the tail expression, so the bus is actually returned.
        EventBus {
            events: HashMap::new(),
        }
    }

    /// Adds `listener` to the list of callbacks run when `event` is emitted.
    fn register(&mut self, event: T, listener: impl Fn() + 'a) {
        self.events
            .entry(event)
            .or_default()
            .push(Box::new(listener));
    }

    /// Calls every listener registered for `event`; does nothing if there are none.
    fn emit(&self, event: T) {
        if let Some(listeners) = self.events.get(&event) {
            for listener in listeners {
                listener();
            }
        }
    }
}
#![feature(asm)]
#![feature(naked_functions)]
/// Passes a greeting to the inline-assembly `syscall` wrapper.
fn main() {
    let greeting = "hello world!\n".to_string();
    syscall(greeting);
}
// Prints `message` by executing the `syscall` instruction from inline assembly.
//
// The pointer to and byte length of `message` are the two input operands; the
// template refers to them as `$0` and `$1` and moves them into %rsi and %rdx.
// The three `:`-separated sections after the template are, in order: outputs
// (none), inputs, and the clobber list.
//
// NOTE(review): the comment inside the template says "system call 1 is write",
// but the value loaded into %rax is 0x2000004 — confirm which one is intended.
// NOTE(review): the second `syscall` (0x2000001 with %rdi = 0) presumably exits
// the process, as the in-template remark about `naked` suggests — confirm.
// NOTE(review): the template's "write(1, message, 13)" is only illustrative; the
// length actually passed is `len`.
#[naked]
#[inline(never)]
fn syscall(message: String) {
let msg_ptr = message.as_ptr();
let len = message.len();
unsafe {
asm!(
"
# write(1, message, 13)
mov $$0x2000004, %rax # system call 1 is write
mov $$1, %rdi # file handle 1 is stdout
mov $0, %rsi # address of string to output
mov $1, %rdx # number of bytes
syscall
# if we use naked we need this, if not just remove
mov $$0x2000001, %rax
mov $$0, %rdi
syscall
"
:
: "r"(msg_ptr), "r"(len)
: "rax", "rdi", "rsi", "rdx", "memory"
)
}
}
#![feature(asm)]
/// Prints one greeting through the inline-assembly wrapper and one through
/// the libc binding.
fn main() {
    syscall(String::from("Hello world from asm!\n"));
    syscall_libc(String::from("Hello world from libc!\n"));
}
// see: https://stackoverflow.com/a/30538561/1207060
// macOS variant: prints `message` by executing the `syscall` instruction from
// inline assembly, with 0x2000004 as the call number in %rax.
//
// The pointer to and byte length of `message` are the two input operands; the
// template refers to them as `$0` and `$1` and moves them into %rsi and %rdx.
// The three `:`-separated sections after the template are, in order: outputs
// (none), inputs, and the clobber list.
//
// NOTE(review): the template's "write(1, message, 13)" is only illustrative; the
// length actually passed is `len`.
// NOTE(review): on x86-64 the `syscall` instruction is documented to overwrite
// rcx as well as r11; rcx is not in the clobber list — confirm.
#[cfg(target_os = "macos")]
fn syscall(message: String) {
let msg_ptr = message.as_ptr();
let len = message.len();
unsafe {
asm!(
"
# write(1, message, 13)
mov $$0x2000004, %rax # system call 0x2000004 is write on macos
mov $$1, %rdi # file handle 1 is stdout
mov $0, %rsi # address of string to output
mov $1, %rdx # number of bytes
syscall # call kernel, syscall interrupt
"
:
: "r"(msg_ptr), "r"(len)
: "rax", "rdi", "rsi", "rdx", "r11", "memory"
)
};
}
// see: http://man7.org/linux/man-pages/dir_section_2.html
// and: http://man7.org/linux/man-pages/man2/write.2.html
// Binding for the `write(2)` call exported by the platform C library.
// C prototype: `ssize_t write(int fd, const void *buf, size_t count);`
#[cfg(not(target_os = "windows"))]
#[link(name = "c")]
extern "C" {
    /// Writes up to `count` bytes starting at `buf` to the descriptor `fd`.
    ///
    /// Returns the number of bytes written, or -1 on failure. The C function
    /// returns a value, so the declaration has to state it for the two sides
    /// to agree on the signature.
    fn write(fd: i32, buf: *const u8, count: usize) -> isize;
}
// we don't need conditional compilation here
/// Prints `message` on standard output (descriptor 1) through the libc `write`
/// binding. Whatever `write` reports back is not inspected.
#[cfg(not(target_os = "windows"))]
fn syscall_libc(message: String) {
    let bytes = message.as_bytes();
    unsafe {
        write(1, bytes.as_ptr(), bytes.len());
    }
}
// Linux variant: prints `message` by executing the `syscall` instruction from
// inline assembly, with 1 as the call number in %rax.
//
// The pointer to and byte length of `message` are the two input operands; the
// template refers to them as `$0` and `$1` and moves them into %rsi and %rdx.
// The three `:`-separated sections after the template are, in order: outputs
// (none), inputs, and the clobber list.
//
// NOTE(review): the template's "write(1, message, 13)" is only illustrative; the
// length actually passed is `len`.
// NOTE(review): on x86-64 the `syscall` instruction is documented to overwrite
// rcx as well as r11; rcx is not in the clobber list — confirm.
#[cfg(target_os = "linux")]
fn syscall(message: String) {
let msg_ptr = message.as_ptr();
let len = message.len();
unsafe {
asm!("
# write(1, message, 13)
mov $$1, %rax # system call 1 is write on linux
mov $$1, %rdi # file handle 1 is stdout
mov $0, %rsi # address of string to output
mov $1, %rdx # number of bytes
syscall # call kernel, syscall interrupt
"
:
: "r"(msg_ptr), "r"(len)
: "rax", "rdi", "rsi", "rdx", "r11", "memory"
)
}
}
// Win32 console bindings. Both functions are exported by kernel32, not user32.
// The "system" ABI selects stdcall on 32-bit x86 and the regular C convention
// elsewhere, which is what the Win32 API uses.
#[link(name = "kernel32")]
#[cfg(target_os = "windows")]
extern "system" {
    /// Returns the handle for the standard device identified by `nStdHandle`.
    fn GetStdHandle(nStdHandle: u32) -> usize;
    /// Writes `numberOfCharsToWrite` bytes from `lpBuffer` to the console and
    /// stores the count actually written in `lpNumberOfCharsWritten`.
    /// Returns zero on failure.
    ///
    /// lpReserved is supposed to be NULL
    // `WriteConsole` exists only as a header macro; the exported symbols are
    // `WriteConsoleA` / `WriteConsoleW`. The byte-oriented variant is bound here
    // under the existing Rust name because the caller passes a `*const u8`.
    #[link_name = "WriteConsoleA"]
    fn WriteConsole(
        hConsoleOutput: usize,
        lpBuffer: *const u8,
        numberOfCharsToWrite: u32,
        lpNumberOfCharsWritten: *mut u32,
        lpReserved: *const std::ffi::c_void,
    ) -> i32;
}
/// Prints `message` on the console through the Win32 `WriteConsole` binding.
///
/// # Panics
/// Panics if the console reports a different number of written characters
/// than the byte length of `message`.
#[cfg(target_os = "windows")]
fn syscall(message: String) {
    let msg_ptr = message.as_ptr();
    let len = message.len();
    // The API reports the written count through a pointer, so it needs a real,
    // mutable `u32` to write into.
    let mut written: u32 = 0;
    unsafe {
        // STD_OUTPUT_HANDLE is defined as `(DWORD)-11`.
        let handle = GetStdHandle(-11i32 as u32);
        WriteConsole(handle, msg_ptr, len as u32, &mut written, std::ptr::null());
    }
    assert_eq!(written as usize, len);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment