Created
March 6, 2026 22:25
-
-
Save devnexen/dd7904b8c6ce8ea499055740bc5dddc5 to your computer and use it in GitHub Desktop.
PoC: Stale HTTP/2 control frames leaked to new upstream on gRPC reinit (nginx PR #1136)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| PoC: Stale HTTP/2 control frames leaked to new upstream on grpc reinit. | |
| Demonstrates that without the ctx->out/ctx->in/ctx->busy = NULL fix in | |
| ngx_http_grpc_reinit_request(), PING ACK / SETTINGS ACK frames | |
| queued for a failed upstream connection are sent to the next upstream | |
| during a grpc_next_upstream retry. | |
| Setup: | |
| upstream1 (port 9081): Completes HTTP/2 handshake, sends PING, | |
| then GOAWAY with last_stream_id=0 to force retry. | |
| upstream2 (port 9082): Accepts connection, logs all received HTTP/2 frames. | |
| nginx (port 9080): grpc_pass to upstream{} block. | |
| """ | |
| import os | |
| import socket | |
| import struct | |
| import subprocess | |
| import sys | |
| import tempfile | |
| import threading | |
| import time | |
# HTTP/2 frame type codes used by this script.
DATA = 0x0
HEADERS = 0x1
RST_STREAM = 0x3
SETTINGS = 0x4
PING = 0x6
GOAWAY = 0x7
WINDOW_UPDATE = 0x8

# Flag bit 0x1; the script tests it on SETTINGS and PING frames.
ACK = 0x1

# Client connection preface: sent first by the client in main() and stripped
# by upstream2 before frame parsing.
H2_PREFACE = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"

# Frame type code -> printable name. The tuple position is the type code,
# so the order of the names below must not change.
FRAME_NAMES = dict(enumerate((
    "DATA",
    "HEADERS",
    "PRIORITY",
    "RST_STREAM",
    "SETTINGS",
    "PUSH_PROMISE",
    "PING",
    "GOAWAY",
    "WINDOW_UPDATE",
    "CONTINUATION",
)))
def make_frame(ftype, flags, stream_id, payload=b""):
    """Serialize one HTTP/2 frame: 9-byte header followed by *payload*.

    Header layout: 3-byte big-endian payload length, 1-byte type, 1-byte
    flags, then a 4-byte stream identifier whose top (reserved) bit is
    always cleared.

    Args:
        ftype: Frame type code (0-255).
        flags: Frame flags byte (0-255).
        stream_id: Stream identifier; only the low 31 bits are used.
        payload: Frame body as bytes.

    Returns:
        The encoded frame as ``bytes``.

    Raises:
        ValueError: If *payload* is too long for the 24-bit length field.
            Previously the high byte of the length was silently dropped,
            yielding a frame whose header disagreed with its body.
    """
    length = len(payload)
    if length > 0xFFFFFF:
        raise ValueError(
            f"payload of {length} bytes does not fit the 24-bit frame length"
        )
    header = length.to_bytes(3, "big") + struct.pack(
        "!BBI", ftype, flags, stream_id & 0x7FFFFFFF
    )
    return header + payload
def parse_frames(data):
    """Decode every complete HTTP/2 frame found in *data*.

    *data* must start at a frame boundary (the connection preface already
    removed). Parsing stops at the first frame whose header or payload is
    cut short; that trailing fragment is ignored.

    Returns:
        A list of ``(type, flags, stream_id, payload)`` tuples in wire
        order. ``stream_id`` has the reserved top bit masked off.
    """
    header_len = 9
    total = len(data)
    parsed = []
    pos = 0
    while total - pos >= header_len:
        body_len = int.from_bytes(data[pos:pos + 3], "big")
        frame_type, frame_flags = data[pos + 3], data[pos + 4]
        (raw_stream,) = struct.unpack("!I", data[pos + 5:pos + header_len])
        body_start = pos + header_len
        body_end = body_start + body_len
        if body_end > total:
            # Payload is truncated: drop the partial frame and stop.
            break
        parsed.append(
            (frame_type, frame_flags, raw_stream & 0x7FFFFFFF,
             data[body_start:body_end])
        )
        pos = body_end
    return parsed
def upstream1(port):
    """Fake upstream: SETTINGS + PING + GOAWAY(0) to force a retry.

    Listens on 127.0.0.1:*port*, accepts a single connection (giving up
    after 10 seconds), reads until at least the client preface plus one
    frame header has arrived, then answers with SETTINGS, SETTINGS ACK,
    PING and GOAWAY in one write and closes the connection.

    Both sockets are managed with ``with`` so they are closed even if
    ``bind``, ``recv`` or ``sendall`` raises.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
        srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        srv.bind(("127.0.0.1", port))
        srv.listen(1)
        srv.settimeout(10)
        try:
            conn, _ = srv.accept()
        except socket.timeout:
            return

        with conn:
            # Read nginx's connection preface + request.
            data = b""
            while len(data) < len(H2_PREFACE) + 9:
                chunk = conn.recv(4096)
                if not chunk:
                    break
                data += chunk

            # Send ALL frames in a single write so nginx processes them in
            # one pass through process_header, before the output filter can
            # flush ctx->out.
            ping_data = b"\xDE\xAD\xBE\xEF\xCA\xFE\xBA\xBE"
            # GOAWAY: last_stream_id=0, error_code=0 (NO_ERROR).
            # last_stream_id=0 < nginx's stream 1, so nginx must retry.
            goaway_payload = struct.pack("!II", 0, 0)
            resp = b"".join((
                make_frame(SETTINGS, 0, 0),    # server SETTINGS (empty)
                make_frame(SETTINGS, ACK, 0),  # ack nginx's SETTINGS
                make_frame(PING, 0, 0, ping_data),
                make_frame(GOAWAY, 0, 0, goaway_payload),
            ))
            conn.sendall(resp)
            # Short pause before closing, as in the original sequence.
            time.sleep(0.1)
# Frames captured by upstream2(); read by main() after the thread finishes.
upstream2_frames = []


def upstream2(port):
    """Second upstream: record every HTTP/2 frame nginx sends.

    Listens on 127.0.0.1:*port*, accepts a single connection (giving up
    after 10 seconds) and reads until the peer closes, resets, or stays
    silent for 2 seconds. The captured bytes, minus the client preface if
    present, are parsed and stored in the module-level ``upstream2_frames``.

    A connection reset is treated as end of stream so that frames captured
    before the reset are still reported instead of being lost to an
    unhandled exception. Both sockets are closed on every exit path.
    """
    global upstream2_frames
    upstream2_frames = []

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
        srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        srv.bind(("127.0.0.1", port))
        srv.listen(1)
        srv.settimeout(10)
        try:
            conn, _ = srv.accept()
        except socket.timeout:
            return

        with conn:
            conn.settimeout(2)
            # Read all data nginx sends.
            data = b""
            while True:
                try:
                    chunk = conn.recv(4096)
                except (socket.timeout, ConnectionError):
                    break
                if not chunk:
                    break
                data += chunk

    # Strip the H2 preface if present; the rest is a sequence of frames.
    raw = data[len(H2_PREFACE):] if data.startswith(H2_PREFACE) else data
    upstream2_frames = parse_frames(raw)
def write_nginx_conf(tmpdir, nginx_port, up1_port, up2_port):
    """Write ``nginx.conf`` into *tmpdir* and return its path.

    The generated config runs nginx in the foreground as a single process,
    logs at debug level to ``{tmpdir}/error.log``, listens for HTTP/2 on
    *nginx_port*, and passes every request via ``grpc_pass`` to an upstream
    group made of 127.0.0.1:*up1_port* followed by 127.0.0.1:*up2_port*.
    """
    directives = [
        "daemon off;",
        "master_process off;",
        "worker_processes 1;",
        f"error_log {tmpdir}/error.log debug;",
        f"pid {tmpdir}/nginx.pid;",
        "events {",
        "worker_connections 64;",
        "}",
        "http {",
        "access_log off;",
        "upstream grpc_backend {",
        f"server 127.0.0.1:{up1_port};",
        f"server 127.0.0.1:{up2_port};",
        "}",
        "server {",
        f"listen {nginx_port} http2;",
        "location / {",
        "grpc_pass grpc://grpc_backend;",
        "grpc_next_upstream error timeout invalid_header non_idempotent;",
        "grpc_connect_timeout 2s;",
        "grpc_read_timeout 2s;",
        "}",
        "}",
        "}",
    ]
    conf_text = "\n".join(directives) + "\n"
    conf_file = os.path.join(tmpdir, "nginx.conf")
    with open(conf_file, "w") as out:
        out.write(conf_text)
    return conf_file
def _build_grpc_request():
    """Return the client bytes for one gRPC-style HTTP/2 request.

    The request is the connection preface, an empty SETTINGS frame, and a
    HEADERS frame on stream 1 (END_HEADERS | END_STREAM) carrying a
    hand-encoded HPACK block for ``POST /test.Service/Method``.
    """
    path = b"/test.Service/Method"
    authority = b"localhost"
    content_type = b"application/grpc"
    te = b"trailers"

    # Every literal below is shorter than 128 bytes, so each length fits in
    # one byte.
    hpack = b"".join((
        b"\x83",  # :method POST (indexed, static index 3)
        b"\x86",  # :scheme http (indexed, static index 6)
        # :path (literal with incremental indexing, name index 4)
        b"\x44", bytes([len(path)]), path,
        # :authority (literal with incremental indexing, name index 1)
        b"\x41", bytes([len(authority)]), authority,
        # content-type (literal with incremental indexing, 0x40 | 31)
        b"\x5f", bytes([len(content_type)]), content_type,
        # te (literal with incremental indexing, new name)
        b"\x40", bytes([len(b"te")]), b"te", bytes([len(te)]), te,
    ))
    return (
        H2_PREFACE
        + make_frame(SETTINGS, 0, 0)
        + make_frame(HEADERS, 0x04 | 0x01, 1, hpack)
    )


def _run_client(nginx_port, request, hold_open=8):
    """Send *request* to nginx and keep the connection alive for the retry.

    Acknowledges every non-ACK SETTINGS frame nginx sends. Received bytes
    are buffered across ``recv`` calls so a frame split over two reads is
    still parsed. The loop ends when the peer closes, a read times out
    (5 s), or *hold_open* seconds have elapsed. Socket errors are reported
    on stdout rather than raised.
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(5)
            s.connect(("127.0.0.1", nginx_port))
            s.sendall(request)

            pending = b""
            deadline = time.monotonic() + hold_open
            while time.monotonic() < deadline:
                try:
                    chunk = s.recv(4096)
                except socket.timeout:
                    break
                if not chunk:
                    break
                pending += chunk

                off = 0
                while off + 9 <= len(pending):
                    flen = int.from_bytes(pending[off:off + 3], "big")
                    if off + 9 + flen > len(pending):
                        break  # incomplete frame; wait for more data
                    if pending[off + 3] == SETTINGS and not (pending[off + 4] & ACK):
                        s.sendall(make_frame(SETTINGS, ACK, 0))
                    off += 9 + flen
                pending = pending[off:]
    except OSError as e:
        print(f"Request failed: {e}")


def _stop_nginx(proc):
    """Terminate *proc*, escalating to kill after 5 s; return its stderr."""
    proc.terminate()
    try:
        _, err = proc.communicate(timeout=5)
    except subprocess.TimeoutExpired:
        proc.kill()
        _, err = proc.communicate()
    return err


def _report_frames(frames):
    """Print the frames upstream2 captured; return True if any are stale.

    A PING ACK, or more than one SETTINGS ACK, is counted as a stale frame
    because upstream2 never sends PING or SETTINGS itself.
    """
    print("\n=== Frames received by upstream2 ===")
    stale_ping_ack = False
    stale_settings_ack_count = 0
    for ftype, flags, sid, payload in frames:
        name = FRAME_NAMES.get(ftype, f"UNKNOWN({ftype})")
        # Bit 0x1 is only ACK on SETTINGS and PING; on HEADERS/DATA the same
        # bit is END_STREAM (see the HEADERS flags used for the request).
        is_ack = ftype in (SETTINGS, PING) and bool(flags & ACK)
        flag_str = " [ACK]" if is_ack else ""
        print(f" {name}{flag_str} stream={sid} len={len(payload)}")
        if ftype == PING and is_ack:
            stale_ping_ack = True
            print(f" *** PING ACK data: {payload.hex()}")
        if ftype == SETTINGS and is_ack:
            stale_settings_ack_count += 1
    print()

    bug_found = False
    if stale_ping_ack:
        print("BUG: upstream2 received PING ACK but never sent a PING!")
        print(" This is a stale frame leaked from the failed upstream1 connection.")
        bug_found = True
    if stale_settings_ack_count > 1:
        print(f"BUG: upstream2 received {stale_settings_ack_count} SETTINGS ACK frames")
        print(" (expected at most 1). Extra ones are stale from upstream1.")
        bug_found = True
    return bug_found


def _print_relevant_log(error_log):
    """Print up to 20 debug-log lines that mention the retry sequence."""
    if not os.path.exists(error_log):
        return
    keywords = ("goaway", "ping ack", "reinit", "next upstream", "settings ack")
    # Undecodable bytes are replaced rather than aborting the report.
    with open(error_log, errors="replace") as f:
        relevant = [
            line.rstrip() for line in f
            if any(word in line.lower() for word in keywords)
        ]
    if relevant:
        print("\n=== Relevant nginx debug log lines ===")
        for line in relevant[:20]:
            print(f" {line}")
    print(f"\nFull debug log: {error_log}")


def main():
    """Run the PoC end to end and exit with a status describing the result.

    Starts both fake upstreams and an nginx built at ``objs/nginx`` next to
    this script, sends one gRPC-style request, then inspects what upstream2
    received.

    Exit status:
        0: upstream2 received frames and none were stale.
        1: stale control frames were detected, or the nginx binary is
           missing.
        2: inconclusive, because nginx exited during startup or upstream2
           received nothing. Previously these cases printed "OK" and
           exited 0.
    """
    nginx_bin = os.path.join(os.path.dirname(__file__), "objs", "nginx")
    if not os.path.isfile(nginx_bin):
        print(f"ERROR: nginx binary not found at {nginx_bin}", file=sys.stderr)
        sys.exit(1)

    nginx_port = 9080
    up1_port = 9081
    up2_port = 9082

    tmpdir = tempfile.mkdtemp(prefix="nginx_grpc_poc_")
    # Ensure temp dirs exist for nginx.
    for d in ("client_body_temp", "proxy_temp", "fastcgi_temp",
              "uwsgi_temp", "scgi_temp", "logs"):
        os.makedirs(os.path.join(tmpdir, d), exist_ok=True)
    conf_path = write_nginx_conf(tmpdir, nginx_port, up1_port, up2_port)

    # Start upstream servers and give them a moment to start listening.
    t1 = threading.Thread(target=upstream1, args=(up1_port,), daemon=True)
    t2 = threading.Thread(target=upstream2, args=(up2_port,), daemon=True)
    t1.start()
    t2.start()
    time.sleep(0.2)

    # Start nginx.
    nginx_proc = subprocess.Popen(
        [nginx_bin, "-c", conf_path, "-p", tmpdir],
        stdout=subprocess.PIPE, stderr=subprocess.PIPE,
    )
    started = False
    try:
        time.sleep(0.5)
        # nginx runs in the foreground ("daemon off"), so an exit status
        # this early means startup failed.
        started = nginx_proc.poll() is None
        if started:
            _run_client(nginx_port, _build_grpc_request())
            # Wait for upstreams to finish.
            t1.join(timeout=5)
            t2.join(timeout=5)
    finally:
        # Always stop nginx and drain its pipes, even on an exception.
        nginx_err = _stop_nginx(nginx_proc)

    if not started:
        print(f"ERROR: nginx exited during startup (status {nginx_proc.returncode})",
              file=sys.stderr)
        if nginx_err:
            print(nginx_err.decode(errors="replace").rstrip(), file=sys.stderr)
        print(f"Temp dir: {tmpdir}", file=sys.stderr)
        sys.exit(2)

    # Analyze frames received by upstream2.
    frames = upstream2_frames
    bug_found = _report_frames(frames)
    if bug_found:
        exit_code = 1
    elif not frames:
        print("INCONCLUSIVE: upstream2 received no HTTP/2 frames;")
        print(" the retry to the second upstream was not observed.")
        exit_code = 2
    else:
        print("OK: No stale control frames detected on upstream2.")
        print(" (The fix in reinit_request is working correctly.)")
        exit_code = 0

    # Show relevant debug log lines.
    _print_relevant_log(os.path.join(tmpdir, "error.log"))
    print(f"Temp dir: {tmpdir}")
    sys.exit(exit_code)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment