Skip to content

Instantly share code, notes, and snippets.

@jonhoo
Created July 1, 2019 18:03
Show Gist options
  • Save jonhoo/7be0c372537f66ebe123a3f665a7f12a to your computer and use it in GitHub Desktop.
Save jonhoo/7be0c372537f66ebe123a3f665a7f12a to your computer and use it in GitHub Desktop.
diff --git a/tower-buffer/Cargo.toml b/tower-buffer/Cargo.toml
index 328a0c7..4728c9e 100644
--- a/tower-buffer/Cargo.toml
+++ b/tower-buffer/Cargo.toml
@@ -21,12 +21,17 @@ Buffer requests before dispatching to a `Service`.
categories = ["asynchronous", "network-programming"]
edition = "2018"
+[features]
+log = ["tracing/log"]
+default = []
+
[dependencies]
futures = "0.1.25"
tower-service = "0.2.0"
tower-layer = "0.1.0"
tokio-executor = "0.1.7"
tokio-sync = "0.1.0"
+tracing = "0.1"
[dev-dependencies]
tower = { version = "0.1.0", path = "../tower" }
diff --git a/tower-buffer/src/worker.rs b/tower-buffer/src/worker.rs
index a9955a8..43df821 100644
--- a/tower-buffer/src/worker.rs
+++ b/tower-buffer/src/worker.rs
@@ -22,6 +22,7 @@ where
{
current_message: Option<Message<Request, T::Future>>,
rx: mpsc::Receiver<Message<Request, T::Future>>,
+ span: tracing::Span,
service: T,
finish: bool,
failed: Option<ServiceError>,
@@ -66,15 +67,18 @@ where
inner: Arc::new(Mutex::new(None)),
};
+ let span = tracing::debug_span!("new_worker");
let worker = Worker {
current_message: None,
finish: false,
failed: None,
+ span: span.clone(),
rx,
service,
handle: handle.clone(),
};
+ span.in_scope(|| tracing::debug!("spawning worker"));
match executor.spawn(worker) {
Ok(()) => Some(handle),
Err(_) => None,
@@ -83,32 +87,39 @@ where
/// Return the next queued Message that hasn't been canceled.
fn poll_next_msg(&mut self) -> Poll<Option<Message<Request, T::Future>>, ()> {
+ let _guard = self.span.enter();
+
if self.finish {
// We've already received None and are shutting down
return Ok(Async::Ready(None));
}
+ tracing::trace!("worker polling for next message");
if let Some(mut msg) = self.current_message.take() {
// poll_close returns Async::Ready if the receiver is dropped.
// Returning NotReady means it is still alive, so we should still
// use it.
if msg.tx.poll_close()?.is_not_ready() {
+ tracing::trace!("re-processing buffered request");
return Ok(Async::Ready(Some(msg)));
}
+ tracing::trace!("dropping cancelled buffered request");
}
// Get the next request
while let Some(mut msg) = try_ready!(self.rx.poll().map_err(|_| ())) {
if msg.tx.poll_close()?.is_not_ready() {
+ tracing::trace!("re-processing buffered request");
return Ok(Async::Ready(Some(msg)));
}
// Otherwise, request is canceled, so pop the next one.
+ tracing::trace!("dropping cancelled request");
}
Ok(Async::Ready(None))
}
- fn failed(&mut self, error: T::Error) {
+ fn failed(&mut self, error: Error) {
// The underlying service failed when we called `poll_ready` on it with the given `error`. We
// need to communicate this to all the `Buffer` handles. To do so, we wrap up the error in
// an `Arc`, send that `Arc<E>` to all pending requests, and store it so that subsequent
@@ -121,7 +132,7 @@ where
// request. We do this by *first* exposing the error, *then* closing the channel used to
// send more requests (so the client will see the error when the send fails), and *then*
// sending the error to all outstanding requests.
- let error = ServiceError::new(error.into());
+ let error = ServiceError::new(error);
let mut inner = self.handle.inner.lock().unwrap();
@@ -158,20 +169,25 @@ where
loop {
match try_ready!(self.poll_next_msg()) {
Some(msg) => {
+ let _guard = self.span.enter();
if let Some(ref failed) = self.failed {
+ tracing::trace!("notifying about worker failure");
let _ = msg.tx.send(Err(failed.clone()));
continue;
}
// Wait for the service to be ready
+ tracing::trace!("waiting for service readiness");
match self.service.poll_ready() {
Ok(Async::Ready(())) => {
+ tracing::debug!("processing request");
let response = self.service.call(msg.request);
// Send the response future back to the sender.
//
// An error means the request had been canceled in-between
// our calls, the response future will just be dropped.
+ tracing::trace!("returning response future");
let _ = msg.tx.send(Ok(response));
}
Ok(Async::NotReady) => {
@@ -180,7 +196,10 @@ where
return Ok(Async::NotReady);
}
Err(e) => {
- self.failed(e);
+ let error = e.into();
+ tracing::trace!({ ?error }, "service failed");
+ drop(_guard);
+ self.failed(error);
let _ = msg.tx.send(Err(self
.failed
.as_ref()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment