Skip to content

Instantly share code, notes, and snippets.

@f5io
Last active June 20, 2019 12:30
Show Gist options
  • Save f5io/127fd1027b226b7d90bac2612a7d3534 to your computer and use it in GitHub Desktop.
Save f5io/127fd1027b226b7d90bac2612a7d3534 to your computer and use it in GitHub Desktop.
extern crate actix;
extern crate actix_web;
extern crate bytes;
extern crate derive_more;
extern crate futures;
extern crate flatbuffers;
use derive_more::Display;
use flatbuffers::{get_root, Follow, Table};
use bytes::{BytesMut};
use std::fmt;
use std::ops::{Deref, DerefMut};
use futures::{Future, Poll, Stream};
use actix_web::dev::{HttpResponseBuilder, Payload};
use actix_web::error::{Error, PayloadError, ResponseError};
use actix_web::http::header::{CONTENT_LENGTH, CONTENT_TYPE};
use actix_web::{FromRequest, HttpMessage, HttpRequest, HttpResponse, Responder};
const MAX_SIZE: usize = 262_144; // max payload size is 256k
#[derive(Debug, Display)]
pub enum FlatBufferPayloadError {
/// Payload size is bigger than 256k
#[display(fmt = "Payload size is bigger than 256k")]
Overflow,
/// Content type error
#[display(fmt = "Content type error")]
ContentType,
/// Payload error
#[display(fmt = "Error that occur during reading payload: {}", _0)]
Payload(PayloadError),
}
impl ResponseError for FlatBufferPayloadError {
fn error_response(&self) -> HttpResponse {
match *self {
FlatBufferPayloadError::Overflow => HttpResponse::PayloadTooLarge().into(),
_ => HttpResponse::BadRequest().into(),
}
}
}
impl From<PayloadError> for FlatBufferPayloadError {
fn from(err: PayloadError) -> FlatBufferPayloadError {
FlatBufferPayloadError::Payload(err)
}
}
pub struct FlatBuffer<'a, T: Follow<'a> + 'a>(pub T::Inner);
impl<'a, T: Follow<'a> + 'a> Deref for FlatBuffer<'a, T> {
type Target = T::Inner;
fn deref(&self) -> &T::Inner {
&self.0
}
}
impl<'a, T: Follow<'a> + 'a> DerefMut for FlatBuffer<'a, T> {
fn deref_mut(&mut self) -> &mut T::Inner {
&mut self.0
}
}
impl<'a, T: Follow<'a> + 'a> fmt::Debug for FlatBuffer<'a, T>
where
T::Inner: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "FlatBuffer: {:?}", self.0)
}
}
impl<'a, T: Follow<'a> + 'a> fmt::Display for FlatBuffer<'a, T>
where
T::Inner: fmt::Display,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Display::fmt(&self.0, f)
}
}
impl<'a, T: Follow<'a> + 'a> FromRequest for FlatBuffer<'a, T> {
type Config = ();
type Error = FlatBufferPayloadError;
type Future = Box<dyn Future<Item = Self, Error = FlatBufferPayloadError>>;
#[inline]
fn from_request(req: &HttpRequest, payload: &mut Payload) -> Self::Future {
Box::new(
FlatBufferMessage::<'a, T>::new(req, payload)
.map_err(|e| e.into())
.map(FlatBuffer)
)
}
}
pub struct FlatBufferMessage<'a, T: Follow<'a> + 'a> {
stream: Option<Payload>,
fut: Option<Box<dyn Future<Item = T::Inner, Error = FlatBufferPayloadError>>>,
data: Option<Vec<u8>>,
}
impl<'a, T: Follow<'a> + 'a> FlatBufferMessage<'a, T> {
pub fn new(_req: &HttpRequest, payload: &mut Payload) -> Self {
FlatBufferMessage {
stream: Some(payload.take()),
fut: None,
data: None,
}
}
}
impl<'a, T: Follow<'a> + 'a> Future for FlatBufferMessage<'a, T> {
type Item = T::Inner;
type Error = FlatBufferPayloadError;
fn poll(&mut self) -> Poll<T::Inner, FlatBufferPayloadError> {
if let Some(ref mut fut) = self.fut {
return fut.poll();
}
let fut = self
.stream
.take()
.expect("FlatBufferMessage could not be used second time")
.from_err()
.fold(BytesMut::new(), move |mut body, chunk| {
if (body.len() + chunk.len()) > MAX_SIZE {
Err(FlatBufferPayloadError::Overflow)
} else {
body.extend_from_slice(&chunk);
Ok(body)
}
})
.map(|body| {
self.data = Some(body.to_vec());
self.data
})
.map(|body| get_root::<T>(&body.unwrap()));
self.fut = Some(Box::new(fut));
self.poll()
}
}
error[E0495]: cannot infer an appropriate lifetime for lifetime parameter `'a` due to conflicting requirements
--> src/actix_fbs.rs:91:79
|
91 | fn from_request(req: &HttpRequest, payload: &mut Payload) -> Self::Future {
| _______________________________________________________________________________^
92 | | Box::new(
93 | | FlatBufferMessage::<'a, T>::new(req, payload)
94 | | .map_err(|e| e.into())
95 | | .map(FlatBuffer)
96 | | )
97 | | }
| |_____^
|
note: first, the lifetime cannot outlive the lifetime 'a as defined on the impl at 85:6...
--> src/actix_fbs.rs:85:6
|
85 | impl<'a, T: Follow<'a> + 'a> FromRequest for FlatBuffer<'a, T> {
| ^^
= note: ...so that the types are compatible:
expected actix_fbs::actix_web::FromRequest
found actix_fbs::actix_web::FromRequest
= note: but, the lifetime must be valid for the static lifetime...
= note: ...so that the expression is assignable:
expected std::boxed::Box<(dyn actix_fbs::futures::Future<Error = actix_fbs::FlatBufferPayloadError, Item = actix_fbs::FlatBuffer<'_, T>> + 'static)>
found std::boxed::Box<dyn actix_fbs::futures::Future<Error = actix_fbs::FlatBufferPayloadError, Item = actix_fbs::FlatBuffer<'_, T>>>
error[E0495]: cannot infer an appropriate lifetime due to conflicting requirements
--> src/actix_fbs.rs:144:25
|
144 | self.fut = Some(Box::new(fut));
| ^^^^^^^^^^^^^
|
note: first, the lifetime cannot outlive the lifetime 'a as defined on the impl at 116:6...
--> src/actix_fbs.rs:116:6
|
116 | impl<'a, T: Follow<'a> + 'a> Future for FlatBufferMessage<'a, T> {
| ^^
note: ...so that the type `actix_fbs::futures::Map<actix_fbs::futures::Map<actix_fbs::futures::stream::Fold<actix_fbs::futures::stream::FromErr<actix_fbs::actix_web::dev::Payload<std::boxed::Box<dyn actix_fbs::futures::Stream<Item = actix_fbs::bytes::Bytes, Error = actix_fbs::actix_web::error::PayloadError>>>, actix_fbs::FlatBufferPayloadError>, [closure@src/actix_fbs.rs:130:36: 137:14], std::result::Result<actix_fbs::bytes::BytesMut, actix_fbs::FlatBufferPayloadError>, actix_fbs::bytes::BytesMut>, [closure@src/actix_fbs.rs:138:18: 141:14 self:&mut &mut actix_fbs::FlatBufferMessage<'a, T>]>, [closure@src/actix_fbs.rs:142:18: 142:54]>` will meet its required lifetime bounds
--> src/actix_fbs.rs:144:25
|
144 | self.fut = Some(Box::new(fut));
| ^^^^^^^^^^^^^
= note: but, the lifetime must be valid for the static lifetime...
= note: ...so that the expression is assignable:
expected std::option::Option<std::boxed::Box<(dyn actix_fbs::futures::Future<Error = actix_fbs::FlatBufferPayloadError, Item = <T as messages_flatbuffers::flatbuffers::Follow<'a>>::Inner> + 'static)>>
found std::option::Option<std::boxed::Box<dyn actix_fbs::futures::Future<Error = actix_fbs::FlatBufferPayloadError, Item = <T as messages_flatbuffers::flatbuffers::Follow<'a>>::Inner>>>
error: aborting due to 2 previous errors
#[allow(non_snake_case)]
#[path = "../target/flatbuffers/messages_generated.rs"]
pub mod messages_flatbuffers;
mod actix_fbs;
use actix_fbs::FlatBuffer;
use actix_web::{
error, middleware, web, App, Error, HttpRequest, HttpResponse, HttpServer, Result,
};
use bytes::BytesMut;
use futures::{Future, Stream};
const MAX_SIZE: usize = 262_144; // max payload size is 256k
/// This handler manually load request payload and parse json object
fn index_manual(
payload: web::Payload,
) -> impl Future<Item = HttpResponse, Error = Error> {
// payload is a stream of Bytes objects
payload
// `Future::from_err` acts like `?` in that it coerces the error type from
// the future into the final error type
.from_err()
// `fold` will asynchronously read each chunk of the request body and
// call supplied closure, then it resolves to result of closure
.fold(BytesMut::new(), move |mut body, chunk| {
// limit max size of in-memory payload
if (body.len() + chunk.len()) > MAX_SIZE {
Err(error::ErrorBadRequest("overflow"))
} else {
body.extend_from_slice(&chunk);
Ok(body)
}
})
// `Future::and_then` can be used to merge an asynchronous workflow with a
// synchronous workflow
.and_then(|body| {
let obj = flatbuffers::get_root::<messages_flatbuffers::get_policy::Input>(&body);
println!("obj: {:?}", obj);
println!("obj.id: {:?}", obj.id());
Ok(HttpResponse::Ok().finish()) // <- send response
})
}
fn index<'a>(msg: FlatBuffer<'a, messages_flatbuffers::get_policy::Input<'a>>) -> Result<HttpResponse> {
println!("obj: {:?}", msg);
println!("obj.id: {:?}", msg.id());
Ok(HttpResponse::Ok().finish())
}
fn main() {
std::env::set_var("RUST_LOG", "actix_web=info");
env_logger::init();
let sys = actix::System::new("flatbuffers-example");
HttpServer::new(|| {
App::new()
// enable logger
.wrap(middleware::Logger::default())
//.service(web::resource("/").route(web::post().to_async(index_manual)))
.service(web::resource("/").route(web::post().to(index)))
})
.bind("127.0.0.1:8081")
.unwrap()
.shutdown_timeout(1)
.start();
println!("Started http server: 127.0.0.1:8081");
let _ = sys.run();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment