Created
October 20, 2025 13:26
-
-
Save LeeMetaX/28f2d5e152f88e3e9e886e603fb4bb44 to your computer and use it in GitHub Desktop.
Substrate Vector Firewall
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| substrate_runtime_demo.py | |
| Demonstrates: | |
| • 0-CR → hold / flow control | |
| • 1-CR → lint + result | |
| • nested-CR → verify / authenticate / quarantine | |
| """ | |
| import re, time, json, hmac, hashlib, os | |
| from datetime import datetime | |
| # --------------------------------------------------------------------- | |
| # CONFIGURATION | |
| # --------------------------------------------------------------------- | |
| SECRET_KEYS = {"orchestrator": b"shared-secret-key"} | |
| AUDIT_LOG = "audit_log.jsonl" | |
| # Utility ------------------------------------------------------------- | |
| def log_event(event_type, **data): | |
| """Append an audit record with timestamp.""" | |
| rec = {"ts": datetime.utcnow().isoformat()+"Z", | |
| "event": event_type} | |
| rec.update(data) | |
| print(f"[AUDIT] {json.dumps(rec)}") | |
| with open(AUDIT_LOG, "a", encoding="utf-8") as f: | |
| f.write(json.dumps(rec) + "\n") | |
| # --------------------------------------------------------------------- | |
| # VERIFICATION (authenticates nested CR events) | |
| # --------------------------------------------------------------------- | |
| def verify_cr_signature(tag, payload, key_id="orchestrator"): | |
| key = SECRET_KEYS.get(key_id) | |
| if not key: | |
| return False | |
| expected = hmac.new(key, payload.encode(), hashlib.sha256).hexdigest() | |
| return hmac.compare_digest(tag, expected) | |
| def nonce_is_fresh(nonce, window=60): | |
| """Simple monotonic / timestamp freshness check.""" | |
| try: | |
| return abs(time.time() - float(nonce)) < window | |
| except Exception: | |
| return False | |
| # --------------------------------------------------------------------- | |
| # LINTING (basic semantic check of packet) | |
| # --------------------------------------------------------------------- | |
| def lint_packet(packet, deep=False): | |
| issues = [] | |
| if deep: | |
| if "HDR" in packet or "END" in packet: | |
| issues.append("Possible nested header/footer detected.") | |
| else: | |
| if not packet.endswith("\r"): | |
| issues.append("Missing terminal CR.") | |
| return issues | |
| # --------------------------------------------------------------------- | |
| # MAIN INSPECTION LOGIC | |
| # --------------------------------------------------------------------- | |
| def process_packet(packet, meta=None, block_id="blk"): | |
| """Implements the CR control grammar with verification.""" | |
| meta = meta or {} | |
| cr_positions = [m.start() for m in re.finditer(r'\r', packet)] | |
| nested_match = re.search(r'\r.+\r', packet, re.DOTALL) | |
| # 0-CR → return control | |
| if len(cr_positions) == 0: | |
| log_event("NO_CR_DETECTED", block=block_id) | |
| return "RETURN_FLOW_CONTROL", packet | |
| # 1-CR and no nested → normal lint | |
| if len(cr_positions) == 1 and not nested_match: | |
| log_event("SINGLE_CR", block=block_id, pos=cr_positions[0]) | |
| issues = lint_packet(packet) | |
| return ("LINT_RESULT", {"issues": issues, "clean_packet": packet}) | |
| # Nested or multiple CRs → possible injection | |
| log_event("MULTIPLE_CR", block=block_id, positions=cr_positions) | |
| # Try authentication | |
| tag = meta.get("auth_tag"); nonce = meta.get("nonce"); key_id = meta.get("key_id","orchestrator") | |
| if tag and verify_cr_signature(tag, packet, key_id) and nonce_is_fresh(nonce): | |
| log_event("VERIFIED_ORCHESTRATION_EVENT", | |
| block=block_id, key_id=key_id, nonce=nonce) | |
| issues = lint_packet(packet, deep=True) | |
| return ("VERIFIED_NESTED", {"issues": issues, "packet": packet}) | |
| else: | |
| log_event("VECTOR_INJECTION_POINT", block=block_id, | |
| reason="unauthenticated nested CR") | |
| issues = lint_packet(packet, deep=True) | |
| safe_packet = packet.replace("\r", "") + "\r" | |
| return ("QUARANTINED", {"issues": issues, "sanitized": safe_packet}) | |
| # --------------------------------------------------------------------- | |
| # DEMONSTRATION | |
| # --------------------------------------------------------------------- | |
| if __name__ == "__main__": | |
| if os.path.exists(AUDIT_LOG): | |
| os.remove(AUDIT_LOG) | |
| print("\n--- Substrate Runtime Demo ---") | |
| # 0-CR case | |
| pkt0 = "LOAD STORE" | |
| print(process_packet(pkt0, block_id="case0")) | |
| # 1-CR valid | |
| pkt1 = "LOAD STORE\r" | |
| print(process_packet(pkt1, block_id="case1")) | |
| # nested-CR injection (unauthorized) | |
| pkt2 = "HDR data\rpayload\rEND\r" | |
| print(process_packet(pkt2, block_id="case2")) | |
| # nested-CR with authenticated orchestration tag | |
| pkt3 = "HDR orchestration event\rpayload\r" | |
| tag = hmac.new(SECRET_KEYS["orchestrator"], | |
| pkt3.encode(), hashlib.sha256).hexdigest() | |
| meta = {"auth_tag": tag, | |
| "nonce": str(time.time()), | |
| "key_id": "orchestrator"} | |
| print(process_packet(pkt3, meta, block_id="case3")) | |
| print("\n--- Audit log written to", AUDIT_LOG) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment