Last active
February 12, 2022 06:01
-
-
Save yeaFern/e1058672aa6c14bbef57921bf5403de9 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use polling::{Event, Poller};
use std::io::prelude::*;
use std::net::{Ipv4Addr, SocketAddrV4, TcpListener, TcpStream};
use std::time::Duration;
/// Represents the state of a frame in which the length component is being decoded.
#[derive(Debug)]
struct LengthState {
    /// Length value accumulated from the bytes decoded so far.
    value: u32,
    /// Number of bits the next 7-bit group is shifted left by before it is
    /// merged into `value`.
    bit_offset: u32,
}

impl LengthState {
    /// Constructs a new `LengthState` with no length bits decoded yet.
    fn new() -> Self {
        Self {
            value: 0,
            bit_offset: 0,
        }
    }
}
/// Represents the state of a frame in which the data component is being decoded.
#[derive(Debug)]
struct DataState {
    /// The number of bytes left to read from the stream. Bytes which occur after
    /// this amount form the next frame.
    amount_left: u32,
    /// A buffer used to hold the incoming bytes.
    buffer: Vec<u8>,
}

impl DataState {
    /// Constructs a new `DataState` which will handle reading data of the given `length`.
    ///
    /// The buffer starts out empty and is not pre-sized from `length`.
    fn new(length: u32) -> Self {
        Self {
            amount_left: length,
            buffer: Vec::new(),
        }
    }
}
/// Represents the different states of the `FrameDecoder` state machine. | |
enum FrameState { | |
/// The state machine is currently decoding the length component. | |
ReadingLength(LengthState), | |
/// The state machine is currently decoding the data component. | |
ReadingData(DataState), | |
} | |
/// Represents the state of the state machine after decoding a single byte.
#[derive(Debug, PartialEq, Eq)]
enum FrameDecodeResult {
    /// The state machine finished decoding the length component.
    ReadLength(u32),
    /// The state machine finished decoding the data component.
    ReadData(Vec<u8>),
    /// The state machine is still decoding its current component.
    Continuation,
}
/// A `FrameDecoder` is a state machine which it used to convert a stream of | |
/// bytes from a TCP connection into a logical frame or packet. | |
/// | |
/// The format of a frame is as follows: | |
/// - length: Minecraft-style variable length integer, encoded similar to the | |
/// LEB128 format. See https://wiki.vg/Protocol#VarInt_and_VarLong | |
/// - followed by 'length' bytes, which form the actual payload of the packet. | |
/// | |
/// The `FrameDecoder` is resposible for reading in bytes one at a time from | |
/// the TCP stream and reconstructing the frame. | |
struct FrameDecoder { | |
/// Current state of the `FrameDecoder`. | |
state: FrameState, | |
} | |
impl FrameDecoder { | |
/// Constructs a new `FrameDecoder`, in the default state. | |
fn new() -> FrameDecoder { | |
FrameDecoder { | |
state: FrameState::ReadingLength(LengthState::new()), | |
} | |
} | |
/// Decodes a single byte from a TCP stream. | |
/// | |
/// Returns a `FrameDecodeResult` which corresponds to the current state of | |
/// the `FrameDecoder`. | |
fn decode_byte(&mut self, b: u8) -> FrameDecodeResult { | |
match &mut self.state { | |
FrameState::ReadingLength(state) => { | |
if state.bit_offset >= 35 { | |
// Error, too big for a u32. | |
} | |
// See https://en.wikipedia.org/wiki/LEB128 | |
state.value |= ((b as u32) & 0b01111111) << state.bit_offset; | |
state.bit_offset += 7; | |
// MSB == 0 means we are done. | |
if (b & 0b10000000) == 0 { | |
let length = state.value; | |
// Transition into data state. | |
self.state = FrameState::ReadingData(DataState::new(length)); | |
return FrameDecodeResult::ReadLength(length); | |
} | |
} | |
FrameState::ReadingData(state) => { | |
state.buffer.push(b); | |
state.amount_left -= 1; | |
if state.amount_left == 0 { | |
// Copy the buffer. | |
// TODO: Can I somehow move this? | |
let buffer = state.buffer.to_owned(); | |
// Transition back into length state. | |
self.state = FrameState::ReadingLength(LengthState::new()); | |
return FrameDecodeResult::ReadData(buffer); | |
} | |
} | |
} | |
FrameDecodeResult::Continuation | |
} | |
} | |
/// Represents the result of a socket read operation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ReadResult {
    /// The read completed normally.
    Success,
    /// The connection was closed gracefully by the peer.
    DroppedGraceful,
    /// Some error occurred which resulted in the connection being dropped.
    DroppedForceful,
}
/// Represents some peer connection. | |
/// Contains data related to frame decoding etc. | |
struct Connection { | |
/// The remote peer. | |
stream: TcpStream, | |
/// The frame decoder used to decode packets. | |
frame_decoder: FrameDecoder, | |
} | |
impl Connection { | |
/// Constructs a new `Connection`. | |
fn new(stream: TcpStream) -> Connection { | |
Connection { | |
stream: stream, | |
frame_decoder: FrameDecoder::new(), | |
} | |
} | |
/// Reads bytes from the socket, should only be called when we know we can read | |
/// without blocking. | |
fn read(&mut self) -> ReadResult { | |
let mut buf = [0; 64]; | |
match self.stream.read(&mut buf) { | |
Ok(0) => ReadResult::DroppedGraceful, | |
Err(_) => ReadResult::DroppedForceful, | |
Ok(count) => { | |
for i in 0..count { | |
match self.frame_decoder.decode_byte(buf[i]) { | |
// If we finished decoding the data, do something with it. | |
FrameDecodeResult::ReadData(buffer) => { | |
println!("Received '{}'", String::from_utf8_lossy(&buffer)); | |
} | |
_ => {} | |
} | |
} | |
ReadResult::Success | |
} | |
} | |
} | |
} | |
/// Represents a TCP server and its connected peers. | |
struct Server { | |
/// The listening socket. | |
listener: TcpListener, | |
/// List of peers currently connected. | |
connections: Vec<Option<Connection>>, | |
/// Poller instance, used for even polling. | |
poller: Poller, | |
} | |
impl Server { | |
/// Creates a new `Server` listening on the given port. | |
fn create(port: u16) -> Server { | |
let listener = | |
TcpListener::bind(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), port)).unwrap(); | |
let poller = Poller::new().unwrap(); | |
listener.set_nonblocking(true).unwrap(); | |
// Register interest in readability events. | |
poller.add(&listener, Event::readable(0)).unwrap(); | |
return Server { | |
listener: listener, | |
connections: Vec::new(), | |
poller: poller, | |
}; | |
} | |
/// Handle an incoming connection, should only be called when we know we can accept | |
/// without blocking. | |
fn handle_accept(&mut self) { | |
let (incoming, addr) = self.listener.accept().unwrap(); | |
// Re-register interest in readability events. | |
self.poller | |
.modify(&self.listener, Event::readable(0)) | |
.unwrap(); | |
// Find first entry in vector which is None. | |
let pos = self.connections.iter_mut().position(|s| s.is_none()); | |
match pos { | |
Some(n) => { | |
// Register interest in all events from this socket. | |
self.poller.add(&incoming, Event::all(n + 1)).unwrap(); | |
// Assign incoming connection to retreived index if found. | |
self.connections[n] = Some(Connection::new(incoming)); | |
} | |
None => { | |
let n = self.connections.len(); | |
// Register interest in all events from this socket. | |
self.poller.add(&incoming, Event::all(n + 1)).unwrap(); | |
// If no empty spots in vector, grow vector. | |
self.connections.push(Some(Connection::new(incoming))); | |
} | |
} | |
println!("Connection established with {}", addr); | |
} | |
/// Handle a read request from the connection at the given index. | |
/// Should only be called when we know we can read without blocking. | |
fn handle_read(&mut self, n: usize) { | |
match &mut self.connections[n] { | |
Some(connection) => { | |
match connection.read() { | |
ReadResult::Success => { | |
// If we successfully read data, re-register interest in all | |
// events. | |
self.poller | |
.modify(&connection.stream, Event::all(n + 1)) | |
.unwrap(); | |
} | |
ReadResult::DroppedForceful => { | |
// Drop connection. | |
self.poller.delete(&connection.stream).unwrap(); | |
self.connections[n] = None; | |
println!("Connection dropped forcefully!"); | |
} | |
ReadResult::DroppedGraceful => { | |
// Drop connection. | |
self.poller.delete(&connection.stream).unwrap(); | |
self.connections[n] = None; | |
println!("Connection dropped gracefully!"); | |
} | |
} | |
} | |
None => {} | |
} | |
} | |
/// Handle a send request. | |
fn handle_send(&mut self, n: usize) { | |
match &mut self.connections[n] { | |
Some(connection) => { | |
// TODO: Get data from some internal buffer and 'send' it. | |
// Re-register interest in all events. | |
self.poller | |
.modify(&connection.stream, Event::all(n + 1)) | |
.unwrap(); | |
} | |
None => {} | |
} | |
} | |
/// Poll all sockets for events, handling them when needed. | |
fn poll(&mut self) { | |
let mut events = Vec::new(); | |
match self | |
.poller | |
.wait(&mut events, Some(Duration::from_micros(0))) | |
{ | |
Ok(_) => { | |
for event in events { | |
match event.key { | |
// Key zero corresponds to listener socket, so accept. | |
0 => self.handle_accept(), | |
// All other keys are peer sockets. | |
n => { | |
if event.readable { | |
self.handle_read(n - 1); | |
} | |
if event.writable { | |
self.handle_send(n - 1); | |
} | |
} | |
} | |
} | |
} | |
Err(_) => {} | |
} | |
} | |
} | |
fn main() { | |
let mut server = Server::create(37456); | |
println!("Server running on port 37456."); | |
// Game loop. | |
loop { | |
// Network poll. | |
server.poll(); | |
// Game world logic can go here. | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment