Last active
February 1, 2026 02:24
-
-
Save MikuAuahDark/94dfe7496d973105a005658f617293d5 to your computer and use it in GitHub Desktop.
Lazy HTTP Tunnel/CONNECT Proxy in Python 3.7+ using Cloudflare as DNS Resolver
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # HTTP tunneling with DoH resolver | |
| # | |
| # The motivation to write this is because youtube-dl lack DoH or cURL | |
| # "--resolve" equivalent which prevents me from downloading reddit videos | |
| # because all reddit (sub)domains are blocked in my country. | |
| # | |
| # To use this proxy, you need 2 terminals. | |
| # * 1 terminal is to run this proxy server: | |
| # $ python3 httptunnel.py | |
| # * then the other terminal to download the videos with youtube-dl | |
| # $ youtube-dl --proxy http://localhost:1080 ... | |
| # | |
| # The throughput of this script is shitty, but it works. Error handling is | |
| # kinda lazy, so expect something breaks on unstable internet. | |
| # | |
| # This script is tested in Python 3.8.5 in Windows but should work in other | |
| # OSes too. | |
| # | |
| # You can use part or all of this code without my permission. Attribution | |
| # to "Miku AuahDark" is appreciated, but it's not required. | |
| import http.client as httpc | |
| import http.server as https | |
| import errno | |
| import json | |
| import re | |
| import socket | |
| from select import select | |
| import ssl | |
| import urllib.parse as parse | |
| from time import time | |
| class HostEntry: | |
| def __init__(self, ip: str, ttl: int): | |
| self.ip = ip | |
| self.ttl = ttl | |
| cached_resolver: dict = {} | |
| ipmatcher = re.compile("^\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3}$") | |
| # Resolve DNS using Cloudflare | |
| def resolve_dns(host: str)-> str: | |
| if ipmatcher.match(host): | |
| return host | |
| if host in cached_resolver and cached_resolver[host] != None: | |
| hostent: HostEntry = cached_resolver[host] | |
| if int(time()) >= hostent.ttl: | |
| cached_resolver[host] = None | |
| else: | |
| return hostent.ip | |
| try: | |
| # Cloudflare DNS JSON format | |
| dns = httpc.HTTPSConnection('1.0.0.1') | |
| dns.request('GET', '/dns-query?type=A&name=' + parse.quote(host), headers={'Accept': 'application/dns-json'}) | |
| dnsresp = dns.getresponse() | |
| if dnsresp.getcode() != 200: | |
| return None | |
| # Decode JSON response | |
| jsondata = json.load(dnsresp) | |
| if len(jsondata['Answer']) == 0: | |
| return None | |
| # Recursive resolving | |
| targethost = host | |
| while True: | |
| found = False | |
| for data in jsondata['Answer']: | |
| if data['name'] == targethost: | |
| found = True | |
| # Check for IP match | |
| if ipmatcher.match(data['data']): | |
| hostent = HostEntry(data['data'], data['TTL'] + int(time())) | |
| cached_resolver[host] = hostent | |
| return hostent.ip | |
| else: | |
| targethost = data['data'] | |
| if targethost[-1] == '.': | |
| targethost = targethost[:-1] | |
| break | |
| if not found: | |
| return None | |
| except Exception as e: | |
| return None | |
| class ProxyHTTPHandler(https.BaseHTTPRequestHandler): | |
| # TODO: do_PUT, do_POST, do_HEAD, do_OPTIONS | |
| # but the main purpose of this proxy script is to have do_CONNECT | |
| # so it's already sufficient for my (Miku AuahDark) use case | |
| rbufsize = 0 | |
| def do_GET(self): | |
| # The task is to proxy GET request | |
| if self.path.startswith('http://'): | |
| # Proxy | |
| target = parse.urlparse(self.path) | |
| unamepasswdurl = target.netloc.split('@') | |
| hostport = unamepasswdurl[-1].split(':') | |
| host = hostport[0] | |
| port = int(hostport[1]) if len(hostport) > 1 else 80 | |
| ip = resolve_dns(host) | |
| if ip == None: | |
| ip = host | |
| proxy = httpc.HTTPConnection(ip, port) | |
| headers = {} | |
| for h in self.headers.items(): | |
| headers[h[0]] = h[1] | |
| proxy.request('GET', '/' + self.path.split('/')[3], headers=headers) | |
| proxyresp = proxy.getresponse() | |
| self.send_response(proxyresp.status, proxyresp.reason) | |
| for h in proxyresp.msg.items(): | |
| self.send_header(h[0], h[1]) | |
| self.end_headers() | |
| self.wfile.write(proxyresp.read()) | |
| else: | |
| self.send_response(204, '') | |
| self.end_headers() | |
| def do_CONNECT(self): | |
| # The task is to open socket on the other end then proxy it | |
| # but resolve the host with DoH | |
| hostport = self.path.split(':') | |
| ip = resolve_dns(hostport[0]) or hostport[0] | |
| sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) | |
| sock.connect((ip, int(hostport[1]))) | |
| sock.setblocking(False) | |
| self.send_response(200, '') | |
| self.end_headers() | |
| self.request.setblocking(False) | |
| while True: | |
| readable, writable, exceptional = select([sock], [sock], [], None) | |
| if sock in readable: | |
| # Write to self.wfile | |
| while True: | |
| nodata = False | |
| try: | |
| msg = sock.recv(4096) | |
| self.wfile.write(msg) | |
| except socket.error as e: | |
| if e.errno == errno.EAGAIN or e.errno == errno.EWOULDBLOCK: | |
| nodata = True | |
| else: | |
| # Remote closed connection | |
| self.request.setblocking(True) | |
| return | |
| if nodata: | |
| break | |
| elif sock in writable: | |
| # Read from self.request | |
| while True: | |
| nodata = False | |
| try: | |
| data = self.request.recv(4096) | |
| if len(data) == 0: | |
| # Close | |
| sock.close() | |
| self.request.setblocking(True) | |
| return | |
| sock.send(data) | |
| except socket.error as e: | |
| if e.errno == errno.EAGAIN or e.errno == errno.EWOULDBLOCK: | |
| nodata = True | |
| else: | |
| # Closed | |
| sock.close() | |
| self.request.setblocking(True) | |
| return | |
| if nodata: | |
| break | |
| if __name__ == '__main__': | |
| httpd = https.ThreadingHTTPServer(('0.0.0.0', 1080), ProxyHTTPHandler) | |
| httpd.serve_forever() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # HTTP tunneling with Simple DPI circumventer in Python 3.10+. | |
| # | |
| # The motivation to write this is because my country starts doing passive DPI | |
| # checking to blocked sites. Access to the blocked sites works fine in | |
| # browsers because both browsers and the blocked site implement Encrypted | |
| # Client Hello. Unfortunately, Python doesn't yet support ECH. | |
| # | |
| # The circumvention strategy is as follows: | |
| # * Fragment the TLS handshake so it send in 4 byte chunks. This must be | |
| # properly handled by any compliant server as per TCP specification. | |
| # However, most passive DPI won't bother re-assembling the TCP packets and | |
| # just checks everything (including the SNI) everytime packet arrives. | |
| # * HTTP `Host` header replacement. It replaces the `Host` header with `hOsT`. | |
| # HTTP server treats HTTP headers as case-insensitive as per specification. | |
| # However, most passive DPI explicitly checks for `Host` case-sensitive. | |
| # * If neither are plausible, it just send it as-is, no fragmentation nor | |
| # replacement. | |
| # | |
| # To use this proxy, you need 2 terminals. | |
| # * 1 terminal is to run this proxy server: | |
| # $ python3 httptunnel2.py | |
| # * then the other terminal to download the videos with yt-dlp | |
| # $ yt-dlp --proxy http://localhost:1080 ... | |
| # | |
| # The throughput of this script is shitty, but it works. Error handling is | |
| # kinda lazy, so expect something breaks on unstable internet. | |
| # | |
| # This script is tested in Python 3.10 in Windows but should work in other | |
| # OSes too. | |
| # | |
| # You can use part or all of this code without my permission. Attribution | |
| # to "Miku AuahDark" is appreciated, but it's not required. | |
| import itertools | |
| import http.client | |
| import http.server | |
| import socket | |
| import select | |
| import urllib.parse | |
| def fragmenter(data: bytes): | |
| if len(data) >= 4 and data.startswith(b"\x16\x03\x01") and data[2] in range(1, 5): | |
| print("TLS handshake fragmenter") | |
| # For Python 3.10 compatibility, we cannot use itertools.batched here. | |
| it = iter(data) | |
| while True: | |
| # Fragment on every 4 bytes | |
| batch = bytes(itertools.islice(it, 4)) | |
| if not batch: | |
| break | |
| yield batch | |
| elif b"HTTP/1.1\r\n" in data or b"HTTP/1.0\r\n" in data: | |
| print("HTTP host replacer", data) | |
| parts = data.split(b"\r\n\r\n", 1) | |
| if parts: | |
| yield parts[0].replace(b"\r\nHost: ", b"\r\nhOsT: ") | |
| if len(parts) > 1: | |
| yield b"\r\n\r\n" | |
| yield parts[1] | |
| else: | |
| # No TLS present. | |
| yield data | |
| class ProxyHTTPHandler(http.server.BaseHTTPRequestHandler): | |
| # TODO: do_PUT, do_POST, do_HEAD, do_OPTIONS | |
| # but the main purpose of this proxy script is to have do_CONNECT | |
| # so it's already sufficient for my (Miku AuahDark) use case | |
| rbufsize = 0 | |
| def do_GET(self): | |
| # The task is to proxy GET request | |
| if self.path.startswith("http://"): | |
| # Proxy | |
| target = urllib.parse.urlparse(self.path) | |
| unamepasswdurl = target.netloc.split("@") | |
| hostport = unamepasswdurl[-1].split(":") | |
| host = hostport[0] | |
| port = int(hostport[1]) if len(hostport) > 1 else 80 | |
| proxy = http.client.HTTPConnection(host, port) | |
| headers = {} | |
| for h in self.headers.items(): | |
| headers[h[0]] = h[1] | |
| proxy.request("GET", "/" + self.path.split("/")[3], headers=headers) | |
| proxyresp = proxy.getresponse() | |
| self.send_response(proxyresp.status, proxyresp.reason) | |
| for h in proxyresp.msg.items(): | |
| self.send_header(h[0], h[1]) | |
| self.end_headers() | |
| self.wfile.write(proxyresp.read()) | |
| else: | |
| self.send_response(204, "") | |
| self.end_headers() | |
| def do_CONNECT(self): | |
| # The task is to open socket on the other end then fragment the TLS if any | |
| hostport = self.path.split(":") | |
| remote = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
| remote.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) | |
| remote.connect((hostport[0], int(hostport[1]))) | |
| remote.setblocking(False) | |
| self.send_response(200, "") | |
| self.end_headers() | |
| client: socket.socket = self.connection | |
| client.settimeout(60) | |
| remote.settimeout(60) | |
| sockets = [client, remote] | |
| try: | |
| readable: list[socket.socket] | |
| while True: | |
| readable, _, _ = select.select(sockets, [], []) | |
| for s in readable: | |
| other = remote if s is client else client | |
| data = s.recv(4096) | |
| if not data: | |
| return | |
| for newdata in fragmenter(data): | |
| other.sendall(newdata) | |
| finally: | |
| remote.close() | |
| if __name__ == "__main__": | |
| httpd = http.server.ThreadingHTTPServer(("0.0.0.0", 1080), ProxyHTTPHandler) | |
| httpd.serve_forever() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment