Last active
November 28, 2023 23:32
-
-
Save timClicks/a49a520a4b6970e964f9c8b038f6534a to your computer and use it in GitHub Desktop.
async chat server
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
From Comprehensive Rust
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use futures_util::stream::StreamExt; | |
use futures_util::SinkExt; | |
use http::Uri; | |
use tokio::io::{AsyncBufReadExt, BufReader}; | |
use tokio_websockets::{ClientBuilder, Message}; | |
#[tokio::main] | |
async fn main() -> Result<(), tokio_websockets::Error> { | |
let server = Uri::from_static("ws://127.0.0.1:2000"); | |
let (mut ws_stream, _) = | |
ClientBuilder::from_uri(server) | |
.connect() | |
.await?; | |
let stdin = tokio::io::stdin(); | |
let mut stdin = BufReader::new(stdin).lines(); | |
todo!(); | |
Ok(()) | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::error::Error; | |
use std::net::SocketAddr; | |
use futures_util::sink::SinkExt; | |
use futures_util::stream::StreamExt; | |
use tokio::net::{TcpListener, TcpStream}; | |
use tokio::sync::broadcast::{channel, Sender}; | |
use tokio_websockets::{Message, ServerBuilder, WebsocketStream}; | |
#[tokio::main] | |
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> { | |
let (bcast_tx, _) = channel(16); | |
let listener = TcpListener::bind("127.0.0.1:2000").await?; | |
println!("listening on port 2000"); | |
loop { | |
let (socket, addr) = listener.accept().await?; | |
println!("New connection from {addr:?}"); | |
let bcast_tx = bcast_tx.clone(); | |
tokio::spawn(async move { | |
// Wrap the raw TCP stream into a websocket. | |
let ws_stream = ServerBuilder::new().accept(socket).await?; | |
handle_connection(addr, ws_stream, bcast_tx).await | |
}); | |
} | |
} | |
async fn handle_connection( | |
addr: SocketAddr, | |
mut ws_stream: WebsocketStream<TcpStream>, | |
bcast_tx: Sender<String>, | |
) -> Result<(), Box<dyn Error + Send + Sync>> { | |
// for every incoming message, broadcast it to everyone | |
todo!() | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[package] | |
name = "chat" | |
version = "0.1.0" | |
edition = "2021" | |
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html | |
[dependencies] | |
futures-util = { version = "0.3.28", features = ["sink"] } | |
http = "0.2.9" | |
tokio = { version = "1.28.1", features = ["full"] } | |
tokio-websockets = { version = "0.4.0", features = ["client", "fastrand", "server", "sha1_smol"] } | |
[[bin]] | |
name = "chat" | |
path = "src/client.rs" | |
[[bin]] | |
name = "chatserv" | |
path = "src/server.rs" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use futures_util::stream::StreamExt; | |
use futures_util::SinkExt; | |
use http::Uri; | |
use tokio::io::{AsyncBufReadExt, BufReader}; | |
use tokio_websockets::{ClientBuilder, Message}; | |
#[tokio::main] | |
async fn main() -> Result<(), tokio_websockets::Error> { | |
let server = Uri::from_static("ws://127.0.0.1:2000"); | |
let (mut ws_stream, _) = | |
ClientBuilder::from_uri(server) | |
.connect() | |
.await?; | |
let stdin = tokio::io::stdin(); | |
let mut stdin= BufReader::new(stdin).lines(); | |
if let Some(Ok(payload)) = ws_stream.next().await { | |
if let Some(welcome_message) = payload.as_text() { | |
println!("{welcome_message}"); | |
} | |
} | |
loop { | |
tokio::select! { | |
incoming = ws_stream.next() => { | |
match incoming { | |
Some(Ok(payload)) => { | |
if let Some(text) = payload.as_text() { | |
println!("< {text}"); | |
} | |
}, | |
Some(Err(err)) => return Err(err), | |
None => { | |
eprintln!("connection with server terminated"); | |
return Ok(()); | |
}, | |
} | |
} | |
input = stdin.next_line() => { | |
match input { | |
Ok(None) => return Ok(()), | |
Ok(Some(line)) => { | |
let msg = Message::text(line.to_string()); | |
ws_stream.send(msg).await?; | |
println!("> {line}"); | |
}, | |
Err(err) => return Err(err.into()), | |
} | |
} | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::error::Error; | |
use std::net::SocketAddr; | |
use futures_util::sink::SinkExt; | |
use futures_util::stream::StreamExt; | |
use tokio::net::{TcpListener, TcpStream}; | |
use tokio::sync::broadcast::{channel, Sender}; | |
use tokio_websockets::{Message, ServerBuilder, WebsocketStream}; | |
#[tokio::main] | |
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> { | |
let (bcast_tx, _) = channel(16); | |
let listener = TcpListener::bind("127.0.0.1:2000").await?; | |
println!("listening on port 2000"); | |
loop { | |
let (socket, addr) = listener.accept().await?; | |
println!("New connection from {addr:?}"); | |
let bcast_tx = bcast_tx.clone(); | |
tokio::spawn(async move { | |
// Wrap the raw TCP stream into a websocket. | |
let ws_stream = ServerBuilder::new().accept(socket).await?; | |
handle_connection(addr, ws_stream, bcast_tx).await | |
}); | |
} | |
} | |
async fn handle_connection( | |
addr: SocketAddr, | |
mut ws_stream: WebsocketStream<TcpStream>, | |
bcast_tx: Sender<String>, | |
) -> Result<(), Box<dyn Error + Send + Sync>> { | |
let welcome_message = "system: Welcome!".to_string(); | |
ws_stream.send(Message::text(welcome_message)).await?; | |
let mut bcast_rx = bcast_tx.subscribe(); | |
loop { | |
tokio::select! { | |
msg = bcast_rx.recv() => { | |
ws_stream.send(Message::text(msg?)).await?; | |
} | |
incoming = ws_stream.next() => { | |
match incoming { | |
Some(Ok(msg)) => { | |
if let Some(text) = msg.as_text() { | |
bcast_tx.send(text.into())?; | |
} | |
}, | |
Some(Err(err)) => return Err(err.into()), | |
None => return Ok(()), | |
} | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment