Last active
April 16, 2020 08:26
-
-
Save DarkSector/eec376803467e47f4bb10fe02a3d7c1e to your computer and use it in GitHub Desktop.
Supervisor - Client actor system
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::panic; | |
use std::sync::{Arc, Mutex}; | |
use std::thread; | |
use std::time::Duration; | |
use actix::io::SinkWrite; | |
use actix::*; | |
use actix_codec::Framed; | |
use awc::{ | |
error::WsProtocolError, | |
ws::{Codec, Frame, Message}, | |
BoxedSocket, Client, | |
}; | |
use bytes::Bytes; | |
use crossbeam::{Receiver, Sender}; | |
use futures::stream::{SplitSink, StreamExt}; | |
use serde::Deserialize; | |
use serde_json; | |
#[derive(Message)] | |
#[rtype(result = "()")] | |
struct WSClientStatus { | |
connected: bool, | |
} | |
// All actors must communicate via a Message derivative | |
#[derive(Message)] | |
#[rtype(result = "()")] | |
struct ClientCommand(); | |
struct WSClient { | |
sink: Option<SinkWrite<Message, SplitSink<Framed<BoxedSocket, Codec>, Message>>>, | |
supervisor_addr: Addr<Supervisor>, | |
} | |
impl WSClient { | |
// heartbeat implementation for the actor | |
// All it does is send a Pong message back via the sink to the server | |
// every 1 second | |
fn hb(&self, ctx: &mut Context<Self>) { | |
// let sink = self.sink.unwrap(); | |
if self.sink.is_some() { | |
ctx.run_later(Duration::new(1, 0), |act, ctx| { | |
println!("foo"); | |
// println!("{}", act.sink); | |
// act.sink.write(Message::Pong(Bytes::from_static(b""))).unwrap(); | |
// calls itself after calling once so that it can run every 1 second | |
act.hb(ctx); | |
}); | |
} | |
} | |
} | |
// implement Actor trait for the client | |
impl Actor for WSClient { | |
type Context = Context<Self>; | |
fn started(&mut self, ctx: &mut Context<Self>) { | |
// start heartbeats otherwise server will disconnect after 10 seconds | |
self.hb(ctx) | |
} | |
fn stopped(&mut self, _: &mut Context<Self>) { | |
info!("Disconnected"); | |
// Don't kill the system, that is for the main supervisor to kill | |
// System::current().stop(); | |
// Instead send the message to the supervisor | |
} | |
} | |
// In order for WSClient to behave as an actor it needs to implement a handler as well | |
impl Handler<ClientCommand> for WSClient { | |
type Result = (); | |
fn handle(&mut self, msg: ClientCommand, _ctx: &mut Context<Self>) { | |
let msg = serde_json::to_string(&msg.0).ok().unwrap(); | |
match &self.sink { | |
// Some(mut t) => t.write(Message::Text(msg)).unwrap(), | |
_ => println!("Handler doesn't have sink yet") | |
} | |
} | |
} | |
// StreamHandler | |
// This is helper trait that allows to handle `Stream` in | |
// Write handler | |
impl actix::io::WriteHandler<WsProtocolError> for WSClient {} | |
/****************** SUPERVISOR ****************/ | |
struct Supervisor { | |
addr: Addr<WSClient> // keep the address of the client | |
} | |
impl Actor for Supervisor { | |
type Context = Context<Self>; | |
fn stopped(&mut self, ctx: &mut Context<Self>) { | |
info!("Supervisor disconnected shutting downs system"); | |
System::current().stop(); | |
} | |
} | |
impl Handler<WSClientStatus> for Supervisor { | |
type Result = (); | |
fn handle(&mut self, client_status: WSClientStatus, _ctx: &mut Context<Self>) { | |
// this is where we handle incoming WSClientStatus | |
// if we receive a false here | |
// we spawn the actor again | |
// We will need to keep a track of how many reconnects | |
// TODO: Figure out how to restart/start the arbiter here | |
match client_status.connected { | |
false => { | |
println!("Received a false"); | |
Arbiter::spawn(async { | |
let (response, framed) = Client::new() | |
.ws(client_status.conf.device_uri) | |
.connect() | |
.await | |
.map_err(|e| { | |
panic!(format!("Error connecting to websocket: {}", e)); | |
}) | |
.unwrap(); | |
debug!("{:?}", response); | |
// This is the ControlToServer that needs to be sent initially | |
let initial_update_device = ControlToServer::DeviceUpdatedState { | |
name: ControlToServerName::UpdateState, | |
state: DeviceState::NotWriting, | |
}; | |
// Split the BoxedSocket | |
let (sink, stream) = framed.split(); | |
let rx = writer_client.rx_resp.take().unwrap(); | |
WSClient::add_stream(stream , ctx); | |
self.addr = sink; | |
}); | |
} | |
true => println!("Received a true") | |
} | |
} | |
} | |
/**************************************************/ | |
pub fn start_device_client() -> SystemRunner { | |
let sys = System::new("websocket-client"); | |
let super_visor_actor = Supervisor::create(|ctx| { | |
// supervisor address | |
let supervisor_add = ctx.address(); // not using this yet | |
let ws_client = WSClient::create(|ctx| { | |
// unfortunately the client has to have a stream going before it | |
// can be initialized | |
// for that a connection needs to exist | |
WSClient { | |
// sink: SinkWrite::new(sink, ctx), | |
sink: None, | |
supervisor_addr: supervisor_add, // send it thee current address of the supervisor as well | |
} | |
}); | |
Supervisor { | |
addr: ws_client // Is this necessary? | |
} | |
}); | |
// send to the supervisor that there is connection | |
super_visor_actor.do_send(WSClientStatus { | |
connected: false, | |
}); | |
sys | |
} | |
fn main() { | |
let sys = start_device_client(); | |
sys.run.unwrap(); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment