Last active
August 5, 2020 12:34
-
-
Save unpluggedcoder/b1bd611c5009afe23a76d204d5982df7 to your computer and use it in GitHub Desktop.
Json-Rpc in Rust
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[package] | |
name = "jrpc" | |
version = "0.1.0" | |
authors = ["unpluggedcoder <[email protected]>"] | |
edition = "2018" | |
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html | |
[dependencies] | |
log = "0.4" | |
pretty_env_logger = "0.4" | |
lazy_static = "1.4" | |
tokio = { version = "0.2", features = ["full"] } # tokio 0.2 is not compatible with futures 0.1
futures = { version = "0.3", features = ["compat"] } # We need "compat" feature. | |
jsonrpc-http-server = "14.2.0" | |
jsonrpc-core = "14.2.0" # Use futures 0.1 | |
jsonrpc-derive = "14.2.0" | |
serde = { version = "1.0", features = ["derive"] } | |
serde_json = "1.0" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// #![feature(async_await, futures_api)] | |
#[macro_use] | |
extern crate log; | |
#[macro_use] | |
extern crate lazy_static; | |
use futures::future::{FutureExt, TryFutureExt}; | |
use std::future::Future; | |
use std::collections::HashMap; | |
use std::env; | |
use std::sync::atomic::{AtomicUsize, Ordering}; | |
use std::thread; | |
use std::time::Duration; | |
use jsonrpc_core::{self, types, BoxFuture, MetaIoHandler, Metadata, Result}; | |
use jsonrpc_derive::rpc; | |
use jsonrpc_http_server::{hyper, ServerBuilder}; | |
use tokio::runtime; | |
use tokio::sync::mpsc::{self, Sender, UnboundedSender}; | |
/// Process-wide counter used to hand out a unique id to every incoming RPC request.
///
/// `AtomicUsize::new` is a `const fn`, so the counter can be a plain `static`
/// and needs no lazy initialisation.
static REQ_ID: AtomicUsize = AtomicUsize::new(0);
fn async_wait<T: Send + 'static>( | |
fut: impl Future<Output = Result<T>> + Send + 'static, | |
) -> BoxFuture<T> { | |
let compat_fut = fut.boxed().compat(); | |
Box::new(compat_fut) | |
} | |
#[derive(Default, Clone)] | |
pub struct Meta(pub Option<UnboundedSender<(usize, Sender<usize>)>>); | |
impl Metadata for Meta {} | |
#[rpc(server)] | |
pub trait Rpc { | |
type Metadata; | |
/// Performs asynchronous operation | |
#[rpc(meta, name = "beFancy")] | |
fn call(&self, meta: Self::Metadata) -> BoxFuture<usize>; | |
} | |
#[derive(Default, Clone)] | |
struct RpcImpl; | |
impl Rpc for RpcImpl { | |
type Metadata = Meta; | |
fn call(&self, meta: Self::Metadata) -> BoxFuture<usize> { | |
let id = REQ_ID.fetch_add(1, Ordering::SeqCst); | |
let (tx, rx) = mpsc::channel::<usize>(1); | |
if let Some(sender) = meta.0 { | |
sender.send((id, tx)).unwrap(); | |
} | |
let resp_fut = async move { | |
match rx.recv().await { | |
Some(id) => Ok(id), | |
None => Err(types::Error::new(types::ErrorCode::InternalError)), | |
} | |
}; | |
async_wait(resp_fut) | |
} | |
} | |
fn main() { | |
env::set_var("RUST_LOG", "info"); | |
pretty_env_logger::init(); | |
let mut rt = runtime::Builder::new() | |
.threaded_scheduler() | |
.enable_all() | |
.build() | |
.expect("Runtime build failed."); | |
let (broker_sender, mut broker_receiver) = mpsc::unbounded_channel(); | |
rt.spawn(async { | |
let mut io = MetaIoHandler::default(); | |
let rpc = RpcImpl; | |
io.extend_with(rpc.to_delegate()); | |
let _server = ServerBuilder::new(io) | |
.meta_extractor(move |_: &hyper::Request<hyper::Body>| { | |
info!("Meta extractor called."); | |
Meta(Some(broker_sender.clone())) | |
}) | |
.start_http(&"127.0.0.1:9527".parse().unwrap()) | |
.expect("Unable to start RPC server"); | |
_server.wait(); | |
}); | |
rt.block_on(async move { | |
let mut rpc_resps: HashMap<usize, Sender<usize>> = HashMap::new(); | |
info!("Borker loop start..."); | |
loop { | |
if let Some((id, mut sender)) = broker_receiver.recv().await { | |
info!("Broker received: id({}).", id); | |
// Sleep for awhile | |
thread::sleep(Duration::from_secs(1)); | |
sender.send(id * id).await.unwrap(); | |
info!("Broker sent: id({})", id); | |
rpc_resps.insert(id, sender); | |
} else { | |
info!("Broker channel broken."); | |
break; | |
} | |
} | |
info!("Broker loop finished."); | |
}); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#[macro_use] | |
extern crate log; | |
#[macro_use] | |
extern crate lazy_static; | |
use std::collections::HashMap; | |
use std::env; | |
use std::sync::atomic::{AtomicUsize, Ordering}; | |
use std::thread; | |
use std::time::Duration; | |
use jsonrpc_core::futures::{Async, Future as RpcFuture, Poll}; | |
use jsonrpc_core::{self, *}; | |
use jsonrpc_derive::rpc; | |
use jsonrpc_http_server::{hyper, ServerBuilder}; | |
use tokio::runtime; | |
use tokio::sync::mpsc::{self, error::TryRecvError, Receiver, Sender, UnboundedSender}; | |
/// Process-wide counter used to hand out a unique id to every incoming RPC request.
///
/// `AtomicUsize::new` is a `const fn`, so the counter can be a plain `static`
/// and needs no lazy initialisation.
static REQ_ID: AtomicUsize = AtomicUsize::new(0);
#[derive(Debug)] | |
struct AsyncResponse { | |
recv: Receiver<usize>, | |
} | |
impl RpcFuture for AsyncResponse { | |
type Item = Result<String>; | |
type Error = types::Error; | |
/// This WON'T WORK. See fix.rs | |
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { | |
match self.recv.try_recv() { | |
Err(TryRecvError::Empty) => { | |
// Only polled twice if it's not ready. | |
info!("AsyncResponse not ready."); | |
Ok(Async::NotReady) | |
} | |
Err(TryRecvError::Closed) => { | |
info!("AsyncResponse closed."); | |
Err(Error::new(ErrorCode::InternalError).into()) | |
} | |
Ok(value) => { | |
info!("AsyncResponse ready, response value: {}", value); | |
Ok(Async::Ready(Ok(value.to_string()))) | |
} | |
} | |
} | |
} | |
#[derive(Default, Clone)] | |
pub struct Meta(pub Option<UnboundedSender<(usize, Sender<usize>)>>); | |
impl Metadata for Meta {} | |
#[rpc(server)] | |
pub trait Rpc { | |
type Metadata; | |
/// Performs asynchronous operation | |
#[rpc(meta, name = "beFancy")] | |
fn call(&self, meta: Self::Metadata) -> BoxFuture<Result<String>>; | |
} | |
#[derive(Default, Clone)] | |
struct RpcImpl; | |
impl Rpc for RpcImpl { | |
type Metadata = Meta; | |
fn call(&self, meta: Self::Metadata) -> BoxFuture<Result<String>> { | |
let id = REQ_ID.fetch_add(1, Ordering::SeqCst); | |
let (tx, rx) = mpsc::channel::<usize>(1); | |
if let Some(sender) = meta.0 { | |
sender.send((id, tx)).unwrap(); | |
} | |
Box::new(AsyncResponse { recv: rx }) | |
} | |
} | |
fn main() { | |
env::set_var("RUST_LOG", "info"); | |
pretty_env_logger::init(); | |
let mut rt = runtime::Builder::new() | |
.threaded_scheduler() | |
.enable_all() | |
.build() | |
.expect("Runtime build failed."); | |
let (broker_sender, mut broker_receiver) = mpsc::unbounded_channel(); | |
{ | |
let sender = broker_sender.clone(); | |
rt.spawn(async { | |
let mut io = MetaIoHandler::default(); | |
let rpc = RpcImpl; | |
io.extend_with(rpc.to_delegate()); | |
let _server = ServerBuilder::new(io) | |
.meta_extractor(move |_: &hyper::Request<hyper::Body>| { | |
info!("Meta extractor called."); | |
Meta(Some(sender.clone())) | |
}) | |
.start_http(&"127.0.0.1:9527".parse().unwrap()) | |
.expect("Unable to start RPC server"); | |
_server.wait(); | |
}); | |
} | |
rt.block_on(async move { | |
let mut rpc_resps: HashMap<usize, Sender<usize>> = HashMap::new(); | |
info!("Borker loop start..."); | |
loop { | |
if let Some((id, mut sender)) = broker_receiver.recv().await { | |
info!("Broker received: id({}).", id); | |
// Sleep for awhile | |
thread::sleep(Duration::from_secs(2)); | |
sender.send(id * id).await.unwrap(); | |
info!("Broker sent: id({})", id); | |
rpc_resps.insert(id, sender); | |
} else { | |
info!("Broker channel broken."); | |
break; | |
} | |
} | |
info!("Broker loop finished."); | |
}); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment