Last active
June 19, 2018 15:00
-
-
Save autodidaddict/33ce20152d57a8721102fedf90ae8103 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
fn get_library_entries( | |
&self, | |
ctx: RpcContext, | |
req: LibraryEntriesRequest, | |
resp: ServerStreamingSink<LibraryEntry>, | |
) { | |
info!( | |
"Handling get library entries request: {}", | |
req.get_agent_id() | |
); | |
match self.data_store.get_owned_entries(req.get_agent_id()) { | |
Ok(raw_entries) => { | |
let out: Vec<(LibraryEntry, WriteFlags)> = raw_entries.into_iter().map(|entry| { | |
(entry.into(), WriteFlags::default()) | |
}) | |
.collect(); | |
let f = resp.send_all(stream::iter_ok::<_, Error>(out)).map(|_|()); | |
ctx.spawn(f.map_err(|e| error!("Failed to handle get library entries: {:?}", e))); | |
}, | |
Err(_) => { | |
let f = resp.fail(RpcStatus::new(RpcStatusCode::Internal, None)); | |
ctx.spawn(f.map_err(|e| error!("Failed to handle get library entries: {:?}", e))); | |
} | |
} | |
} |
Yeah, I don't think we can turn it into a single Future because we can't move sink into two different closures.
What I ended up doing for my toy example was this:
use futures::future;
use futures::prelude::*;
use futures::stream;
use grpcio::{Error, RpcContext, RpcStatus, RpcStatusCode, ServerStreamingSink, WriteFlags};
use protos::no_toms::{HelloRequest, HelloResponse};
use protos::no_toms_grpc::NoToms;
use std::sync::Arc;
use futures::{Future, AndThen, OrElse};
/// Field-less service type on which this file implements the `NoToms` trait.
// NOTE(review): `Clone` is presumably required by the generated gRPC service
// constructor — confirm against the generated `no_toms_grpc` code.
#[derive(Clone)]
struct NoTomsService;
/// Builds the greeting for `name`.
///
/// Returns `Err("No Toms please.")` when `name` is exactly `"tom"` (the
/// comparison is case-sensitive); every other input, including the empty
/// string, yields `Ok("Hello, <name>")`.
fn get_message(name: &str) -> Result<String, String> {
    // Reject the one forbidden name up front.
    if name == "tom" {
        return Err(String::from("No Toms please."));
    }
    Ok(format!("Hello, {}", name))
}
impl NoToms for NoTomsService {
fn say_hello(
&self,
ctx: RpcContext,
req: HelloRequest,
sink: ServerStreamingSink<HelloResponse>,
) {
let msg = get_message(&req.name);
// I don't know if this is idiomatic Rust, but it works
if let Err(err) = msg {
ctx.spawn(
sink.fail(RpcStatus::new(RpcStatusCode::InvalidArgument, Some(err)))
.map(|_| ())
.map_err(|_| ())
);
return
}
// It is safe to unwrap the message because we return early when it is an err
let msg = msg.unwrap();
let f = sink.send_all(stream::iter_ok::<_, Error>(respItems(msg)));
ctx.spawn(f.map(|_| ()).map_err(|_| ()));
}
}
/// Wraps `msg` in a single `HelloResponse` paired with default `WriteFlags`.
///
/// The result is a one-element vector of `(message, flags)` tuples.
// The non-snake-case name is kept as-is because callers reference it.
fn respItems(msg: String) -> Vec<(HelloResponse, WriteFlags)> {
    let reply = {
        let mut building = HelloResponse::new();
        building.set_message(msg);
        building
    };
    let flags = WriteFlags::default();
    vec![(reply, flags)]
}
/// Creates an `RpcStatus` with code `RpcStatusCode::Internal`, passing `msg`
/// through as the optional details.
fn internal_error(msg: Option<String>) -> RpcStatus {
    let code = RpcStatusCode::Internal;
    RpcStatus::new(code, msg)
}
It is logically the same as your code and stinks of golang. *shrug*
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
That's exactly where I am right now. I'm getting a move error on resp in the closure. It took my slow brain a little while, but I eventually managed to get where this comment is. It's so close I can taste it... and yet it won't compile.