Skip to content

Instantly share code, notes, and snippets.

@hsnks100
Created October 7, 2021 04:51
Show Gist options
  • Save hsnks100/9b826cfd92f57249b9175184c20e59f0 to your computer and use it in GitHub Desktop.
Save hsnks100/9b826cfd92f57249b9175184c20e59f0 to your computer and use it in GitHub Desktop.
tcp 비동기 통신 rust 예제
use async_std::io::WriteExt;
use async_std::task;
use async_std::prelude::*;
use async_std::net::{TcpListener, TcpStream};
use async_std::sync::{Arc, Mutex};
use async_std::channel::{Receiver, Sender, self};
use bytes::{Bytes, BytesMut};
/// Program entry point: drives `entrypoint` to completion on the async-std
/// executor and panics with a fixed message if it returns an error.
fn main() {
    let outcome = task::block_on(entrypoint());
    outcome.expect("failed to initialize!");
}
/// Commands sent over the channel to the `broadcaster` task.
enum BroadcastCommand {
    /// Register a client stream so that it receives subsequent broadcasts.
    AddMember(TcpStream),
    /// Write the given bytes to every registered member. The stream is the
    /// connection the message came from; the broadcaster currently does not
    /// use it (the sender also receives its own message).
    SendMessage(TcpStream, Bytes),
    /// Stop the broadcaster loop.
    Exit,
}
/// Runs the broadcast server.
///
/// Binds a TCP listener on `0.0.0.0:8080`, spawns the `broadcaster` task, and
/// for every accepted connection registers the stream with the broadcaster and
/// spawns a `connection` task that reads from it.
///
/// When `accept` fails, the error is reported on stderr, the accept loop ends,
/// the broadcaster is told to exit, and the function waits for it to finish.
///
/// # Errors
///
/// Returns an error if binding the listener fails or if a command cannot be
/// delivered to the broadcaster because its channel is closed.
async fn entrypoint() -> anyhow::Result<()> {
    let listener = TcpListener::bind("0.0.0.0:8080").await?;
    let (sender, recver) = channel::unbounded();
    let broadcaster_task = task::spawn(broadcaster(recver));

    loop {
        let conn = match listener.accept().await {
            Ok((conn, _peer_addr)) => conn,
            Err(err) => {
                // Previously the loop ended silently; keep the shutdown path
                // but make the reason visible.
                eprintln!("accept failed, shutting down: {}", err);
                break;
            }
        };

        // Ask the broadcaster to register this connection before its reader
        // task can start producing messages.
        sender
            .send(BroadcastCommand::AddMember(conn.clone()))
            .await?;
        // The reader task takes ownership of the stream; no extra clone needed.
        task::spawn(connection(sender.clone(), conn));
    }

    sender.send(BroadcastCommand::Exit).await?;
    broadcaster_task.await;
    Ok(())
}
async fn broadcaster(recv: Receiver<BroadcastCommand>) {
let mut members = Vec::new();
// 여기선 event를 받아서 뭐 member에 넣거나 send하거나 등등 하고.
loop {
match recv.recv().await.unwrap() {
BroadcastCommand::AddMember(member) => {
println!("AddMember");
members.push(member);
},
BroadcastCommand::SendMessage(member, bytes) => {
println!("SendMessage");
for v in &mut members {
//if v.as_raw_socket() != member.as_raw_socket() {
v.write(&bytes).await;
//}
}
}
BroadcastCommand::Exit => {
break;
},
}
}
}
/// Per-client task: reads from `stream` and forwards every chunk received to
/// the broadcaster as a `BroadcastCommand::SendMessage`.
///
/// A fixed `hello` message is broadcast once before reading starts. The task
/// ends when the peer closes the connection (a read of zero bytes), when a
/// read fails (reported on stderr), or when the broadcaster's channel is
/// closed and commands can no longer be delivered.
async fn connection(sender: Sender<BroadcastCommand>, mut stream: TcpStream) {
    println!("eeeeeeeeeeeee222");

    let greeting = Bytes::from_static(b"hello");
    let hello = BroadcastCommand::SendMessage(stream.clone(), greeting);
    if sender.send(hello).await.is_err() {
        // The broadcaster is gone; nobody would receive anything we forward.
        return;
    }

    loop {
        let mut buf = BytesMut::new();
        buf.resize(1024, 0);

        let len = match stream.read(&mut buf).await {
            // Zero bytes means end of stream. Without this exit the loop
            // would spin forever on a closed connection.
            Ok(0) => break,
            Ok(len) => len,
            Err(err) => {
                eprintln!("read failed, closing connection task: {}", err);
                break;
            }
        };

        // Keep only the bytes actually read before handing the buffer over.
        buf.truncate(len);
        let message = BroadcastCommand::SendMessage(stream.clone(), buf.freeze());
        if sender.send(message).await.is_err() {
            break;
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment