Created
March 22, 2020 18:19
-
-
Save flip111/697e6fa000dba286368c1cd95931de9d to your computer and use it in GitHub Desktop.
Actix GraphQL over websocket
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Combination of: | |
// https://github.com/actix/examples/tree/master/juniper | |
// https://github.com/actix/examples/tree/master/websocket | |
mod schema; | |
use crate::schema::{create_schema, Schema}; | |
use actix::prelude::*; | |
use actix_web::{middleware, web, App, Error, HttpRequest, HttpResponse, HttpServer}; | |
use actix_web_actors::ws; | |
use bytes::{BytesMut}; | |
use juniper::{Variables}; | |
use std::sync::Arc; | |
use std::time::{Duration, Instant}; | |
/// How often heartbeat pings are sent | |
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); | |
/// How long before lack of client response causes a timeout | |
const CLIENT_TIMEOUT: Duration = Duration::from_secs(10); | |
/// do websocket handshake and start `MyWebSocket` actor | |
async fn ws_index(r: HttpRequest, stream: web::Payload, graphql_schema: web::Data<Arc<Schema>>) -> Result<HttpResponse, Error> { | |
println!("{:?}", r); | |
let res = ws::start(MyWebSocket::new(graphql_schema), &r, stream); | |
println!("{:?}", res); | |
res | |
} | |
/// websocket connection is long running connection, it easier | |
/// to handle with an actor | |
struct MyWebSocket { | |
/// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT), | |
/// otherwise we drop connection. | |
hb: Instant, | |
graphql_schema: web::Data<Arc<Schema>> | |
} | |
impl MyWebSocket { | |
fn new(graphql_schema: web::Data<Arc<Schema>>) -> Self { | |
Self { | |
hb: Instant::now(), | |
graphql_schema: graphql_schema | |
} | |
} | |
/// helper method that sends ping to client every second. | |
/// | |
/// also this method checks heartbeats from client | |
fn hb(&self, ctx: &mut <Self as Actor>::Context) { | |
ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| { | |
// check client heartbeats | |
if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT { | |
// heartbeat timed out | |
println!("Websocket Client heartbeat failed, disconnecting!"); | |
// stop actor | |
ctx.stop(); | |
// don't try to send a ping | |
return; | |
} | |
ctx.ping(b""); | |
}); | |
} | |
} | |
impl Actor for MyWebSocket { | |
type Context = ws::WebsocketContext<Self>; | |
/// Method is called on actor start. We start the heartbeat process here. | |
fn started(&mut self, ctx: &mut Self::Context) { | |
self.hb(ctx); | |
} | |
} | |
/// Handler for `ws::Message` | |
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWebSocket { | |
fn handle( | |
&mut self, | |
msg: Result<ws::Message, ws::ProtocolError>, | |
ctx: &mut Self::Context, | |
) { | |
// process websocket messages | |
println!("WS: {:?}", msg); | |
match msg { | |
Ok(ws::Message::Ping(msg)) => { | |
self.hb = Instant::now(); | |
ctx.pong(&msg); | |
} | |
Ok(ws::Message::Pong(_)) => { | |
self.hb = Instant::now(); | |
} | |
Ok(ws::Message::Text(text)) => { | |
let (res, _errors) = juniper::execute( | |
&text, | |
None, | |
&self.graphql_schema, | |
&Variables::new(), | |
&(), | |
).unwrap(); | |
let user = serde_json::to_string(&res); | |
// match user | |
ctx.text("text: ".to_string() + &user.unwrap_or("json decode error".to_string())) | |
} | |
Ok(ws::Message::Binary(bin)) => { | |
let mut bytes = BytesMut::new(); | |
bytes.extend_from_slice(b"binary: "); | |
bytes.extend_from_slice(&bin); | |
ctx.binary(bytes) | |
} | |
Ok(ws::Message::Close(_)) => { | |
ctx.stop(); | |
} | |
_ => ctx.stop(), | |
} | |
} | |
} | |
#[actix_rt::main] | |
async fn main() -> std::io::Result<()> { | |
std::env::set_var("RUST_LOG", "actix_server=info,actix_web=info"); | |
env_logger::init(); | |
// Create Juniper schema | |
let schema = std::sync::Arc::new(create_schema()); | |
HttpServer::new(move || { | |
App::new() | |
// ReaderT pattern for context .. | |
.data(schema.clone()) | |
// enable logger | |
.wrap(middleware::Logger::default()) | |
// websocket route | |
.service(web::resource("/ws/").route(web::get().to(ws_index))) | |
// static files | |
// .service(fs::Files::new("/", "static/").index_file("index.html")) | |
}) | |
// start http server on 127.0.0.1:8080 | |
.bind("127.0.0.1:8080")? | |
.run() | |
.await | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment