Skip to content

Instantly share code, notes, and snippets.

@tekacs
Created February 22, 2023 06:49
Show Gist options
  • Save tekacs/b2cb7d19e8ec9a7c7838251b2e427d01 to your computer and use it in GitHub Desktop.
Save tekacs/b2cb7d19e8ec9a7c7838251b2e427d01 to your computer and use it in GitHub Desktop.
use async_stream::stream;
use async_tungstenite::accept_async;
use futures::prelude::*;
use serde::{Deserialize, Serialize};
use std::io;
use tarpc;
use tarpc::serde_transport::*;
use tokio::net::{TcpListener, TcpStream};
use tokio_serde::*;
use tokio_util::codec::length_delimited::LengthDelimitedCodec;
use tokio_util::compat::{Compat, TokioAsyncReadCompatExt};
use ws_stream_tungstenite::*;
pub async fn listen<Item, SinkItem, Codec, CodecFn>(
addr: &str,
codec_fn: CodecFn,
) -> Result<
impl Stream<Item = Transport<WsStream<Compat<TcpStream>>, Item, SinkItem, Codec>>,
io::Error,
>
where
Item: for<'de> Deserialize<'de>,
SinkItem: Serialize,
Codec: Serializer<SinkItem> + Deserializer<Item>,
CodecFn: Fn() -> Codec,
{
let config = LengthDelimitedCodec::builder();
let listener = TcpListener::bind(addr).await?;
Ok(stream! {
loop {
let (tcp_stream, _) = listener.accept().await.unwrap();
let websocket_stream = accept_async(tcp_stream.compat()).await.unwrap();
let ws_stream: WsStream<Compat<TcpStream>> = WsStream::new(websocket_stream);
yield new(config.new_framed(ws_stream), (codec_fn)());
}
})
}
use async_io_stream::IoStream;
use serde::{Deserialize, Serialize};
use tarpc;
use tarpc::serde_transport::*;
use tokio_serde::*;
use tokio_util::codec::length_delimited::LengthDelimitedCodec;
use ws_stream_wasm::*;
pub struct WebsocketConnection<Item, SinkItem, Codec>
where
Item: for<'de> Deserialize<'de>,
SinkItem: Serialize,
Codec: Serializer<SinkItem> + Deserializer<Item>,
{
pub ws_meta: WsMeta,
pub transport: Transport<IoStream<WsStreamIo, Vec<u8>>, Item, SinkItem, Codec>,
}
pub async fn connect<Item, SinkItem, Codec, CodecFn>(
addr: &str,
codec_fn: CodecFn,
) -> Result<WebsocketConnection<Item, SinkItem, Codec>, WsErr>
where
Item: for<'de> Deserialize<'de>,
SinkItem: Serialize,
Codec: Serializer<SinkItem> + Deserializer<Item>,
CodecFn: Fn() -> Codec,
{
let (ws_meta, ws_stream) = WsMeta::connect(addr, None).await?;
let io = ws_stream.into_io();
let config = LengthDelimitedCodec::builder();
Ok(WebsocketConnection {
ws_meta,
transport: new(config.new_framed(io), (codec_fn)()),
})
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment