Skip to content

Instantly share code, notes, and snippets.

@recamshak
Last active July 3, 2020 03:51
Show Gist options
  • Save recamshak/234a0c1c5d2e6bcb30d500f3b105a19c to your computer and use it in GitHub Desktop.
Save recamshak/234a0c1c5d2e6bcb30d500f3b105a19c to your computer and use it in GitHub Desktop.
use futures::{
channel::mpsc::{unbounded, UnboundedSender},
future, pin_mut, SinkExt, StreamExt,
};
use std::cell::RefCell;
use std::rc::Rc;
use std::{collections::HashMap, env, io::Error as IoError, net::SocketAddr};
use tokio::net::{TcpListener, TcpStream};
use tokio::task;
use tungstenite::error::Error;
use tungstenite::protocol::Message;
type Sender = UnboundedSender<String>;
type PeerMap = Rc<RefCell<HashMap<String, Sender>>>;
struct ChatApp {
peers: PeerMap,
}
impl ChatApp {
pub fn new() -> ChatApp {
ChatApp {
peers: PeerMap::new(RefCell::new(HashMap::new())),
}
}
pub async fn register_user<R, S>(&self, id: &str, receiver: R, sender: S)
where
R: futures::stream::Stream<Item = String>,
S: futures::sink::Sink<String>,
{
// Instead of storing the sender, create an unbounded channel
// that will forward to the sender.
// The reason is that `unbounded_send` operates on `&self`
// whereas `SinkExt.send` is async and needs `&mut self`... which means more complicated code.
// Q4. Is there a way to avoid adding a channel while keeping the code simple?
let (tx, rx) = unbounded();
self.add_user(id, tx);
self.broadcast_user_list();
let consume = receiver.for_each(|msg| {
println!("user send msg {}", msg);
self.broadcast(&msg, &id);
future::ready(())
});
let forward = rx.map(Ok).forward(sender);
pin_mut!(consume, forward);
future::select(consume, forward).await;
self.remove_user(id);
}
fn add_user(&self, id: &str, tx: UnboundedSender<String>) {
self.peers.borrow_mut().insert(id.to_string(), tx);
}
fn remove_user(&self, id: &str) {
self.peers.borrow_mut().remove(id);
}
fn broadcast(&self, msg: &String, from: &str) {
println!("Broadcasting message from {}: {}", from, msg);
self.peers
.borrow()
.iter()
.filter(|(peer_addr, _)| peer_addr != &&from)
.map(|(_, sink)| sink)
// here the msg will be cloned for every peer...
// Q5. how to avoid that?
.for_each(|sink| sink.unbounded_send(msg.clone()).unwrap_or(()));
}
fn broadcast_user_list(&self) {
let user_list = self
.peers
.borrow()
.keys()
.map(|s| &**s)
.collect::<Vec<&str>>()
.join("; ");
let user_list = format!("user list: {}\n", user_list);
self.peers
.borrow()
.iter()
.map(|(_, sink)| sink)
.for_each(|sink| sink.unbounded_send(user_list.clone()).unwrap_or(()));
}
}
async fn handle_connection(chat_app: Rc<ChatApp>, raw_stream: TcpStream, addr: SocketAddr) {
println!("Incoming TCP connection from: {}", addr);
let ws_stream = tokio_tungstenite::accept_async(raw_stream)
.await
.expect("Error during the websocket handshake occurred");
println!("WebSocket connection established: {}", addr);
let uid = addr.to_string();
let (outgoing, incoming) = ws_stream.split();
let incoming = incoming
.filter_map(|m| future::ready(m.ok()))
.map(|m| m.to_text().unwrap().to_string());
let outgoing = outgoing.with(|msg| future::ok::<Message, Error>(Message::from(msg)));
chat_app.register_user(&uid, incoming, outgoing).await;
println!("{} disconnected", &addr);
}
#[tokio::main]
async fn main() -> Result<(), IoError> {
let addr = env::args()
.nth(1)
.unwrap_or_else(|| "127.0.0.1:8080".to_string());
let local = task::LocalSet::new();
local.spawn_local(async move {
let chat_app = Rc::new(ChatApp::new());
let try_socket = TcpListener::bind(&addr).await;
let mut listener = try_socket.expect("Failed to bind");
println!("Listening on: {}", addr);
while let Ok((stream, addr)) = listener.accept().await {
task::spawn_local(handle_connection(chat_app.clone(), stream, addr));
}
});
local.await;
Ok(())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment