Skip to content

Instantly share code, notes, and snippets.

@LeeMetaX
Created October 20, 2025 13:26
Show Gist options
  • Select an option

  • Save LeeMetaX/28f2d5e152f88e3e9e886e603fb4bb44 to your computer and use it in GitHub Desktop.

Select an option

Save LeeMetaX/28f2d5e152f88e3e9e886e603fb4bb44 to your computer and use it in GitHub Desktop.
Substrate Vector Firewall
"""
substrate_runtime_demo.py
Demonstrates:
• 0-CR → hold / flow control
• 1-CR → lint + result
• nested-CR → verify / authenticate / quarantine
"""
import re, time, json, hmac, hashlib, os
from datetime import datetime
# ---------------------------------------------------------------------
# CONFIGURATION
# ---------------------------------------------------------------------
SECRET_KEYS = {"orchestrator": b"shared-secret-key"}
AUDIT_LOG = "audit_log.jsonl"
# Utility -------------------------------------------------------------
def log_event(event_type, **data):
"""Append an audit record with timestamp."""
rec = {"ts": datetime.utcnow().isoformat()+"Z",
"event": event_type}
rec.update(data)
print(f"[AUDIT] {json.dumps(rec)}")
with open(AUDIT_LOG, "a", encoding="utf-8") as f:
f.write(json.dumps(rec) + "\n")
# ---------------------------------------------------------------------
# VERIFICATION (authenticates nested CR events)
# ---------------------------------------------------------------------
def verify_cr_signature(tag, payload, key_id="orchestrator"):
key = SECRET_KEYS.get(key_id)
if not key:
return False
expected = hmac.new(key, payload.encode(), hashlib.sha256).hexdigest()
return hmac.compare_digest(tag, expected)
def nonce_is_fresh(nonce, window=60):
"""Simple monotonic / timestamp freshness check."""
try:
return abs(time.time() - float(nonce)) < window
except Exception:
return False
# ---------------------------------------------------------------------
# LINTING (basic semantic check of packet)
# ---------------------------------------------------------------------
def lint_packet(packet, deep=False):
issues = []
if deep:
if "HDR" in packet or "END" in packet:
issues.append("Possible nested header/footer detected.")
else:
if not packet.endswith("\r"):
issues.append("Missing terminal CR.")
return issues
# ---------------------------------------------------------------------
# MAIN INSPECTION LOGIC
# ---------------------------------------------------------------------
def process_packet(packet, meta=None, block_id="blk"):
"""Implements the CR control grammar with verification."""
meta = meta or {}
cr_positions = [m.start() for m in re.finditer(r'\r', packet)]
nested_match = re.search(r'\r.+\r', packet, re.DOTALL)
# 0-CR → return control
if len(cr_positions) == 0:
log_event("NO_CR_DETECTED", block=block_id)
return "RETURN_FLOW_CONTROL", packet
# 1-CR and no nested → normal lint
if len(cr_positions) == 1 and not nested_match:
log_event("SINGLE_CR", block=block_id, pos=cr_positions[0])
issues = lint_packet(packet)
return ("LINT_RESULT", {"issues": issues, "clean_packet": packet})
# Nested or multiple CRs → possible injection
log_event("MULTIPLE_CR", block=block_id, positions=cr_positions)
# Try authentication
tag = meta.get("auth_tag"); nonce = meta.get("nonce"); key_id = meta.get("key_id","orchestrator")
if tag and verify_cr_signature(tag, packet, key_id) and nonce_is_fresh(nonce):
log_event("VERIFIED_ORCHESTRATION_EVENT",
block=block_id, key_id=key_id, nonce=nonce)
issues = lint_packet(packet, deep=True)
return ("VERIFIED_NESTED", {"issues": issues, "packet": packet})
else:
log_event("VECTOR_INJECTION_POINT", block=block_id,
reason="unauthenticated nested CR")
issues = lint_packet(packet, deep=True)
safe_packet = packet.replace("\r", "") + "\r"
return ("QUARANTINED", {"issues": issues, "sanitized": safe_packet})
# ---------------------------------------------------------------------
# DEMONSTRATION
# ---------------------------------------------------------------------
if __name__ == "__main__":
if os.path.exists(AUDIT_LOG):
os.remove(AUDIT_LOG)
print("\n--- Substrate Runtime Demo ---")
# 0-CR case
pkt0 = "LOAD STORE"
print(process_packet(pkt0, block_id="case0"))
# 1-CR valid
pkt1 = "LOAD STORE\r"
print(process_packet(pkt1, block_id="case1"))
# nested-CR injection (unauthorized)
pkt2 = "HDR data\rpayload\rEND\r"
print(process_packet(pkt2, block_id="case2"))
# nested-CR with authenticated orchestration tag
pkt3 = "HDR orchestration event\rpayload\r"
tag = hmac.new(SECRET_KEYS["orchestrator"],
pkt3.encode(), hashlib.sha256).hexdigest()
meta = {"auth_tag": tag,
"nonce": str(time.time()),
"key_id": "orchestrator"}
print(process_packet(pkt3, meta, block_id="case3"))
print("\n--- Audit log written to", AUDIT_LOG)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment