The new serverless deployment routes workflow requests to CPU-only inference instances with remote execution enabled. Model blocks on these CPU hosts make HTTP requests to GPU instances for inference, and those GPU instances return `X-Processing-Time` headers. The downstream billing service already differentiates between CPU and GPU usage records, but it needs the GPU processing times from the CPU orchestrator's response to bill correctly: a fixed 100 ms per frame for CPU + the actual GPU time.

Currently the CPU orchestrator returns only its own wall-clock `X-Processing-Time`. We need to additionally return the remote GPU processing times collected during workflow execution, so the billing service has the full picture.
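To make the billing contract concrete, here is a minimal sketch of how a downstream consumer could combine the headers; the function name and constant are illustrative, not the billing service's actual code:

```python
# Illustrative only: combining the fixed CPU charge with the new GPU header.
CPU_COST_PER_FRAME_S = 0.100  # fixed 100 ms of billable CPU time per frame


def billable_seconds(num_frames: int, headers: dict) -> float:
    # X-Remote-Processing-Time is the new header added by this change;
    # absence means no remote GPU work was performed.
    gpu_time = float(headers.get("X-Remote-Processing-Time", "0"))
    return num_frames * CPU_COST_PER_FRAME_S + gpu_time
```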
Key constraints:
- Don't change the usage collector (serves broader reporting purposes)
- Don't change individual blocks or the execution engine's core logic
- Gate behind an env var for the new deployment only
All 26 model blocks with remote execution use `InferenceHTTPClient`. Their HTTP calls flow through two paths in the SDK:

| Path | Methods | Collection point |
|---|---|---|
| `_post_images()` / `_execute_infer_from_api_request()` → `execute_requests_packages()` | `infer()`, `detect_gazes()`, `depth_estimation()`, `infer_lmm()`, `sam2_segment_image()`, `ocr_image()`, `sam3_3d_infer()`, etc. | `execute_requests_packages()` in `executors.py` |
| Direct `requests.post()` | `get_clip_text_embeddings()` (line 1341), `clip_compare()` (line 1444), `get_perception_encoder_text_embeddings()` (line 1552) | Each call site in `client.py` |
The 2 deprecated LMM blocks use OpenAI's client directly (external API, not our GPU instances) — irrelevant.
The `remote_processing_times` ContextVar lives in `inference_sdk/config.py` alongside the existing `execution_id` ContextVar. This is proven safe:

- When the SDK is used standalone (outside the server), the ContextVar defaults to `None` — collection is a no-op
- When used within the server, the middleware initializes it per-request
- The same pattern is already working in production for `execution_id`
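A minimal sketch of the first point, assuming only the ContextVar definition (shown in full below):

```python
import contextvars

remote_processing_times = contextvars.ContextVar("remote_processing_times", default=None)

# Standalone SDK use: nothing ever set the ContextVar, so .get() returns None
# and every collection site short-circuits to a no-op.
collector = remote_processing_times.get()
if collector is not None:
    collector.add(0.42)  # never reached outside the server
```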
Add `RemoteProcessingTimeCollector` (a list + `threading.Lock`) and the `remote_processing_times` ContextVar next to the existing `execution_id`:
```python
import contextvars
import threading


class RemoteProcessingTimeCollector:
    """Thread-safe accumulator for remote GPU processing times (in seconds)."""

    def __init__(self):
        self._times = []
        self._lock = threading.Lock()

    def add(self, processing_time: float) -> None:
        with self._lock:
            self._times.append(processing_time)

    def get_total(self) -> float:
        with self._lock:
            return sum(self._times)

    def get_times(self) -> list:
        with self._lock:
            return list(self._times)

    def has_data(self) -> bool:
        with self._lock:
            return len(self._times) > 0


remote_processing_times = contextvars.ContextVar(
    "remote_processing_times", default=None
)
```

And the feature flag, defaulting to off:

```python
WORKFLOWS_REMOTE_EXECUTION_TIME_FORWARDING = str2bool(
    os.getenv("WORKFLOWS_REMOTE_EXECUTION_TIME_FORWARDING", "False")
)
```

After responses return from `make_parallel_requests` (back in the calling thread where the ContextVar is set), read `X-Processing-Time` from each response. Place the collection before the `api_key_safe_raise_for_status` loop so times are captured even on error:
```python
# After results.extend(responses), BEFORE the raise-for-status loop:
collector = remote_processing_times.get()
if collector is not None:
    for response in results:
        pt = response.headers.get(PROCESSING_TIME_HEADER)
        if pt is not None:
            try:
                collector.add(float(pt))
            except (ValueError, TypeError):
                pass
```

Add the same collection snippet after `api_key_safe_raise_for_status(response=response)` in:

- `get_clip_text_embeddings()` (line 1346)
- `clip_compare()` (line 1449)
- `get_perception_encoder_text_embeddings()` (line 1559)
These already have `execution_id.get()` calls right above, so the pattern is established.
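For illustration, a call site could look like the sketch below; the request itself is paraphrased, and only the collector block mirrors the snippet above:

```python
# Inside e.g. get_clip_text_embeddings(); url/payload details paraphrased.
response = requests.post(url, json=payload, headers=headers)
api_key_safe_raise_for_status(response=response)

# Same collection snippet as in executors.py, but per call site:
collector = remote_processing_times.get()
if collector is not None:
    pt = response.headers.get(PROCESSING_TIME_HEADER)
    if pt is not None:
        try:
            collector.add(float(pt))
        except (ValueError, TypeError):
            pass
```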
5. `inference/core/workflows/execution_engine/v1/executor/core.py` — Propagate the ContextVar to step threads
Same pattern as `execution_id` (see the sketch after this list):

- In `execute_steps()` (line 133): capture `remote_processing_times.get()` and pass it via `partial`
- In `safe_execute_step()` (line 160): accept the parameter and call `remote_processing_times.set(collector)`
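A sketch under simplified signatures (the real `execute_steps()` / `safe_execute_step()` take more parameters); only the ContextVar handling mirrors the plan:

```python
from functools import partial

from inference_sdk.config import remote_processing_times


def execute_steps(steps, executor):
    # Capture in the thread where the middleware set the ContextVar...
    collector = remote_processing_times.get()
    for step in steps:
        executor.submit(partial(safe_execute_step, step, collector=collector))


def safe_execute_step(step, collector=None):
    # ...and re-establish it in the worker thread before the step runs.
    remote_processing_times.set(collector)
    step.run()  # placeholder for the engine's actual step execution
```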
`run_in_parallel()` (line 465) spawns a bare `ThreadPoolExecutor` used by CLIP comparison and other blocks. Capture and re-set `remote_processing_times` (and `execution_id`) in the worker threads. This is needed because `clip_compare()` makes a direct `requests.post()` inside these threads.
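A sketch of that wrapper, assuming a simplified signature for `run_in_parallel()`:

```python
from concurrent.futures import ThreadPoolExecutor

from inference_sdk.config import execution_id, remote_processing_times


def run_in_parallel(tasks, max_workers=None):
    # Capture both ContextVars in the calling thread...
    eid = execution_id.get()
    collector = remote_processing_times.get()

    def run_task(task):
        # ...and re-establish them inside each bare worker thread.
        execution_id.set(eid)
        remote_processing_times.set(collector)
        return task()

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(run_task, tasks))
```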
In `GCPServerlessMiddleware.dispatch()` (line 294):

- Before `call_next`: if `WORKFLOWS_REMOTE_EXECUTION_TIME_FORWARDING`, create a collector and set the ContextVar
- After `call_next`: if the collector has data, add the two new headers:

```
X-Processing-Time: 2.35                      # wall-clock CPU time (unchanged)
X-Remote-Processing-Time: 1.89               # sum of GPU remote times (new)
X-Remote-Processing-Times: 0.52,0.45,0.92    # individual GPU times (new)
```
Also expose both new headers in CORS config (line 394).
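A sketch of the dispatch change, assuming Starlette-style middleware; the flag's import path is also an assumption:

```python
from starlette.middleware.base import BaseHTTPMiddleware

from inference_sdk.config import RemoteProcessingTimeCollector, remote_processing_times
# Flag import path is an assumption; see the env var snippet above.
from inference.core.env import WORKFLOWS_REMOTE_EXECUTION_TIME_FORWARDING


class GCPServerlessMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        collector = None
        if WORKFLOWS_REMOTE_EXECUTION_TIME_FORWARDING:
            collector = RemoteProcessingTimeCollector()
            remote_processing_times.set(collector)
        response = await call_next(request)  # X-Processing-Time set as before
        if collector is not None and collector.has_data():
            response.headers["X-Remote-Processing-Time"] = f"{collector.get_total():.2f}"
            response.headers["X-Remote-Processing-Times"] = ",".join(
                f"{t:.2f}" for t in collector.get_times()
            )
        return response
```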
What stays unchanged:

- Usage collector — untouched
- `X-Processing-Time` header — stays as wall-clock time
- Individual blocks — no changes to any `run_remotely()` methods
- Execution engine core logic — only ContextVar propagation (same as `execution_id`)
- External API calls (Anthropic, OpenAI, Google) — use their own HTTP clients, not collected
- Existing deployments — gated behind the `WORKFLOWS_REMOTE_EXECUTION_TIME_FORWARDING=False` default
To verify:

- Run the existing tests with the env var unset — no behavior change
- Set `WORKFLOWS_REMOTE_EXECUTION_TIME_FORWARDING=True` + `WORKFLOWS_STEP_EXECUTION_MODE=remote`, run a workflow, and verify both new headers are present alongside the unchanged `X-Processing-Time`
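A hypothetical shape for the second check; the `client` fixture, endpoint, and payload are placeholders, not the repo's actual test harness:

```python
WORKFLOW_PAYLOAD = {"inputs": {}}  # placeholder; not a real workflow definition


def test_new_headers_present_when_forwarding_enabled(client):
    # Assumes WORKFLOWS_REMOTE_EXECUTION_TIME_FORWARDING=True and
    # WORKFLOWS_STEP_EXECUTION_MODE=remote were exported before the app
    # imported its env config (the flag is read at import time).
    response = client.post("/workflows/run", json=WORKFLOW_PAYLOAD)  # endpoint is a placeholder

    assert "X-Processing-Time" in response.headers           # unchanged wall-clock time
    assert "X-Remote-Processing-Time" in response.headers    # new: summed GPU time
    assert "X-Remote-Processing-Times" in response.headers   # new: individual GPU times
```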