Skip to content

Instantly share code, notes, and snippets.

@paulgb
Created July 16, 2023 22:29
Show Gist options
  • Save paulgb/9898cf1ba0e36627ba940e2a00af6c40 to your computer and use it in GitHub Desktop.
Save paulgb/9898cf1ba0e36627ba940e2a00af6c40 to your computer and use it in GitHub Desktop.
Axum YJS server
use axum::{
extract::{
ws::{Message, WebSocket, WebSocketUpgrade},
State,
},
response::Response,
routing::get,
Router,
};
use futures::{SinkExt, StreamExt};
use std::{convert::Infallible, future::ready, sync::Arc};
use tokio::sync::{Mutex, RwLock};
use y_sync::{awareness::Awareness, net::BroadcastGroup};
use yrs::Doc;
async fn handler(
ws: WebSocketUpgrade,
State(broadcast_group): State<Arc<BroadcastGroup>>,
) -> Response {
ws.on_upgrade(move |socket| handle_socket(socket, broadcast_group.clone()))
}
async fn handle_socket(socket: WebSocket, broadcast_group: Arc<BroadcastGroup>) {
let (sink, stream) = socket.split();
let stream = stream.filter_map(|d| match d {
Ok(Message::Binary(s)) => ready(Some(Ok::<_, Infallible>(s))),
_ => {
println!("got here");
ready(None)
}
});
let sink = sink.with(|d| ready(Ok::<_, axum::Error>(Message::Binary(d))));
let sink = Arc::new(Mutex::new(sink));
let sub = broadcast_group.subscribe(sink, stream);
match sub.completed().await {
Ok(_) => println!("socket closed"),
Err(e) => println!("socket closed with error: {}", e),
}
}
#[tokio::main]
async fn main() {
let awareness = Arc::new(RwLock::new(Awareness::new(Doc::new())));
let broadcast_group = Arc::new(BroadcastGroup::new(awareness, 32).await);
let app = Router::new()
.route("/my-room", get(handler))
.with_state(broadcast_group);
axum::Server::bind(&"0.0.0.0:8000".parse().unwrap())
.serve(app.into_make_service())
.await
.unwrap();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment