Skip to content

Instantly share code, notes, and snippets.

@MikuAuahDark
Last active February 1, 2026 02:24
Show Gist options
  • Select an option

  • Save MikuAuahDark/94dfe7496d973105a005658f617293d5 to your computer and use it in GitHub Desktop.

Select an option

Save MikuAuahDark/94dfe7496d973105a005658f617293d5 to your computer and use it in GitHub Desktop.
Lazy HTTP Tunnel/CONNECT Proxy in Python 3.7+ using Cloudflare as DNS Resolver
# HTTP tunneling with DoH resolver
#
# The motivation for writing this is that youtube-dl lacks DoH support or an
# equivalent of cURL's "--resolve" option, which prevents me from downloading
# reddit videos because all reddit (sub)domains are blocked in my country.
#
# To use this proxy, you need 2 terminals.
# * 1 terminal is to run this proxy server:
# $ python3 httptunnel.py
# * then the other terminal to download the videos with youtube-dl
# $ youtube-dl --proxy http://localhost:1080 ...
#
# The throughput of this script is shitty, but it works. Error handling is
# kinda lazy, so expect things to break on an unstable connection.
#
# This script is tested in Python 3.8.5 in Windows but should work in other
# OSes too.
#
# You can use part or all of this code without my permission. Attribution
# to "Miku AuahDark" is appreciated, but it's not required.
import errno
import http.client as httpc
import http.server as https
import json
import re
import socket
import ssl
import urllib.parse as parse
from select import select
from time import time
from typing import Optional
class HostEntry:
    """One cached DNS answer.

    ``ip`` is the resolved address. ``ttl`` is stored exactly as given; the
    resolver in this file passes an absolute expiry time (record TTL added
    to the current epoch second) and compares it against ``time()``.
    """

    def __init__(self, ip: str, ttl: int):
        self.ip, self.ttl = ip, ttl
# Successful lookups, keyed by the host name that was asked for.
cached_resolver: dict = {}
# Matches a dotted-quad IPv4 literal. The dots are escaped so that only a real
# "." separates the groups (an unescaped "." would accept any character and
# make strings such as "1a2b3c4" look like addresses); the raw string keeps
# "\d" from being read as an invalid string escape.
ipmatcher = re.compile(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$")
# Resolve DNS using Cloudflare
def resolve_dns(host: str) -> Optional[str]:
    """Resolve *host* to an IPv4 address through Cloudflare's DNS JSON API.

    An IPv4 literal is returned unchanged. Successful lookups are kept in
    ``cached_resolver`` until the TTL reported by the answer has elapsed.

    Returns the address as a string, or ``None`` when the lookup fails for
    any reason (network error, non-200 reply, malformed or empty answer), so
    callers can fall back to another resolver.
    """
    if ipmatcher.match(host):
        return host

    hostent = cached_resolver.get(host)
    if hostent is not None:
        if int(time()) < hostent.ttl:
            return hostent.ip
        # Entry has expired; forget it and query again.
        cached_resolver.pop(host, None)

    dns = None
    try:
        # Cloudflare DNS JSON format
        dns = httpc.HTTPSConnection('1.0.0.1')
        dns.request(
            'GET',
            '/dns-query?type=A&name=' + parse.quote(host),
            headers={'Accept': 'application/dns-json'},
        )
        dnsresp = dns.getresponse()
        if dnsresp.status != 200:
            return None
        # "Answer" is absent when the name does not resolve.
        answers = json.load(dnsresp).get('Answer') or []

        # Follow the CNAME chain contained in the answer section until an
        # address record is reached. DNS names are case-insensitive and may
        # carry a trailing dot, so compare them in normalized form.
        targethost = host.rstrip('.').lower()
        visited = set()
        while targethost not in visited:
            # Remembering visited names stops a cyclic CNAME answer from
            # looping forever.
            visited.add(targethost)
            for data in answers:
                if data['name'].rstrip('.').lower() != targethost:
                    continue
                if ipmatcher.match(data['data']):
                    hostent = HostEntry(data['data'], data['TTL'] + int(time()))
                    cached_resolver[host] = hostent
                    return hostent.ip
                targethost = data['data'].rstrip('.').lower()
                break
            else:
                # No record for the current name: the chain is broken.
                return None
        return None
    except (OSError, httpc.HTTPException, ValueError, LookupError,
            TypeError, AttributeError):
        # Deliberately best-effort: any transport or decoding problem simply
        # means "not resolved".
        return None
    finally:
        if dns is not None:
            dns.close()
class ProxyHTTPHandler(https.BaseHTTPRequestHandler):
    """Minimal forward proxy: plain-HTTP GET plus CONNECT tunnelling.

    Upstream host names are looked up with resolve_dns(); when that returns
    nothing, the name itself is used and the operating system resolves it.
    """

    # TODO: do_PUT, do_POST, do_HEAD, do_OPTIONS
    # but the main purpose of this proxy script is to have do_CONNECT
    # so it's already sufficient for my (Miku AuahDark) use case

    # Unbuffered request stream. NOTE(review): presumably so that no tunnelled
    # bytes end up in rfile's buffer, since do_CONNECT reads the raw socket.
    rbufsize = 0

    @staticmethod
    def _origin_form(url):
        """Return the full path (plus query string) of an absolute URL."""
        target = parse.urlsplit(url)
        request_target = target.path or '/'
        if target.query:
            request_target += '?' + target.query
        return request_target

    @staticmethod
    def _split_authority(authority):
        """Split ``host:port`` into ``(host, port)``; None when malformed."""
        host, sep, port_text = authority.rpartition(':')
        if not sep or not host:
            return None
        if not (port_text.isascii() and port_text.isdigit()):
            return None
        port = int(port_text)
        if port > 65535:
            return None
        return host, port

    def _relay(self, client, remote):
        """Copy bytes both ways until either side closes or errors out."""
        while True:
            # Wait for data on either socket instead of polling.
            readable, _, _ = select([client, remote], [], [])
            for source in readable:
                sink = remote if source is client else client
                try:
                    chunk = source.recv(4096)
                    if not chunk:
                        # Orderly shutdown by this peer ends the tunnel.
                        return
                    # sendall() retries partial writes, so nothing is lost.
                    sink.sendall(chunk)
                except OSError:
                    return

    def do_GET(self):
        """Forward an absolute-URI ``http://`` GET to the origin server.

        Any other request target is answered with an empty 204 response.
        """
        if not self.path.startswith('http://'):
            self.send_response(204, '')
            self.end_headers()
            return

        try:
            target = parse.urlsplit(self.path)
            host = target.hostname
            port = target.port or 80
        except ValueError:
            host = None
        if not host:
            self.send_error(400, 'Malformed proxy URL')
            return

        headers = dict(self.headers.items())
        if not any(name.lower() == 'host' for name in headers):
            # We connect by IP address, so without this http.client would
            # announce the IP instead of the real host name.
            headers['Host'] = target.netloc.rpartition('@')[2]

        ip = resolve_dns(host) or host
        proxy = httpc.HTTPConnection(ip, port)
        try:
            proxy.request('GET', self._origin_form(self.path), headers=headers)
            proxyresp = proxy.getresponse()
            body = proxyresp.read()
        except (OSError, httpc.HTTPException):
            self.send_error(502, 'Upstream request failed')
            return
        finally:
            proxy.close()

        self.send_response(proxyresp.status, proxyresp.reason)
        was_chunked = False
        for name, value in proxyresp.msg.items():
            if name.lower() == 'transfer-encoding':
                # read() already decoded the chunked body, so advertising
                # chunked encoding to the client would be wrong.
                was_chunked = True
                continue
            self.send_header(name, value)
        if was_chunked:
            self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def do_CONNECT(self):
        """Open a TCP tunnel to ``host:port``, resolving the host via DoH."""
        target = self._split_authority(self.path)
        if target is None:
            self.send_error(400, 'Malformed CONNECT target')
            return
        host, port = target
        ip = resolve_dns(host) or host

        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        try:
            try:
                sock.connect((ip, port))
            except OSError:
                self.send_error(502, 'Cannot reach upstream host')
                return
            self.send_response(200, '')
            self.end_headers()
            # The connection carries opaque tunnel data from here on; never
            # try to parse another HTTP request from it.
            self.close_connection = True
            self._relay(self.request, sock)
        finally:
            sock.close()
if __name__ == '__main__':
    # NOTE(review): '0.0.0.0' listens on every interface, so any machine that
    # can reach this host can use the proxy. Kept as-is; bind to '127.0.0.1'
    # if only local clients are intended.
    # The with-block closes the listening socket when the server stops.
    with https.ThreadingHTTPServer(('0.0.0.0', 1080), ProxyHTTPHandler) as httpd:
        try:
            httpd.serve_forever()
        except KeyboardInterrupt:
            # Ctrl+C is the normal way to stop the proxy; exit quietly.
            pass
# HTTP tunneling with Simple DPI circumventer in Python 3.10+.
#
# The motivation for writing this is that my country started doing passive DPI
# checks against blocked sites. Access to the blocked sites works fine in
# browsers because both browsers and the blocked site implement Encrypted
# Client Hello. Unfortunately, Python doesn't yet support ECH.
#
# The circumvention strategy is as follows:
# * Fragment the TLS handshake so it is sent in 4-byte chunks. This must be
# properly handled by any compliant server as per the TCP specification.
# However, most passive DPI won't bother reassembling the TCP packets and
# just inspects each packet (including the SNI) on its own as it arrives.
# * HTTP `Host` header replacement. It replaces the `Host` header with `hOsT`.
# HTTP server treats HTTP headers as case-insensitive as per specification.
# However, most passive DPI explicitly checks for `Host` case-sensitive.
# * If neither applies, the data is sent as-is, with no fragmentation or
# replacement.
#
# To use this proxy, you need 2 terminals.
# * 1 terminal is to run this proxy server:
# $ python3 httptunnel2.py
# * then the other terminal to download the videos with yt-dlp
# $ yt-dlp --proxy http://localhost:1080 ...
#
# The throughput of this script is shitty, but it works. Error handling is
# kinda lazy, so expect things to break on an unstable connection.
#
# This script is tested in Python 3.10 in Windows but should work in other
# OSes too.
#
# You can use part or all of this code without my permission. Attribution
# to "Miku AuahDark" is appreciated, but it's not required.
import itertools
import http.client
import http.server
import socket
import select
import urllib.parse
def fragmenter(data: bytes):
    """Yield *data* in pieces chosen to confuse passive packet inspection.

    * A buffer that starts like a TLS handshake record (content type 0x16,
      major version 3, minor version 1 to 4) is yielded in 4-byte slices.
    * A buffer that contains an HTTP/1.0 or HTTP/1.1 request line has its
      ``Host`` header renamed to ``hOsT``; the header part, the blank-line
      separator and the remainder are yielded separately.
    * Anything else is yielded unchanged as a single piece.

    Joining the yielded pieces gives back the input, apart from the header
    rename in the HTTP case.
    """
    # Compare the two fixed bytes only, so that every record version accepted
    # by the range check (0x0301 to 0x0304) is actually fragmented.
    if len(data) >= 4 and data[:2] == b"\x16\x03" and data[2] in range(1, 5):
        print("TLS handshake fragmenter")
        # Fragment on every 4 bytes
        for start in range(0, len(data), 4):
            yield data[start:start + 4]
    elif b"HTTP/1.1\r\n" in data or b"HTTP/1.0\r\n" in data:
        print("HTTP host replacer", data)
        head, separator, rest = data.partition(b"\r\n\r\n")
        yield head.replace(b"\r\nHost: ", b"\r\nhOsT: ")
        if separator:
            yield b"\r\n\r\n"
            yield rest
    else:
        # No TLS present.
        yield data
class ProxyHTTPHandler(http.server.BaseHTTPRequestHandler):
    """Minimal forward proxy: plain-HTTP GET plus CONNECT tunnelling.

    Everything relayed through a CONNECT tunnel is passed through
    fragmenter() before being sent on.
    """

    # TODO: do_PUT, do_POST, do_HEAD, do_OPTIONS
    # but the main purpose of this proxy script is to have do_CONNECT
    # so it's already sufficient for my (Miku AuahDark) use case

    # Unbuffered request stream. NOTE(review): presumably so that no tunnelled
    # bytes end up in rfile's buffer, since do_CONNECT reads the raw socket.
    rbufsize = 0

    @staticmethod
    def _origin_form(url):
        """Return the full path (plus query string) of an absolute URL."""
        target = urllib.parse.urlsplit(url)
        request_target = target.path or "/"
        if target.query:
            request_target += "?" + target.query
        return request_target

    @staticmethod
    def _split_authority(authority):
        """Split ``host:port`` into ``(host, port)``; None when malformed."""
        host, sep, port_text = authority.rpartition(":")
        if not sep or not host:
            return None
        if not (port_text.isascii() and port_text.isdigit()):
            return None
        port = int(port_text)
        if port > 65535:
            return None
        return host, port

    def _relay(self, client, remote):
        """Copy bytes both ways until either side closes, errors or times out."""
        while True:
            readable, _, _ = select.select([client, remote], [], [])
            for source in readable:
                sink = remote if source is client else client
                try:
                    chunk = source.recv(4096)
                    if not chunk:
                        # Orderly shutdown by this peer ends the tunnel.
                        return
                    for piece in fragmenter(chunk):
                        sink.sendall(piece)
                except OSError:
                    # Covers resets and the 60 second socket timeout.
                    return

    def do_GET(self):
        """Forward an absolute-URI ``http://`` GET to the origin server.

        Any other request target is answered with an empty 204 response.
        """
        if not self.path.startswith("http://"):
            self.send_response(204, "")
            self.end_headers()
            return

        try:
            target = urllib.parse.urlsplit(self.path)
            host = target.hostname
            port = target.port or 80
        except ValueError:
            host = None
        if not host:
            self.send_error(400, "Malformed proxy URL")
            return

        headers = dict(self.headers.items())
        proxy = http.client.HTTPConnection(host, port)
        try:
            proxy.request("GET", self._origin_form(self.path), headers=headers)
            proxyresp = proxy.getresponse()
            body = proxyresp.read()
        except (OSError, http.client.HTTPException):
            self.send_error(502, "Upstream request failed")
            return
        finally:
            proxy.close()

        self.send_response(proxyresp.status, proxyresp.reason)
        was_chunked = False
        for name, value in proxyresp.msg.items():
            if name.lower() == "transfer-encoding":
                # read() already decoded the chunked body, so advertising
                # chunked encoding to the client would be wrong.
                was_chunked = True
                continue
            self.send_header(name, value)
        if was_chunked:
            self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def do_CONNECT(self):
        """Open a TCP tunnel to ``host:port`` and relay through fragmenter()."""
        target = self._split_authority(self.path)
        if target is None:
            self.send_error(400, "Malformed CONNECT target")
            return

        remote = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            # Disable Nagle's algorithm on the upstream socket so small writes
            # are not held back to be coalesced with later ones.
            remote.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
            try:
                remote.connect(target)
            except OSError:
                self.send_error(502, "Cannot reach upstream host")
                return
            self.send_response(200, "")
            self.end_headers()
            # The connection carries opaque tunnel data from here on; never
            # try to parse another HTTP request from it.
            self.close_connection = True

            client: socket.socket = self.connection
            client.settimeout(60)
            remote.settimeout(60)
            self._relay(client, remote)
        finally:
            remote.close()
if __name__ == "__main__":
    # NOTE(review): "0.0.0.0" listens on every interface, so any machine that
    # can reach this host can use the proxy. Kept as-is; bind to "127.0.0.1"
    # if only local clients are intended.
    # The with-block closes the listening socket when the server stops.
    with http.server.ThreadingHTTPServer(("0.0.0.0", 1080), ProxyHTTPHandler) as httpd:
        try:
            httpd.serve_forever()
        except KeyboardInterrupt:
            # Ctrl+C is the normal way to stop the proxy; exit quietly.
            pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment