Last active
July 3, 2020 03:51
-
-
Save recamshak/234a0c1c5d2e6bcb30d500f3b105a19c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use futures::{ | |
channel::mpsc::{unbounded, UnboundedSender}, | |
future, pin_mut, SinkExt, StreamExt, | |
}; | |
use std::cell::RefCell; | |
use std::rc::Rc; | |
use std::{collections::HashMap, env, io::Error as IoError, net::SocketAddr}; | |
use tokio::net::{TcpListener, TcpStream}; | |
use tokio::task; | |
use tungstenite::error::Error; | |
use tungstenite::protocol::Message; | |
type Sender = UnboundedSender<String>; | |
type PeerMap = Rc<RefCell<HashMap<String, Sender>>>; | |
struct ChatApp { | |
peers: PeerMap, | |
} | |
impl ChatApp { | |
pub fn new() -> ChatApp { | |
ChatApp { | |
peers: PeerMap::new(RefCell::new(HashMap::new())), | |
} | |
} | |
pub async fn register_user<R, S>(&self, id: &str, receiver: R, sender: S) | |
where | |
R: futures::stream::Stream<Item = String>, | |
S: futures::sink::Sink<String>, | |
{ | |
// Instead of storing the sender, create an unbounded channel | |
// that will forward to the sender. | |
// The reason is that `unbounded_send` operates on `&self` | |
// whereas `SinkExt.send` is async and needs `&mut self`... which means more complicated code. | |
// Q4. Is there a way to avoid adding a channel while keeping the code simple? | |
let (tx, rx) = unbounded(); | |
self.add_user(id, tx); | |
self.broadcast_user_list(); | |
let consume = receiver.for_each(|msg| { | |
println!("user send msg {}", msg); | |
self.broadcast(&msg, &id); | |
future::ready(()) | |
}); | |
let forward = rx.map(Ok).forward(sender); | |
pin_mut!(consume, forward); | |
future::select(consume, forward).await; | |
self.remove_user(id); | |
} | |
fn add_user(&self, id: &str, tx: UnboundedSender<String>) { | |
self.peers.borrow_mut().insert(id.to_string(), tx); | |
} | |
fn remove_user(&self, id: &str) { | |
self.peers.borrow_mut().remove(id); | |
} | |
fn broadcast(&self, msg: &String, from: &str) { | |
println!("Broadcasting message from {}: {}", from, msg); | |
self.peers | |
.borrow() | |
.iter() | |
.filter(|(peer_addr, _)| peer_addr != &&from) | |
.map(|(_, sink)| sink) | |
// here the msg will be cloned for every peer... | |
// Q5. how to avoid that? | |
.for_each(|sink| sink.unbounded_send(msg.clone()).unwrap_or(())); | |
} | |
fn broadcast_user_list(&self) { | |
let user_list = self | |
.peers | |
.borrow() | |
.keys() | |
.map(|s| &**s) | |
.collect::<Vec<&str>>() | |
.join("; "); | |
let user_list = format!("user list: {}\n", user_list); | |
self.peers | |
.borrow() | |
.iter() | |
.map(|(_, sink)| sink) | |
.for_each(|sink| sink.unbounded_send(user_list.clone()).unwrap_or(())); | |
} | |
} | |
async fn handle_connection(chat_app: Rc<ChatApp>, raw_stream: TcpStream, addr: SocketAddr) { | |
println!("Incoming TCP connection from: {}", addr); | |
let ws_stream = tokio_tungstenite::accept_async(raw_stream) | |
.await | |
.expect("Error during the websocket handshake occurred"); | |
println!("WebSocket connection established: {}", addr); | |
let uid = addr.to_string(); | |
let (outgoing, incoming) = ws_stream.split(); | |
let incoming = incoming | |
.filter_map(|m| future::ready(m.ok())) | |
.map(|m| m.to_text().unwrap().to_string()); | |
let outgoing = outgoing.with(|msg| future::ok::<Message, Error>(Message::from(msg))); | |
chat_app.register_user(&uid, incoming, outgoing).await; | |
println!("{} disconnected", &addr); | |
} | |
#[tokio::main] | |
async fn main() -> Result<(), IoError> { | |
let addr = env::args() | |
.nth(1) | |
.unwrap_or_else(|| "127.0.0.1:8080".to_string()); | |
let local = task::LocalSet::new(); | |
local.spawn_local(async move { | |
let chat_app = Rc::new(ChatApp::new()); | |
let try_socket = TcpListener::bind(&addr).await; | |
let mut listener = try_socket.expect("Failed to bind"); | |
println!("Listening on: {}", addr); | |
while let Ok((stream, addr)) = listener.accept().await { | |
task::spawn_local(handle_connection(chat_app.clone(), stream, addr)); | |
} | |
}); | |
local.await; | |
Ok(()) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment