Skip to content

Instantly share code, notes, and snippets.

@jodh-intel
Created June 29, 2020 08:52
Show Gist options
  • Save jodh-intel/84e82a6961421b1abdf46672147c3112 to your computer and use it in GitHub Desktop.
Save jodh-intel/84e82a6961421b1abdf46672147c3112 to your computer and use it in GitHub Desktop.
diff --git a/src/server.rs b/src/server.rs
index e307527..5871fd4 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -46,6 +46,7 @@ pub struct Server {
quit: Arc<AtomicBool>,
connections: Arc<Mutex<HashMap<RawFd, Connection>>>,
methods: Arc<HashMap<String, Box<dyn MethodHandler + Send + Sync>>>,
+ pre_handler: Option<Arc<Box<dyn PreHandler + Send + Sync>>>,
handler: Option<JoinHandle<()>>,
thread_count_default: usize,
thread_count_min: usize,
@@ -72,6 +73,7 @@ struct ThreadS<'a> {
wtc: &'a Arc<AtomicUsize>,
quit: &'a Arc<AtomicBool>,
methods: &'a Arc<HashMap<String, Box<dyn MethodHandler + Send + Sync>>>,
+ pre_handler: &'a Option<&'a Arc<Box<dyn PreHandler + Send + Sync>>>,
res_tx: &'a Sender<(MessageHeader, Vec<u8>)>,
control_tx: &'a SyncSender<()>,
default: usize,
@@ -85,6 +87,7 @@ fn start_method_handler_thread(
wtc: Arc<AtomicUsize>,
quit: Arc<AtomicBool>,
methods: Arc<HashMap<String, Box<dyn MethodHandler + Send + Sync>>>,
+ pre_handler: Option<Arc<Box<dyn PreHandler + Send + Sync>>>,
res_tx: Sender<(MessageHeader, Vec<u8>)>,
control_tx: SyncSender<()>,
min: usize,
@@ -202,6 +205,15 @@ fn start_method_handler_thread(
mh,
res_tx: res_tx.clone(),
};
+
+ if pre_handler.is_some() {
+ let hr = pre_handler.as_deref();
+ let arc_ref = hr.unwrap().clone();
+ let result = arc_ref.handler();
+
+ info!("pre handler result: {:?}", result);
+ }
+
if let Err(x) = method.handler(ctx, req) {
debug!("method handle {} get error {:?}", path, x);
quit.store(true, Ordering::SeqCst);
@@ -217,17 +229,23 @@ fn start_method_handler_thread(
});
}
-fn start_method_handler_threads(num: usize, ts: &ThreadS) {
+fn start_method_handler_threads(num: usize, ts: &mut ThreadS) {
for _ in 0..num {
if ts.quit.load(Ordering::SeqCst) {
break;
}
+
+ let hr = ts.pre_handler.take().as_ref();
+ let arc_ref = hr.unwrap().clone();
+ let pre_handler = Some(*arc_ref);
+
start_method_handler_thread(
ts.fd,
ts.fdlock.clone(),
ts.wtc.clone(),
ts.quit.clone(),
ts.methods.clone(),
+ pre_handler.clone(),
ts.res_tx.clone(),
ts.control_tx.clone(),
ts.min,
@@ -236,10 +254,10 @@ fn start_method_handler_threads(num: usize, ts: &ThreadS) {
}
}
-fn check_method_handler_threads(ts: &ThreadS) {
+fn check_method_handler_threads(ts: &mut ThreadS) {
let c = ts.wtc.load(Ordering::SeqCst);
if c < ts.min {
- start_method_handler_threads(ts.default - c, &ts);
+ start_method_handler_threads(ts.default - c, &mut ts);
}
}
@@ -252,6 +270,7 @@ impl Default for Server {
quit: Arc::new(AtomicBool::new(false)),
connections: Arc::new(Mutex::new(HashMap::new())),
methods: Arc::new(HashMap::new()),
+ pre_handler: None,
handler: None,
thread_count_default: DEFAULT_WAIT_THREAD_COUNT_DEFAULT,
thread_count_min: DEFAULT_WAIT_THREAD_COUNT_MIN,
@@ -262,10 +281,12 @@ impl Default for Server {
impl Server {
pub fn new() -> Server {
+ eprintln!("FIXME: ttrpc: new:");
Server::default()
}
pub fn bind(mut self, host: &str) -> Result<Server> {
+ eprintln!("FIXME: ttrpc: bind:");
if !self.listeners.is_empty() {
return Err(Error::Others(
"ttrpc-rust just support 1 host now".to_string(),
@@ -322,6 +343,8 @@ impl Server {
bind(fd, &sockaddr).map_err(err_to_Others!(e, ""))?;
self.listeners.push(fd);
+ eprintln!("FIXME: ttrpc: bind: DONE");
+
Ok(self)
}
@@ -335,8 +358,18 @@ impl Server {
mut self,
methods: HashMap<String, Box<dyn MethodHandler + Send + Sync>>,
) -> Server {
+ eprintln!("FIXME: ttrpc: register_service:");
+
let mut_methods = Arc::get_mut(&mut self.methods).unwrap();
mut_methods.extend(methods);
+
+ eprintln!("FIXME: ttrpc: register_service: DONE");
+ self
+ }
+
+ pub fn register_pre_handler(mut self, f: Arc<Box<dyn PreHandler + Send + Sync>>) -> Server {
+ self.pre_handler = Some(f);
+
self
}
@@ -356,6 +389,8 @@ impl Server {
}
pub fn start(&mut self) -> Result<()> {
+ eprintln!("FIXME: ttrpc: start:");
+
if self.thread_count_default >= self.thread_count_max {
return Err(Error::Others(
"thread_count_default should smaller than thread_count_max".to_string(),
@@ -376,6 +411,8 @@ impl Server {
let listener = self.listeners[0];
let methods = self.methods.clone();
+ // FIXME:
+ let pre_handler = self.pre_handler.clone();
let default = self.thread_count_default;
let min = self.thread_count_min;
let max = self.thread_count_max;
@@ -454,6 +491,11 @@ impl Server {
};
let methods = methods.clone();
+
+ let hr = self.pre_handler.take().as_ref();
+ let arc_ref = hr.unwrap().clone();
+ let pre_handler = Some(&arc_ref);
+
let quit = Arc::new(AtomicBool::new(false));
let child_quit = quit.clone();
let reaper_tx_child = reaper_tx.clone();
@@ -488,6 +530,7 @@ impl Server {
fdlock: &Arc::new(Mutex::new(())),
wtc: &Arc::new(AtomicUsize::new(0)),
methods: &methods,
+ pre_handler: &pre_handler,
res_tx: &res_tx,
control_tx: &control_tx,
quit: &child_quit,
@@ -495,10 +538,10 @@ impl Server {
min,
max,
};
- start_method_handler_threads(ts.default, &ts);
+ start_method_handler_threads(ts.default, &mut ts);
while !child_quit.load(Ordering::SeqCst) {
- check_method_handler_threads(&ts);
+ check_method_handler_threads(&mut ts);
if control_rx.recv().is_err() {
break;
}
@@ -534,31 +577,59 @@ impl Server {
self.handler = Some(handler);
+ eprintln!("FIXME: ttrpc: start: DONE");
+
Ok(())
}
pub fn shutdown(mut self) {
+ eprintln!("FIXME: ttrpc: shutdown:");
+
+ eprintln!("FIXME: ttrpc: shutdown: getting connections");
let connections = self.connections.lock().unwrap();
+ //let connections_ref = self.connections.clone();
+ //let connections = connections_ref.lock().unwrap();
+
+ eprintln!("FIXME: ttrpc: shutdown: got connections");
+
+ eprintln!("FIXME: ttrpc: shutdown: storing");
self.quit.store(true, Ordering::SeqCst);
+ eprintln!("FIXME: ttrpc: shutdown: stored");
+
+ eprintln!("FIXME: ttrpc: shutdown: closing");
close(self.monitor_fd.1).unwrap_or_else(|e| {
warn!(
"failed to close notify fd: {} with error: {}",
self.monitor_fd.1, e
)
});
+ eprintln!("FIXME: ttrpc: shutdown: closed");
+ eprintln!("FIXME: ttrpc: shutdown: closing connections");
for (_fd, c) in connections.iter() {
+ eprintln!("FIXME: ttrpc: shutdown: closing connection");
c.close();
+ eprintln!("FIXME: ttrpc: shutdown: closed connection");
}
+ eprintln!("FIXME: ttrpc: shutdown: closed connections");
// release connections's lock, since the following handler.join()
// would wait on the other thread's exit in which would take the lock.
+ eprintln!("FIXME: ttrpc: shutdown: dropping connections");
drop(connections);
+ eprintln!("FIXME: ttrpc: shutdown: dropped connections");
+ eprintln!("FIXME: ttrpc: shutdown: handling");
if let Some(handler) = self.handler.take() {
- handler.join().unwrap();
+ eprintln!("FIXME: ttrpc: shutdown: FIXME: *NOT* joining handle");
+ //eprintln!("FIXME: ttrpc: shutdown: joining handle");
+ //handler.join().unwrap();
+ //eprintln!("FIXME: ttrpc: shutdown: joined handle");
}
+ eprintln!("FIXME: ttrpc: shutdown: handled");
+
+ eprintln!("FIXME: ttrpc: shutdown: DONE");
}
}
@@ -573,6 +644,10 @@ pub trait MethodHandler {
fn handler(&self, ctx: TtrpcContext, req: Request) -> Result<()>;
}
+pub trait PreHandler {
+ fn handler(&self) -> Result<()>;
+}
+
pub fn response_to_channel(
stream_id: u32,
res: Response,
@jodh-intel
Copy link
Author

Hi @lifupan - I'm trying to add a pre-handler to ttrpc that is registered and then called automatically before each API call. However, I'm having problems with the lifetime checker 😄. I've tried changing the pre_handler field to:

pre_handler: Option<&'a Arc<Mutex<Box<dyn PreHandler + Send + Sync>>>>

... but that doesn't work either. Any ideas?

@jodh-intel
Copy link
Author

@lifupan - I think I've found a solution using https://docs.rs/crossbeam-utils/0.7.2/crossbeam_utils/thread/fn.scope.html - would that be something you'd consider adding to ttrpc? I'll raise a proof-of-concept PR tomorrow hopefully...

@jodh-intel
Copy link
Author

Nooo...! Although I've got a basic test program working fine using most of the logic above, I can't get this to work for ttrpc because the thread's join handle needs to be saved in Server.handler. That's easy with a standard thread, but crossbeam scoped threads need to be specified like this:

handler: Option<cb_thread::ScopedJoinHandle<'a, ()>>, 

... But that just takes me back to the original problem of how to avoid changing all the lifetimes to 'static 😢

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment