Skip to content

Instantly share code, notes, and snippets.

@gavinwahl
Created September 24, 2024 19:12
Show Gist options
  • Save gavinwahl/22e399b07c186af54eb0bb5829681ea9 to your computer and use it in GitHub Desktop.
Save gavinwahl/22e399b07c186af54eb0bb5829681ea9 to your computer and use it in GitHub Desktop.
tcp chat server
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::net::TcpListener;
use tokio::sync::mpsc;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use slotmap::{DenseSlotMap, DefaultKey};
use tokio::select;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind("127.0.0.1:8080").await?;
let actors: Arc<Mutex<DenseSlotMap<DefaultKey, _>>> = Arc::new(Mutex::new(DenseSlotMap::new()));
loop {
let (mut socket, _) = listener.accept().await?;
let (tx, mut rx) = mpsc::channel(10);
let my_actors = actors.clone();
let my_key = {
actors.lock().await.insert(tx)
};
tokio::spawn(async move {
loop {
let mut buf = vec![0; 1024];
select! {
read = socket.read(&mut buf) => {
match read {
Ok(0) => return,
Ok(n) => {
let buf_arc = Arc::new(buf[0..n].to_vec());
let mut unlocked_actors = my_actors.lock().await;
let keys: Vec<_> = unlocked_actors.keys().collect();
for key in keys.into_iter() {
let actor = unlocked_actors.get(key).expect("cannot fail");
if let Err(_) = actor.send(buf_arc.clone()).await {
unlocked_actors.remove(key);
}
}
}
Err(e) => {
eprintln!("error reading {:?}", e);
}
}
}
read = rx.recv() => {
if let Some(data) = read {
if let Err(e) = socket.write(&data).await {
eprintln!("error writing: {:?}", e);
my_actors.lock().await.remove(my_key);
}
} else {
eprintln!("got nothing from rx");
my_actors.lock().await.remove(my_key);
return;
}
}
}
}
});
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment