Created
July 1, 2019 18:03
-
-
Save jonhoo/7be0c372537f66ebe123a3f665a7f12a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/tower-buffer/Cargo.toml b/tower-buffer/Cargo.toml | |
index 328a0c7..4728c9e 100644 | |
--- a/tower-buffer/Cargo.toml | |
+++ b/tower-buffer/Cargo.toml | |
@@ -21,12 +21,17 @@ Buffer requests before dispatching to a `Service`. | |
categories = ["asynchronous", "network-programming"] | |
edition = "2018" | |
+[features] | |
+log = ["tracing/log"] | |
+default = [] | |
+ | |
[dependencies] | |
futures = "0.1.25" | |
tower-service = "0.2.0" | |
tower-layer = "0.1.0" | |
tokio-executor = "0.1.7" | |
tokio-sync = "0.1.0" | |
+tracing = "0.1" | |
[dev-dependencies] | |
tower = { version = "0.1.0", path = "../tower" } | |
diff --git a/tower-buffer/src/worker.rs b/tower-buffer/src/worker.rs | |
index a9955a8..43df821 100644 | |
--- a/tower-buffer/src/worker.rs | |
+++ b/tower-buffer/src/worker.rs | |
@@ -22,6 +22,7 @@ where | |
{ | |
current_message: Option<Message<Request, T::Future>>, | |
rx: mpsc::Receiver<Message<Request, T::Future>>, | |
+ span: tracing::Span, | |
service: T, | |
finish: bool, | |
failed: Option<ServiceError>, | |
@@ -66,15 +67,18 @@ where | |
inner: Arc::new(Mutex::new(None)), | |
}; | |
+ let span = tracing::debug_span!("new_worker"); | |
let worker = Worker { | |
current_message: None, | |
finish: false, | |
failed: None, | |
+ span: span.clone(), | |
rx, | |
service, | |
handle: handle.clone(), | |
}; | |
+ span.in_scope(|| tracing::debug!("spawning worker")); | |
match executor.spawn(worker) { | |
Ok(()) => Some(handle), | |
Err(_) => None, | |
@@ -83,32 +87,39 @@ where | |
/// Return the next queued Message that hasn't been canceled. | |
fn poll_next_msg(&mut self) -> Poll<Option<Message<Request, T::Future>>, ()> { | |
+ let _guard = self.span.enter(); | |
+ | |
if self.finish { | |
// We've already received None and are shutting down | |
return Ok(Async::Ready(None)); | |
} | |
+ tracing::trace!("worker polling for next message"); | |
if let Some(mut msg) = self.current_message.take() { | |
// poll_cancel returns Async::Ready is the receiver is dropped. | |
// Returning NotReady means it is still alive, so we should still | |
// use it. | |
if msg.tx.poll_close()?.is_not_ready() { | |
+ tracing::trace!("re-processing buffered request"); | |
return Ok(Async::Ready(Some(msg))); | |
} | |
+ tracing::trace!("dropping cancelled buffered request"); | |
} | |
// Get the next request | |
while let Some(mut msg) = try_ready!(self.rx.poll().map_err(|_| ())) { | |
if msg.tx.poll_close()?.is_not_ready() { | |
+ tracing::trace!("re-processing buffered request"); | |
return Ok(Async::Ready(Some(msg))); | |
} | |
// Otherwise, request is canceled, so pop the next one. | |
+ tracing::trace!("dropping cancelled request"); | |
} | |
Ok(Async::Ready(None)) | |
} | |
- fn failed(&mut self, error: T::Error) { | |
+ fn failed(&mut self, error: Error) { | |
// The underlying service failed when we called `poll_ready` on it with the given `error`. We | |
// need to communicate this to all the `Buffer` handles. To do so, we wrap up the error in | |
// an `Arc`, send that `Arc<E>` to all pending requests, and store it so that subsequent | |
@@ -121,7 +132,7 @@ where | |
// request. We do this by *first* exposing the error, *then* closing the channel used to | |
// send more requests (so the client will see the error when the send fails), and *then* | |
// sending the error to all outstanding requests. | |
- let error = ServiceError::new(error.into()); | |
+ let error = ServiceError::new(error); | |
let mut inner = self.handle.inner.lock().unwrap(); | |
@@ -158,20 +169,25 @@ where | |
loop { | |
match try_ready!(self.poll_next_msg()) { | |
Some(msg) => { | |
+ let _guard = self.span.enter(); | |
if let Some(ref failed) = self.failed { | |
+ tracing::trace!("notifying about worker failure"); | |
let _ = msg.tx.send(Err(failed.clone())); | |
continue; | |
} | |
// Wait for the service to be ready | |
+ tracing::trace!("waiting for service readiness"); | |
match self.service.poll_ready() { | |
Ok(Async::Ready(())) => { | |
+ tracing::debug!("processing request"); | |
let response = self.service.call(msg.request); | |
// Send the response future back to the sender. | |
// | |
// An error means the request had been canceled in-between | |
// our calls, the response future will just be dropped. | |
+ tracing::trace!("returning response future"); | |
let _ = msg.tx.send(Ok(response)); | |
} | |
Ok(Async::NotReady) => { | |
@@ -180,7 +196,10 @@ where | |
return Ok(Async::NotReady); | |
} | |
Err(e) => { | |
- self.failed(e); | |
+ let error = e.into(); | |
+ tracing::trace!({ ?error }, "service failed"); | |
+ drop(_guard); | |
+ self.failed(error); | |
let _ = msg.tx.send(Err(self | |
.failed | |
.as_ref() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment