Created
June 30, 2021 18:47
-
-
Save surfingtomchen/e7e9db916270134453928069e29b80b8 to your computer and use it in GitHub Desktop.
Async in actix websocket actor
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
pub struct WsActor; | |
#[derive(Debug)] | |
struct BinResponse(websocket::Response); | |
#[derive(Message)] | |
#[rtype(result = "Result<(), ()>")] | |
struct Router(Bytes); | |
impl Actor for WsActor { | |
type Context = ws::WebsocketContext<Self>; | |
fn started(&mut self, ctx: &mut Self::Context) { | |
println!("Websocket is started!!!!!"); | |
ctx.ping(b""); | |
} | |
} | |
impl StreamHandler<BinResponse> for WsActor { | |
fn handle(&mut self, msg: BinResponse, ctx: &mut Self::Context) { | |
let resp = msg.0; | |
let b = resp.write_to_bytes().unwrap(); | |
ctx.binary(b); | |
} | |
} | |
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsActor { | |
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) { | |
println!("WS: {:?}", msg); | |
match msg { | |
Ok(ws::Message::Ping(msg)) => { | |
ctx.pong(&msg); | |
} | |
Ok(ws::Message::Text(_)) => { | |
ctx.text("No text message allowed.".to_string()); | |
} | |
Ok(ws::Message::Binary(bin)) => ctx.notify(Router(bin)), | |
Ok(ws::Message::Close(reason)) => { | |
ctx.close(reason); | |
ctx.stop(); | |
} | |
_ => (), | |
} | |
} | |
} | |
impl Handler<Router> for WsActor { | |
type Result = Result<(), ()>; | |
fn handle(&mut self, msg: Router, ctx: &mut Self::Context) -> Self::Result { | |
let fut = async move { | |
let bytes = msg.0; | |
let mut wrapper = websocket::Request::new(); | |
let mut stream = CodedInputStream::from_bytes(bytes.as_ref()); | |
let mut error_reply = websocket::Response::new(); | |
error_reply.field_type = ResponseType::ResponseTypeNone; | |
error_reply.code = AppError::ProtobufError { cause: "".to_owned() }.error_code(); | |
error_reply.set_none_response(NoneResp::new()); | |
if let Err(e) = wrapper.merge_from(&mut stream) { | |
common::error!("failed to parse protobuf; err = {:?}", e); | |
error_reply.error_message = format!("failed to parse protobuf: {:?}", e); | |
return error_reply; | |
} | |
match wrapper.field_type { | |
RequestType::TypeHelloReq => handlers::hello(wrapper.get_hello_request()).await, | |
_ => handlers::hello(wrapper.get_hello_request()).await, | |
} | |
.unwrap_or_else(|err| { | |
error_reply.error_message = format!("failed to handle websocket request: {:?}", err); | |
error_reply | |
}) | |
}; | |
ctx.add_stream(stream::once(async { BinResponse(fut.await) })); | |
Ok(()) | |
} | |
} | |
impl WsActor { | |
pub fn new() -> Self { | |
WsActor | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment