Created
September 19, 2020 16:51
-
-
Save autodidaddict/1e1ba97c2eb1cf772fae9df5e712c809 to your computer and use it in GitHub Desktop.
An experiment exploring the use of a middleware processor actor to invoke a chain of middleware actors as well as the ultimate call target.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use crate::types::{Invocation, InvocationResponse}; | |
use crate::Result; | |
use actix::prelude::*; | |
/// The trait that must be implemented by all Wasmcloud middleware | |
pub trait Middleware: Send + Sync { | |
fn pre_invoke(&self, inv: Invocation) -> Result<Invocation>; | |
fn post_invoke(&self, response: InvocationResponse) -> Result<InvocationResponse>; | |
} | |
pub struct MiddlewareProcessor { | |
mids: Vec<Box<dyn Middleware>>, | |
} | |
impl MiddlewareProcessor { | |
pub fn new() -> MiddlewareProcessor { | |
MiddlewareProcessor { mids: vec![] } | |
} | |
} | |
#[derive(Message)] | |
#[rtype(result = "InvocationResponse")] | |
pub struct ExecuteChain { | |
inv: Invocation, | |
target: Recipient<Invocation>, | |
} | |
#[derive(Message)] | |
#[rtype(result = "Result<()>")] | |
pub struct AddMiddleware { | |
mid: Box<dyn Middleware>, | |
} | |
impl ExecuteChain { | |
pub fn new(inv: Invocation, target: Recipient<Invocation>) -> ExecuteChain { | |
ExecuteChain { inv, target } | |
} | |
} | |
impl Actor for MiddlewareProcessor { | |
type Context = Context<Self>; | |
} | |
impl Handler<ExecuteChain> for MiddlewareProcessor { | |
type Result = ResponseActFuture<Self, InvocationResponse>; | |
fn handle(&mut self, msg: ExecuteChain, ctx: &mut Self::Context) -> Self::Result { | |
let mut cur_inv = msg.inv.clone(); | |
for m in self.mids.iter() { | |
match m.pre_invoke(cur_inv.clone()) { | |
Ok(i) => cur_inv = i.clone(), | |
Err(e) => { | |
return Box::pin( | |
async move { InvocationResponse::error(&cur_inv, &format!("{}", e)) } | |
.into_actor(self), | |
); | |
} | |
} | |
} | |
let c = cur_inv.clone(); | |
Box::pin( | |
msg.target | |
.send(cur_inv.clone()) | |
.into_actor(self) | |
.map(move |res, act, _ctx| { | |
let res = res.unwrap(); | |
if res.error.is_some() { | |
return res; | |
} | |
let mut cur_resp = res; | |
for m in act.mids.iter() { | |
match m.post_invoke(cur_resp) { | |
Ok(i) => cur_resp = i.clone(), | |
Err(e) => return InvocationResponse::error(&c, &format!("{}", e)), | |
} | |
} | |
cur_resp | |
}), | |
) | |
} | |
} | |
impl Handler<AddMiddleware> for MiddlewareProcessor { | |
type Result = Result<()>; | |
fn handle(&mut self, msg: AddMiddleware, ctx: &mut Self::Context) -> Self::Result { | |
self.mids.push(msg.mid); | |
Ok(()) | |
} | |
} | |
#[cfg(test)]
mod tests {
    use crate::middleware::{AddMiddleware, ExecuteChain, Middleware, MiddlewareProcessor};
    use crate::types::{Invocation, InvocationResponse, WasmcloudEntity};
    use crate::Result;
    use actix::prelude::*;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::sync::Arc;
    use wascap::prelude::KeyPair;

    /// Registers the same counting middleware twice and checks that each hook
    /// ran exactly twice for a single successful invocation.
    #[actix_rt::test]
    async fn middleware_invokes_in_chain() {
        let pre = Arc::new(AtomicU32::new(0));
        let post = Arc::new(AtomicU32::new(0));
        let mid = Box::new(AddingMiddleware::new(pre.clone(), post.clone()));
        let midproc = MiddlewareProcessor::new().start();
        let unit = UnitActor::new().start();

        // Both clones share the same counters through the `Arc`s.
        assert!(midproc
            .send(AddMiddleware { mid: mid.clone() })
            .await
            .unwrap()
            .is_ok());
        assert!(midproc.send(AddMiddleware { mid }).await.unwrap().is_ok());

        let inv = Invocation::new(
            &KeyPair::new_server(),
            WasmcloudEntity::Capability {
                capid: "wasmcloud:test".to_string(),
                binding: "foo".to_string(),
            },
            WasmcloudEntity::Actor("MX".to_string()),
            "OP_FOO",
            b"MESSAGE".to_vec(),
        );
        let res = midproc
            .send(ExecuteChain {
                inv,
                target: unit.recipient(),
            })
            .await
            .unwrap();

        assert!(res.error.is_none());
        // Read the counters without modifying them.
        assert_eq!(2, pre.load(Ordering::SeqCst));
        assert_eq!(2, post.load(Ordering::SeqCst));
    }

    /// Middleware that counts hook calls and passes its input through unchanged.
    #[derive(Clone)]
    struct AddingMiddleware {
        pre_count: Arc<AtomicU32>,
        post_count: Arc<AtomicU32>,
    }

    impl AddingMiddleware {
        fn new(pre: Arc<AtomicU32>, post: Arc<AtomicU32>) -> AddingMiddleware {
            AddingMiddleware {
                pre_count: pre,
                post_count: post,
            }
        }
    }

    impl Middleware for AddingMiddleware {
        fn pre_invoke(&self, inv: Invocation) -> Result<Invocation> {
            self.pre_count.fetch_add(1, Ordering::SeqCst);
            Ok(inv)
        }

        fn post_invoke(&self, response: InvocationResponse) -> Result<InvocationResponse> {
            self.post_count.fetch_add(1, Ordering::SeqCst);
            Ok(response)
        }
    }

    /// Target actor that answers every invocation with a fixed success payload.
    struct UnitActor {}

    impl UnitActor {
        fn new() -> UnitActor {
            UnitActor {}
        }
    }

    impl Actor for UnitActor {
        type Context = Context<Self>;
    }

    impl Handler<Invocation> for UnitActor {
        type Result = InvocationResponse;

        fn handle(&mut self, msg: Invocation, _ctx: &mut Self::Context) -> Self::Result {
            InvocationResponse::success(&msg, b"hello".to_vec())
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment