Created
March 23, 2026 18:45
-
-
Save timhok/4c65edd1ede55352e22394e78c481fd7 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """Protocol probe client — run on your machine""" | |
| import socket | |
| import struct | |
| import time | |
| import sys | |
# Target host: taken from the first command-line argument when one is given,
# otherwise asked for interactively.
if len(sys.argv) > 1:
    VPS = sys.argv[1]
else:
    VPS = input("VPS IP: ").strip()
# IP protocol number -> short label shown in the probe output.
# Each key is used as the protocol argument when opening a raw socket.
PROTOCOLS = {
    4: "IPIP",
    33: "DCCP",
    41: "SIT/6in4",
    43: "IPv6-Route",
    44: "IPv6-Frag",
    47: "GRE",
    50: "ESP",
    51: "AH",
    55: "MOBILE",
    59: "IPv6-NoNxt",
    60: "IPv6-Opts",
    92: "MTP",
    94: "IPIP-Enc",
    97: "EtherIP",
    98: "ENCAP",
    103: "PIM",
    108: "IPCOMP",
    112: "VRRP",
    124: "IS-IS",
    132: "SCTP",
    133: "FC",
    135: "MobilityHdr",
    136: "UDPLite",
    137: "MPLS-in-IP",
    138: "manet",
    139: "HIP",
    140: "Shim6",
    141: "WESP",
    142: "ROHC",
    143: "Ethernet",
    253: "Experimental1",
    254: "Experimental2",
}

# Cumulative byte counts at which Phase 2 prints a progress line; the last
# entry is also the total amount sent per protocol.
BYTE_TEST_TARGETS = [
    5_000,
    10_000,
    15_000,
    20_000,
    25_000,
    30_000,
    50_000,
    100_000,
]
def phase1_reachability():
    """Phase 1: send a few marker packets on every protocol in PROTOCOLS.

    For each protocol number a raw IPv4 socket is opened and three 64-byte,
    NUL-padded marker payloads ("PROBE-<proto>-<name>-<i>") are sent to VPS.
    Which protocols actually arrive has to be read off the server side.
    A protocol whose socket cannot be opened or written to is reported as
    SKIP and the run continues with the next one. Blocks on Enter at the end.
    """
    print("\n" + "=" * 60)
    print("PHASE 1: Protocol Reachability Test")
    print("=" * 60)
    print(f"Testing {len(PROTOCOLS)} protocols against {VPS}")
    print("Check server output for which protocols arrive.\n")

    for proto, name in sorted(PROTOCOLS.items()):
        try:
            # The context manager closes the socket even when sendto() raises,
            # so a failing protocol no longer leaks a file descriptor.
            with socket.socket(socket.AF_INET, socket.SOCK_RAW, proto) as sock:
                # Send 3 packets per protocol
                for i in range(3):
                    marker = f"PROBE-{proto}-{name}-{i}".encode().ljust(64, b'\x00')
                    sock.sendto(marker, (VPS, 0))
            print(f" Sent proto {proto:>3} ({name})")
        except Exception as e:
            # Deliberately broad: one failing protocol must not abort the
            # remaining probes.
            print(f" SKIP proto {proto:>3} ({name}): {e}")
        # Short gap between protocols.
        time.sleep(0.2)

    print("\nPhase 1 done. Check server for results.")
    print("Press Enter to continue to Phase 2...")
    input()
def phase2_byte_limits(protos_to_test):
    """Phase 2: test byte limits on protocols that passed Phase 1.

    For every protocol number in ``protos_to_test`` a raw IPv4 socket is
    opened and 1000-byte packets (a 4-byte big-endian sequence number followed
    by filler) are sent to VPS until the largest value in BYTE_TEST_TARGETS
    has been reached. A progress line is printed each time a byte target is
    reached. A send error ends the run for that protocol and reports how many
    bytes had gone out; a socket that cannot be opened skips the protocol.

    Args:
        protos_to_test: iterable of IP protocol numbers (ints).
    """
    print("\n" + "=" * 60)
    print("PHASE 2: Byte Limit Test")
    print("=" * 60)
    print(f"Testing protocols: {protos_to_test}")
    print(f"Byte targets: {BYTE_TEST_TARGETS}\n")

    chunk_size = 1000
    filler = b'X' * (chunk_size - 4)  # loop-invariant part of every payload
    # Sort once so the total is the true maximum and milestones are reported
    # in order even if the configured list is not.
    milestones = sorted(BYTE_TEST_TARGETS)
    target = milestones[-1] if milestones else 0  # send up to max

    for proto in protos_to_test:
        name = PROTOCOLS.get(proto, f"Proto-{proto}")
        print(f"\n--- Testing {name} (proto {proto}) ---")
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, proto)
        except Exception as e:
            print(f" Cannot open socket: {e}")
            continue

        # The context manager guarantees the socket is closed however the
        # send loop ends.
        with sock:
            sent = 0
            pkt_num = 0
            next_milestone = 0
            while sent < target:
                payload = struct.pack('!I', pkt_num) + filler
                try:
                    sock.sendto(payload, (VPS, 0))
                except Exception as e:
                    # Deliberately broad: any failure ends this protocol only.
                    print(f" FAILED at {sent} bytes: {e}")
                    break
                sent += chunk_size
                pkt_num += 1

                # Report when a target is reached or passed, so a target that
                # is not an exact multiple of chunk_size is not silently
                # skipped.
                crossed = False
                while (next_milestone < len(milestones)
                       and sent >= milestones[next_milestone]):
                    next_milestone += 1
                    crossed = True
                if crossed:
                    print(f" Sent: {sent:>8} bytes ({pkt_num} pkts)")
                time.sleep(0.01)
            print(f" Finished: {sent} bytes total")

        print(" Pausing 5s between protocols...")
        time.sleep(5)
def phase3_tcp_udp_limits():
    """Phase 3: test TCP and UDP byte limits.

    Sends 100000 bytes in 1000-byte pieces to VPS, first as UDP datagrams to
    port 9998 and then, after a 3 second pause, over a TCP connection to port
    9999 (10 second socket timeout). Progress is printed along the way; a TCP
    send failure reports the number of bytes that were fully handed to the
    socket before the failure.
    """
    print("\n" + "=" * 60)
    print("PHASE 3: TCP/UDP Byte Limit Test")
    print("=" * 60)

    chunk_size = 1000
    target = 100000

    # UDP test
    print("\n--- UDP Test ---")
    udp_payload = b'U' * chunk_size
    try:
        # Context manager: the socket is closed even if a send raises.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as udp_sock:
            sent = 0
            while sent < target:
                udp_sock.sendto(udp_payload, (VPS, 9998))
                sent += chunk_size
                if sent % 10000 == 0:
                    print(f" UDP sent: {sent} bytes")
                time.sleep(0.01)
            print(f" UDP finished: {sent} bytes")
    except Exception as e:
        print(f" UDP failed: {e}")

    time.sleep(3)

    # TCP test
    print("\n--- TCP Test ---")
    tcp_payload = b'T' * chunk_size
    try:
        # Context manager: the socket is closed even if connect() fails.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as tcp_sock:
            tcp_sock.settimeout(10)
            tcp_sock.connect((VPS, 9999))
            sent = 0
            while sent < target:
                try:
                    # sendall() keeps writing until the whole chunk is out;
                    # plain send() may transmit only part of the buffer, which
                    # would make the byte counter overstate what was sent.
                    tcp_sock.sendall(tcp_payload)
                except Exception as e:
                    print(f" TCP DIED at {sent} bytes: {e}")
                    break
                sent += chunk_size
                if sent % 5000 == 0:
                    print(f" TCP sent: {sent} bytes")
                time.sleep(0.01)
            print(f" TCP finished: {sent} bytes")
    except Exception as e:
        print(f" TCP connect failed: {e}")
def _parse_protocol_numbers(text):
    """Turn a comma-separated string of protocol numbers into a list of ints.

    Empty tokens (for example from a trailing comma) are skipped. Tokens that
    are not integers are reported and skipped instead of raising, so a typo
    does not abort the run after Phase 1 has already completed.
    """
    numbers = []
    for token in text.split(","):
        token = token.strip()
        if not token:
            continue
        try:
            numbers.append(int(token))
        except ValueError:
            print(f"Ignoring invalid protocol number: {token!r}")
    return numbers


if __name__ == "__main__":
    # Interactive driver: Phase 1 always runs; Phase 2 runs for the protocol
    # numbers the operator reports as having arrived; Phase 3 is optional.
    print("=" * 60)
    print("PROTOCOL PROBE CLIENT")
    print(f"Target: {VPS}")
    print("=" * 60)

    phase1_reachability()

    print("\nEnter protocol numbers that arrived at server,")
    print("comma-separated (e.g., 4,47,50,55):")
    proto_list = _parse_protocol_numbers(input("> ").strip())
    if proto_list:
        phase2_byte_limits(proto_list)
    else:
        print("No protocols to test for byte limits.")

    print("\nRun TCP/UDP byte limit test? (y/n)")
    if input("> ").strip().lower() == "y":
        phase3_tcp_udp_limits()

    print("\n" + "=" * 60)
    print("ALL TESTS COMPLETE")
    print("Compare client sent vs server received for each protocol.")
    print("=" * 60)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment