Skip to content

Instantly share code, notes, and snippets.

@yeaFern
Last active February 12, 2022 06:01
Show Gist options
  • Save yeaFern/e1058672aa6c14bbef57921bf5403de9 to your computer and use it in GitHub Desktop.
Save yeaFern/e1058672aa6c14bbef57921bf5403de9 to your computer and use it in GitHub Desktop.
use polling::{Event, Poller};
use std::io::prelude::*;
use std::net::{Ipv4Addr, SocketAddrV4, TcpListener, TcpStream};
use std::time::Duration;
/// Represents the state of a frame in which the length component is being decoded.
struct LengthState {
/// Current value of the length component.
value: u32,
/// Amount to shift value down when reading new bytes.
bit_offset: u32,
}
impl LengthState {
/// Constructs a new `LengthState`.
fn new() -> LengthState {
LengthState {
value: 0,
bit_offset: 0,
}
}
}
/// Represents the state of a frame in which the data component is being decoded.
struct DataState {
/// The number of bytes left to read from the stream. Bytes which occur after
/// this amount form the next frame.
amount_left: u32,
/// A buffer used to hold the incoming bytes.
buffer: Vec<u8>,
}
impl DataState {
/// Constructs a new `DataState` which will handle reading data of the given `length`.
fn new(length: u32) -> DataState {
DataState {
amount_left: length,
buffer: Vec::new(),
}
}
}
/// Represents the different states of the `FrameDecoder` state machine.
enum FrameState {
/// The state machine is currently decoding the length component.
ReadingLength(LengthState),
/// The state machine is currently decoding the data component.
ReadingData(DataState),
}
/// Represents the state of the state machine after decoding a single byte.
enum FrameDecodeResult {
/// The state machine finished decoding the length component.
ReadLength(u32),
/// The state machine finished decoding the data component.
ReadData(Vec<u8>),
/// The state machine is still decoding its current component.
Continuation,
}
/// A `FrameDecoder` is a state machine which it used to convert a stream of
/// bytes from a TCP connection into a logical frame or packet.
///
/// The format of a frame is as follows:
/// - length: Minecraft-style variable length integer, encoded similar to the
/// LEB128 format. See https://wiki.vg/Protocol#VarInt_and_VarLong
/// - followed by 'length' bytes, which form the actual payload of the packet.
///
/// The `FrameDecoder` is resposible for reading in bytes one at a time from
/// the TCP stream and reconstructing the frame.
struct FrameDecoder {
/// Current state of the `FrameDecoder`.
state: FrameState,
}
impl FrameDecoder {
/// Constructs a new `FrameDecoder`, in the default state.
fn new() -> FrameDecoder {
FrameDecoder {
state: FrameState::ReadingLength(LengthState::new()),
}
}
/// Decodes a single byte from a TCP stream.
///
/// Returns a `FrameDecodeResult` which corresponds to the current state of
/// the `FrameDecoder`.
fn decode_byte(&mut self, b: u8) -> FrameDecodeResult {
match &mut self.state {
FrameState::ReadingLength(state) => {
if state.bit_offset >= 35 {
// Error, too big for a u32.
}
// See https://en.wikipedia.org/wiki/LEB128
state.value |= ((b as u32) & 0b01111111) << state.bit_offset;
state.bit_offset += 7;
// MSB == 0 means we are done.
if (b & 0b10000000) == 0 {
let length = state.value;
// Transition into data state.
self.state = FrameState::ReadingData(DataState::new(length));
return FrameDecodeResult::ReadLength(length);
}
}
FrameState::ReadingData(state) => {
state.buffer.push(b);
state.amount_left -= 1;
if state.amount_left == 0 {
// Copy the buffer.
// TODO: Can I somehow move this?
let buffer = state.buffer.to_owned();
// Transition back into length state.
self.state = FrameState::ReadingLength(LengthState::new());
return FrameDecodeResult::ReadData(buffer);
}
}
}
FrameDecodeResult::Continuation
}
}
/// Represents the result of a socket read operation.
enum ReadResult {
/// The read completed normally.
Success,
/// The connection was closed gracefully by the peer.
DroppedGraceful,
/// Some error occured which resulted in the connection being dropped.
DroppedForceful,
}
/// Represents some peer connection.
/// Contains data related to frame decoding etc.
struct Connection {
/// The remote peer.
stream: TcpStream,
/// The frame decoder used to decode packets.
frame_decoder: FrameDecoder,
}
impl Connection {
/// Constructs a new `Connection`.
fn new(stream: TcpStream) -> Connection {
Connection {
stream: stream,
frame_decoder: FrameDecoder::new(),
}
}
/// Reads bytes from the socket, should only be called when we know we can read
/// without blocking.
fn read(&mut self) -> ReadResult {
let mut buf = [0; 64];
match self.stream.read(&mut buf) {
Ok(0) => ReadResult::DroppedGraceful,
Err(_) => ReadResult::DroppedForceful,
Ok(count) => {
for i in 0..count {
match self.frame_decoder.decode_byte(buf[i]) {
// If we finished decoding the data, do something with it.
FrameDecodeResult::ReadData(buffer) => {
println!("Received '{}'", String::from_utf8_lossy(&buffer));
}
_ => {}
}
}
ReadResult::Success
}
}
}
}
/// Represents a TCP server and its connected peers.
struct Server {
/// The listening socket.
listener: TcpListener,
/// List of peers currently connected.
connections: Vec<Option<Connection>>,
/// Poller instance, used for even polling.
poller: Poller,
}
impl Server {
/// Creates a new `Server` listening on the given port.
fn create(port: u16) -> Server {
let listener =
TcpListener::bind(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), port)).unwrap();
let poller = Poller::new().unwrap();
listener.set_nonblocking(true).unwrap();
// Register interest in readability events.
poller.add(&listener, Event::readable(0)).unwrap();
return Server {
listener: listener,
connections: Vec::new(),
poller: poller,
};
}
/// Handle an incoming connection, should only be called when we know we can accept
/// without blocking.
fn handle_accept(&mut self) {
let (incoming, addr) = self.listener.accept().unwrap();
// Re-register interest in readability events.
self.poller
.modify(&self.listener, Event::readable(0))
.unwrap();
// Find first entry in vector which is None.
let pos = self.connections.iter_mut().position(|s| s.is_none());
match pos {
Some(n) => {
// Register interest in all events from this socket.
self.poller.add(&incoming, Event::all(n + 1)).unwrap();
// Assign incoming connection to retreived index if found.
self.connections[n] = Some(Connection::new(incoming));
}
None => {
let n = self.connections.len();
// Register interest in all events from this socket.
self.poller.add(&incoming, Event::all(n + 1)).unwrap();
// If no empty spots in vector, grow vector.
self.connections.push(Some(Connection::new(incoming)));
}
}
println!("Connection established with {}", addr);
}
/// Handle a read request from the connection at the given index.
/// Should only be called when we know we can read without blocking.
fn handle_read(&mut self, n: usize) {
match &mut self.connections[n] {
Some(connection) => {
match connection.read() {
ReadResult::Success => {
// If we successfully read data, re-register interest in all
// events.
self.poller
.modify(&connection.stream, Event::all(n + 1))
.unwrap();
}
ReadResult::DroppedForceful => {
// Drop connection.
self.poller.delete(&connection.stream).unwrap();
self.connections[n] = None;
println!("Connection dropped forcefully!");
}
ReadResult::DroppedGraceful => {
// Drop connection.
self.poller.delete(&connection.stream).unwrap();
self.connections[n] = None;
println!("Connection dropped gracefully!");
}
}
}
None => {}
}
}
/// Handle a send request.
fn handle_send(&mut self, n: usize) {
match &mut self.connections[n] {
Some(connection) => {
// TODO: Get data from some internal buffer and 'send' it.
// Re-register interest in all events.
self.poller
.modify(&connection.stream, Event::all(n + 1))
.unwrap();
}
None => {}
}
}
/// Poll all sockets for events, handling them when needed.
fn poll(&mut self) {
let mut events = Vec::new();
match self
.poller
.wait(&mut events, Some(Duration::from_micros(0)))
{
Ok(_) => {
for event in events {
match event.key {
// Key zero corresponds to listener socket, so accept.
0 => self.handle_accept(),
// All other keys are peer sockets.
n => {
if event.readable {
self.handle_read(n - 1);
}
if event.writable {
self.handle_send(n - 1);
}
}
}
}
}
Err(_) => {}
}
}
}
fn main() {
let mut server = Server::create(37456);
println!("Server running on port 37456.");
// Game loop.
loop {
// Network poll.
server.poll();
// Game world logic can go here.
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment