Created
April 17, 2026 13:55
-
-
Save chrisleavoy/89ba4d0dfc8625ade860bd6753344b1d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| diff --git a/.cargo/config.toml b/.cargo/config.toml | |
| index 2d2f4cb..423a573 100644 | |
| --- a/.cargo/config.toml | |
| +++ b/.cargo/config.toml | |
| @@ -8,6 +8,7 @@ JEMALLOC_SYS_WITH_LG_PAGE = "16" | |
| [target.'cfg(all())'] | |
| rustflags = [ | |
| + "--cfg", "tokio_unstable", | |
| "-Dclippy::print_stdout", | |
| "-Dclippy::print_stderr", | |
| "-Dclippy::dbg_macro", | |
| @@ -28,7 +29,7 @@ rustflags = "-Lnative=/lib/native-libs" | |
| rustflags = "-Lnative=/lib/native-libs" | |
| [target.x86_64-unknown-linux-gnu] | |
| -rustflags = ["-C", "link-args=-rdynamic"] | |
| +rustflags = ["--cfg", "tokio_unstable", "-C", "link-args=-rdynamic"] | |
| [target.aarch64-unknown-linux-gnu] | |
| rustflags = ["-C", "link-args=-rdynamic"] | |
| diff --git a/Cargo.toml b/Cargo.toml | |
| index abcf19f..9024b9d 100644 | |
| --- a/Cargo.toml | |
| +++ b/Cargo.toml | |
| @@ -139,6 +139,8 @@ members = [ | |
| [workspace.dependencies] | |
| anyhow = { version = "1.0.99", default-features = false, features = ["std"] } | |
| arc-swap = { version = "1.7.1", default-features = false } | |
| +backtrace = { version = "0.3" } | |
| +async-backtrace = { version = "0.2", default-features = false } | |
| async-stream = { version = "0.3.6", default-features = false } | |
| async-trait = { version = "0.1.89", default-features = false } | |
| base64 = { version = "0.22.1", default-features = false } | |
| @@ -237,6 +239,9 @@ vector-vrl-functions = { workspace = true, features = ["vrl-metrics"] } | |
| vector-vrl-metrics = { path = "lib/vector-vrl-metrics" } | |
| loki-logproto = { path = "lib/loki-logproto", optional = true } | |
| +async-backtrace.workspace = true | |
| +backtrace.workspace = true | |
| + | |
| # Tokio / Futures | |
| async-stream.workspace = true | |
| async-trait.workspace = true | |
| diff --git a/Dockerfile.debug b/Dockerfile.debug | |
| new file mode 100644 | |
| index 0000000..8e2934a | |
| --- /dev/null | |
| +++ b/Dockerfile.debug | |
| @@ -0,0 +1,62 @@ | |
| +# Dockerfile.debug - Builds Vector 0.53.0 with SIGTERM debug dump patch | |
| +# Target: linux/amd64. Debug stand-in for timberio/vector:0.53.0-distroless-libc; the runtime stage uses debian:12-slim, not distroless (see Stage 2). | |
| +# Enables --cfg tokio_unstable for Tokio runtime metrics in the SIGTERM dump. | |
| + | |
| +# ─── Stage 1: Build ────────────────────────────────────────────────────────── | |
| +FROM rust:1.92-bookworm AS builder | |
| + | |
| +WORKDIR /vector | |
| + | |
| +# Install system build dependencies (mirrors what cross/CI uses for x86_64-unknown-linux-gnu) | |
| +RUN apt-get update && apt-get install -y --no-install-recommends \ | |
| + build-essential \ | |
| + cmake \ | |
| + libssl-dev \ | |
| + libsasl2-dev \ | |
| + pkg-config \ | |
| + python3 \ | |
| + protobuf-compiler \ | |
| + clang \ | |
| + libclang-dev \ | |
| + llvm-dev \ | |
| + && rm -rf /var/lib/apt/lists/* | |
| + | |
| +# Copy source | |
| +COPY . . | |
| + | |
| +# Build Vector with the feature set Etsy needs: | |
| +# unix - jemalloc + allocation tracing (Linux standard) | |
| +# api - Vector internal API (vector top, health checks) | |
| +# enrichment-tables - MaxMind GeoIP / CSV enrichment tables | |
| +# sinks-gcp - Google Cloud Logging (Stackdriver) sink | |
| +# sources-kafka - Kafka source | |
| +# transforms - All transforms (VRL/remap, Lua, route, etc.) | |
| +# secrets - Secret backends | |
| +# (sinks-prometheus and sources-internal_metrics are also enabled; see --features below) | |
| +# "--cfg tokio_unstable" enables the unstable tokio runtime metrics that the | |
| +# debug_dump module uses to emit per-worker queue depths on SIGTERM. | |
| +# "-C link-args=-rdynamic" retains symbol names in backtraces (matches x86_64-unknown-linux-gnu target config). | |
| +# Both flags come from the rustflags in .cargo/config.toml; no RUSTFLAGS variable is set in this Dockerfile. | |
| +# JEMALLOC_SYS_WITH_LG_PAGE is also set via .cargo/config.toml [env]; it is repeated here for clarity. | |
| +ENV JEMALLOC_SYS_WITH_LG_PAGE=16 | |
| + | |
| +RUN cargo build --release \ | |
| + --no-default-features \ | |
| + --features "unix,api,enrichment-tables,sinks-gcp,sinks-prometheus,sources-kafka,sources-internal_metrics,transforms,secrets" | |
| + | |
| +# ─── Stage 2: Runtime (debian:12-slim — has all system libs the binary needs) ── | |
| +# Using debian:12-slim instead of distroless for this debug image: distroless | |
| +# lacks libz, libssl, libsasl2, etc. that the dynamically-linked vector binary needs. | |
| +FROM debian:12-slim | |
| + | |
| +RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates && rm -rf /var/lib/apt/lists/* | |
| + | |
| +LABEL org.opencontainers.image.url="https://vector.dev" | |
| +LABEL org.opencontainers.image.source="https://github.com/vectordotdev/vector" | |
| +LABEL org.opencontainers.image.documentation="https://vector.dev/docs" | |
| +LABEL org.opencontainers.image.version="0.53.0-debug-sigterm-async-bt" | |
| +LABEL com.etsy.build="sigterm-debug-dump-async-backtrace" | |
| + | |
| +COPY --from=builder /vector/target/release/vector /usr/bin/vector | |
| + | |
| +ENTRYPOINT ["/usr/bin/vector"] | |
| diff --git a/src/app.rs b/src/app.rs | |
| index 9e8a415..fe1a445 100644 | |
| --- a/src/app.rs | |
| +++ b/src/app.rs | |
| @@ -249,6 +249,7 @@ impl Application { | |
| emit!(VectorStarted); | |
| handle.spawn(heartbeat::heartbeat()); | |
| + handle.spawn(crate::debug_dump::dump_debug_info_on_sigterm()); | |
| let Self { | |
| root_opts, | |
| diff --git a/src/debug_dump.rs b/src/debug_dump.rs | |
| new file mode 100644 | |
| index 0000000..3d76121 | |
| --- /dev/null | |
| +++ b/src/debug_dump.rs | |
| @@ -0,0 +1,146 @@ | |
| +//! SIGTERM debug dump: on SIGTERM, dump diagnostic info to stderr before normal shutdown. | |
| +//! | |
| +//! The signal-receiving code only sets an AtomicBool flag (see `mark_sigterm`); a watcher | |
| +//! async task polls that flag and performs the dump from ordinary task context, not from a signal handler. | |
| +use std::io::Write; | |
| +use std::sync::atomic::{AtomicBool, Ordering}; | |
| + | |
| +static SIGTERM_RECEIVED: AtomicBool = AtomicBool::new(false); | |
| + | |
| +/// Called from the signal handler or signal-receiving code when SIGTERM is detected. | |
| +/// Signal-safe: only sets an atomic flag. | |
| +pub fn mark_sigterm() { | |
| + SIGTERM_RECEIVED.store(true, Ordering::SeqCst); | |
| +} | |
| + | |
| +/// Spawn as a Tokio task. Polls the SIGTERM flag every 50 ms and dumps debug info once it is set. | |
| +/// Returns after the dump is complete; the dump itself runs synchronously on the polling task. | |
| +pub async fn dump_debug_info_on_sigterm() { | |
| + loop { | |
| + if SIGTERM_RECEIVED.load(Ordering::SeqCst) { | |
| + do_dump(); | |
| + return; | |
| + } | |
| + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; | |
| + } | |
| +} | |
| + | |
| +fn do_dump() { | |
| + let stderr = std::io::stderr(); | |
| + let mut err = stderr.lock(); | |
| + | |
| + let _ = writeln!(err, ""); | |
| + let _ = writeln!(err, "=== VECTOR SIGTERM DEBUG DUMP ==="); | |
| + let _ = writeln!( | |
| + err, | |
| + "timestamp: {:?}", | |
| + std::time::SystemTime::now() | |
| + ); | |
| + let _ = writeln!(err, "pid: {}", std::process::id()); | |
| + | |
| + // /proc/self/status: threads, VmRSS, VmPeak, etc. | |
| + match std::fs::read_to_string("/proc/self/status") { | |
| + Ok(status) => { | |
| + let _ = writeln!(err, "--- /proc/self/status ---"); | |
| + let _ = writeln!(err, "{}", status); | |
| + } | |
| + Err(e) => { | |
| + let _ = writeln!(err, "[/proc/self/status unavailable: {}]", e); | |
| + } | |
| + } | |
| + | |
| + // Open file descriptors | |
| + match std::fs::read_dir("/proc/self/fd") { | |
| + Ok(entries) => { | |
| + let fds: Vec<_> = entries.filter_map(|e| e.ok()).collect(); | |
| + let _ = writeln!(err, "--- Open FDs: {} ---", fds.len()); | |
| + for fd in fds.iter().take(200) { | |
| + if let Ok(target) = std::fs::read_link(fd.path()) { | |
| + let _ = writeln!(err, " {:?} -> {:?}", fd.file_name(), target); | |
| + } | |
| + } | |
| + } | |
| + Err(e) => { | |
| + let _ = writeln!(err, "[/proc/self/fd unavailable: {}]", e); | |
| + } | |
| + } | |
| + | |
| + // Per-thread info via /proc/self/task | |
| + match std::fs::read_dir("/proc/self/task") { | |
| + Ok(tasks) => { | |
| + let task_list: Vec<_> = tasks.filter_map(|t| t.ok()).collect(); | |
| + let _ = writeln!(err, "--- Threads: {} ---", task_list.len()); | |
| + for task in &task_list { | |
| + let tid = task.file_name(); | |
| + let _ = writeln!(err, "--- Thread {:?} ---", tid); | |
| + | |
| + let status_path = | |
| + format!("/proc/self/task/{}/status", tid.to_string_lossy()); | |
| + if let Ok(s) = std::fs::read_to_string(&status_path) { | |
| + let _ = writeln!(err, "{}", s); | |
| + } | |
| + | |
| + // wchan: what kernel function the thread is blocked on | |
| + let wchan_path = | |
| + format!("/proc/self/task/{}/wchan", tid.to_string_lossy()); | |
| + if let Ok(wchan) = std::fs::read_to_string(&wchan_path) { | |
| + let _ = writeln!(err, "wchan: {}", wchan); | |
| + } | |
| + | |
| + // syscall: current syscall the thread is in (if any) | |
| + let syscall_path = | |
| + format!("/proc/self/task/{}/syscall", tid.to_string_lossy()); | |
| + if let Ok(syscall) = std::fs::read_to_string(&syscall_path) { | |
| + let _ = writeln!(err, "syscall: {}", syscall); | |
| + } | |
| + } | |
| + } | |
| + Err(e) => { | |
| + let _ = writeln!(err, "[/proc/self/task unavailable: {}]", e); | |
| + } | |
| + } | |
| + | |
| + // Backtrace of the current (dumper) thread | |
| + let _ = writeln!(err, "--- Current thread backtrace ---"); | |
| + let bt = backtrace::Backtrace::new(); | |
| + let _ = writeln!(err, "{:?}", bt); | |
| + | |
| + // Async task dump: shows every suspended #[async_backtrace::framed] future and its | |
| + // current await chain. Use wait_for_running_tasks=false so we capture immediately — | |
| + // suspended tasks (the ones we care about in a deadlock) appear unconditionally. | |
| + let _ = writeln!(err, "--- Async task dump (async-backtrace) ---"); | |
| + let tree = async_backtrace::taskdump_tree(false); | |
| + if tree.is_empty() { | |
| + let _ = writeln!(err, "[no framed tasks suspended]"); | |
| + } else { | |
| + let _ = writeln!(err, "{}", tree); | |
| + } | |
| + | |
| + // Tokio runtime metrics (requires --cfg tokio_unstable at build time) | |
| + #[cfg(tokio_unstable)] | |
| + { | |
| + if let Ok(handle) = tokio::runtime::Handle::try_current() { | |
| + let metrics = handle.metrics(); | |
| + let _ = writeln!(err, "--- Tokio Runtime Metrics ---"); | |
| + let _ = writeln!(err, " num_workers: {}", metrics.num_workers()); | |
| + let _ = writeln!(err, " num_alive_tasks: {}", metrics.num_alive_tasks()); | |
| + let _ = writeln!(err, " global_queue_depth: {}", metrics.global_queue_depth()); | |
| + for i in 0..metrics.num_workers() { | |
| + let _ = writeln!( | |
| + err, | |
| + " worker[{}] queue_depth={} total_park={} total_noop={} total_steal={} total_polls={}", | |
| + i, | |
| + metrics.worker_local_queue_depth(i), | |
| + metrics.worker_park_count(i), | |
| + metrics.worker_noop_count(i), | |
| + metrics.worker_steal_count(i), | |
| + metrics.worker_poll_count(i), | |
| + ); | |
| + } | |
| + } | |
| + } | |
| + | |
| + let _ = writeln!(err, "=== END VECTOR SIGTERM DEBUG DUMP ==="); | |
| + let _ = writeln!(err, ""); | |
| + let _ = err.flush(); | |
| +} | |
| diff --git a/src/lib.rs b/src/lib.rs | |
| index d4d4ab1..4adea39 100644 | |
| --- a/src/lib.rs | |
| +++ b/src/lib.rs | |
| @@ -72,6 +72,8 @@ pub mod amqp; | |
| pub mod api; | |
| pub mod app; | |
| pub mod async_read; | |
| +/// SIGTERM debug dump: dumps thread/runtime diagnostics to stderr on shutdown. | |
| +pub mod debug_dump; | |
| #[cfg(feature = "aws-config")] | |
| pub mod aws; | |
| pub mod common; | |
| diff --git a/src/signal.rs b/src/signal.rs | |
| index 0eefae1..e7296d9 100644 | |
| --- a/src/signal.rs | |
| +++ b/src/signal.rs | |
| @@ -208,6 +208,7 @@ fn os_signals(runtime: &Runtime) -> impl Stream<Item = SignalTo> + use<> { | |
| }, | |
| _ = sigterm.recv() => { | |
| info!(message = "Signal received.", signal = "SIGTERM"); | |
| + crate::debug_dump::mark_sigterm(); | |
| SignalTo::Shutdown(None) | |
| } , | |
| _ = sigquit.recv() => { | |
| diff --git a/src/sinks/gcp/stackdriver/logs/sink.rs b/src/sinks/gcp/stackdriver/logs/sink.rs | |
| index ec582ed..7bb0b7b 100644 | |
| --- a/src/sinks/gcp/stackdriver/logs/sink.rs | |
| +++ b/src/sinks/gcp/stackdriver/logs/sink.rs | |
| @@ -32,6 +32,7 @@ where | |
| } | |
| } | |
| + #[async_backtrace::framed] | |
| async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> { | |
| input | |
| // Batch the input stream with size calculation based on the estimated encoded json size | |
| @@ -67,6 +68,7 @@ where | |
| S::Response: DriverResponse + Send + 'static, | |
| S::Error: std::fmt::Debug + Into<crate::Error> + Send, | |
| { | |
| + #[async_backtrace::framed] | |
| async fn run( | |
| self: Box<Self>, | |
| input: futures_util::stream::BoxStream<'_, Event>, |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment