Created
June 29, 2020 08:52
-
-
Save jodh-intel/84e82a6961421b1abdf46672147c3112 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/src/server.rs b/src/server.rs | |
index e307527..5871fd4 100644 | |
--- a/src/server.rs | |
+++ b/src/server.rs | |
@@ -46,6 +46,7 @@ pub struct Server { | |
quit: Arc<AtomicBool>, | |
connections: Arc<Mutex<HashMap<RawFd, Connection>>>, | |
methods: Arc<HashMap<String, Box<dyn MethodHandler + Send + Sync>>>, | |
+ pre_handler: Option<Arc<Box<dyn PreHandler + Send + Sync>>>, | |
handler: Option<JoinHandle<()>>, | |
thread_count_default: usize, | |
thread_count_min: usize, | |
@@ -72,6 +73,7 @@ struct ThreadS<'a> { | |
wtc: &'a Arc<AtomicUsize>, | |
quit: &'a Arc<AtomicBool>, | |
methods: &'a Arc<HashMap<String, Box<dyn MethodHandler + Send + Sync>>>, | |
+ pre_handler: &'a Option<&'a Arc<Box<dyn PreHandler + Send + Sync>>>, | |
res_tx: &'a Sender<(MessageHeader, Vec<u8>)>, | |
control_tx: &'a SyncSender<()>, | |
default: usize, | |
@@ -85,6 +87,7 @@ fn start_method_handler_thread( | |
wtc: Arc<AtomicUsize>, | |
quit: Arc<AtomicBool>, | |
methods: Arc<HashMap<String, Box<dyn MethodHandler + Send + Sync>>>, | |
+ pre_handler: Option<Arc<Box<dyn PreHandler + Send + Sync>>>, | |
res_tx: Sender<(MessageHeader, Vec<u8>)>, | |
control_tx: SyncSender<()>, | |
min: usize, | |
@@ -202,6 +205,15 @@ fn start_method_handler_thread( | |
mh, | |
res_tx: res_tx.clone(), | |
}; | |
+ | |
+ if pre_handler.is_some() { | |
+ let hr = pre_handler.as_deref(); | |
+ let arc_ref = hr.unwrap().clone(); | |
+ let result = arc_ref.handler(); | |
+ | |
+ info!("pre handler result: {:?}", result); | |
+ } | |
+ | |
if let Err(x) = method.handler(ctx, req) { | |
debug!("method handle {} get error {:?}", path, x); | |
quit.store(true, Ordering::SeqCst); | |
@@ -217,17 +229,23 @@ fn start_method_handler_thread( | |
}); | |
} | |
-fn start_method_handler_threads(num: usize, ts: &ThreadS) { | |
+fn start_method_handler_threads(num: usize, ts: &mut ThreadS) { | |
for _ in 0..num { | |
if ts.quit.load(Ordering::SeqCst) { | |
break; | |
} | |
+ | |
+ let hr = ts.pre_handler.take().as_ref(); | |
+ let arc_ref = hr.unwrap().clone(); | |
+ let pre_handler = Some(*arc_ref); | |
+ | |
start_method_handler_thread( | |
ts.fd, | |
ts.fdlock.clone(), | |
ts.wtc.clone(), | |
ts.quit.clone(), | |
ts.methods.clone(), | |
+ pre_handler.clone(), | |
ts.res_tx.clone(), | |
ts.control_tx.clone(), | |
ts.min, | |
@@ -236,10 +254,10 @@ fn start_method_handler_threads(num: usize, ts: &ThreadS) { | |
} | |
} | |
-fn check_method_handler_threads(ts: &ThreadS) { | |
+fn check_method_handler_threads(ts: &mut ThreadS) { | |
let c = ts.wtc.load(Ordering::SeqCst); | |
if c < ts.min { | |
- start_method_handler_threads(ts.default - c, &ts); | |
+ start_method_handler_threads(ts.default - c, &mut ts); | |
} | |
} | |
@@ -252,6 +270,7 @@ impl Default for Server { | |
quit: Arc::new(AtomicBool::new(false)), | |
connections: Arc::new(Mutex::new(HashMap::new())), | |
methods: Arc::new(HashMap::new()), | |
+ pre_handler: None, | |
handler: None, | |
thread_count_default: DEFAULT_WAIT_THREAD_COUNT_DEFAULT, | |
thread_count_min: DEFAULT_WAIT_THREAD_COUNT_MIN, | |
@@ -262,10 +281,12 @@ impl Default for Server { | |
impl Server { | |
pub fn new() -> Server { | |
+ eprintln!("FIXME: ttrpc: new:"); | |
Server::default() | |
} | |
pub fn bind(mut self, host: &str) -> Result<Server> { | |
+ eprintln!("FIXME: ttrpc: bind:"); | |
if !self.listeners.is_empty() { | |
return Err(Error::Others( | |
"ttrpc-rust just support 1 host now".to_string(), | |
@@ -322,6 +343,8 @@ impl Server { | |
bind(fd, &sockaddr).map_err(err_to_Others!(e, ""))?; | |
self.listeners.push(fd); | |
+ eprintln!("FIXME: ttrpc: bind: DONE"); | |
+ | |
Ok(self) | |
} | |
@@ -335,8 +358,18 @@ impl Server { | |
mut self, | |
methods: HashMap<String, Box<dyn MethodHandler + Send + Sync>>, | |
) -> Server { | |
+ eprintln!("FIXME: ttrpc: register_service:"); | |
+ | |
let mut_methods = Arc::get_mut(&mut self.methods).unwrap(); | |
mut_methods.extend(methods); | |
+ | |
+ eprintln!("FIXME: ttrpc: register_service: DONE"); | |
+ self | |
+ } | |
+ | |
+ pub fn register_pre_handler(mut self, f: Arc<Box<dyn PreHandler + Send + Sync>>) -> Server { | |
+ self.pre_handler = Some(f); | |
+ | |
self | |
} | |
@@ -356,6 +389,8 @@ impl Server { | |
} | |
pub fn start(&mut self) -> Result<()> { | |
+ eprintln!("FIXME: ttrpc: start:"); | |
+ | |
if self.thread_count_default >= self.thread_count_max { | |
return Err(Error::Others( | |
"thread_count_default should smaller than thread_count_max".to_string(), | |
@@ -376,6 +411,8 @@ impl Server { | |
let listener = self.listeners[0]; | |
let methods = self.methods.clone(); | |
+ // FIXME: | |
+ let pre_handler = self.pre_handler.clone(); | |
let default = self.thread_count_default; | |
let min = self.thread_count_min; | |
let max = self.thread_count_max; | |
@@ -454,6 +491,11 @@ impl Server { | |
}; | |
let methods = methods.clone(); | |
+ | |
+ let hr = self.pre_handler.take().as_ref(); | |
+ let arc_ref = hr.unwrap().clone(); | |
+ let pre_handler = Some(&arc_ref); | |
+ | |
let quit = Arc::new(AtomicBool::new(false)); | |
let child_quit = quit.clone(); | |
let reaper_tx_child = reaper_tx.clone(); | |
@@ -488,6 +530,7 @@ impl Server { | |
fdlock: &Arc::new(Mutex::new(())), | |
wtc: &Arc::new(AtomicUsize::new(0)), | |
methods: &methods, | |
+ pre_handler: &pre_handler, | |
res_tx: &res_tx, | |
control_tx: &control_tx, | |
quit: &child_quit, | |
@@ -495,10 +538,10 @@ impl Server { | |
min, | |
max, | |
}; | |
- start_method_handler_threads(ts.default, &ts); | |
+ start_method_handler_threads(ts.default, &mut ts); | |
while !child_quit.load(Ordering::SeqCst) { | |
- check_method_handler_threads(&ts); | |
+ check_method_handler_threads(&mut ts); | |
if control_rx.recv().is_err() { | |
break; | |
} | |
@@ -534,31 +577,59 @@ impl Server { | |
self.handler = Some(handler); | |
+ eprintln!("FIXME: ttrpc: start: DONE"); | |
+ | |
Ok(()) | |
} | |
pub fn shutdown(mut self) { | |
+ eprintln!("FIXME: ttrpc: shutdown:"); | |
+ | |
+ eprintln!("FIXME: ttrpc: shutdown: getting connections"); | |
let connections = self.connections.lock().unwrap(); | |
+ //let connections_ref = self.connections.clone(); | |
+ //let connections = connections_ref.lock().unwrap(); | |
+ | |
+ eprintln!("FIXME: ttrpc: shutdown: got connections"); | |
+ | |
+ eprintln!("FIXME: ttrpc: shutdown: storing"); | |
self.quit.store(true, Ordering::SeqCst); | |
+ eprintln!("FIXME: ttrpc: shutdown: stored"); | |
+ | |
+ eprintln!("FIXME: ttrpc: shutdown: closing"); | |
close(self.monitor_fd.1).unwrap_or_else(|e| { | |
warn!( | |
"failed to close notify fd: {} with error: {}", | |
self.monitor_fd.1, e | |
) | |
}); | |
+ eprintln!("FIXME: ttrpc: shutdown: closed"); | |
+ eprintln!("FIXME: ttrpc: shutdown: closing connections"); | |
for (_fd, c) in connections.iter() { | |
+ eprintln!("FIXME: ttrpc: shutdown: closing connection"); | |
c.close(); | |
+ eprintln!("FIXME: ttrpc: shutdown: closed connection"); | |
} | |
+ eprintln!("FIXME: ttrpc: shutdown: closed connections"); | |
// release connections's lock, since the following handler.join() | |
// would wait on the other thread's exit in which would take the lock. | |
+ eprintln!("FIXME: ttrpc: shutdown: dropping connections"); | |
drop(connections); | |
+ eprintln!("FIXME: ttrpc: shutdown: dropped connections"); | |
+ eprintln!("FIXME: ttrpc: shutdown: handling"); | |
if let Some(handler) = self.handler.take() { | |
- handler.join().unwrap(); | |
+ eprintln!("FIXME: ttrpc: shutdown: FIXME: *NOT* joining handle"); | |
+ //eprintln!("FIXME: ttrpc: shutdown: joining handle"); | |
+ //handler.join().unwrap(); | |
+ //eprintln!("FIXME: ttrpc: shutdown: joined handle"); | |
} | |
+ eprintln!("FIXME: ttrpc: shutdown: handled"); | |
+ | |
+ eprintln!("FIXME: ttrpc: shutdown: DONE"); | |
} | |
} | |
@@ -573,6 +644,10 @@ pub trait MethodHandler { | |
fn handler(&self, ctx: TtrpcContext, req: Request) -> Result<()>; | |
} | |
+pub trait PreHandler { | |
+ fn handler(&self) -> Result<()>; | |
+} | |
+ | |
pub fn response_to_channel( | |
stream_id: u32, | |
res: Response, |
Nooo...! Although I've got a basic test program working fine using most of the logic above, I can't get this to work for ttrpc because the thread's join handle needs to be saved in `Server.handler`. That's easy with a standard thread, but a crossbeam scoped thread's handle has to be declared like this:
handler: Option<cb_thread::ScopedJoinHandle<'a, ()>>,
... But that just takes me back to the original problem of how to avoid changing all the lifetimes to 'static
😢
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
@lifupan - I think I've found a solution using https://docs.rs/crossbeam-utils/0.7.2/crossbeam_utils/thread/fn.scope.html - would that be something you'd consider adding to ttrpc? I'll raise a proof-of-concept PR tomorrow hopefully...