Skip to content

Instantly share code, notes, and snippets.

@sorz
Last active May 14, 2025 13:10
Show Gist options
  • Save sorz/561b919b98fe7053e9644a374078c63f to your computer and use it in GitHub Desktop.
Save sorz/561b919b98fe7053e9644a374078c63f to your computer and use it in GitHub Desktop.
(Non-standard) HTTP DNS resolver. I used it to hijack some Chinese apps' queries on my home gateway during 2019–2021. May not work today.
#!/usr/bin/env python3
import asyncio
import logging
from typing import Tuple, List
from aiohttp import web
from aiodnsresolver import Resolver, TYPES, DnsRecordDoesNotExist
from des import DesKey
# DES keys for encrypted queries served by handle_tencent. The dictionary is
# indexed by the value of the request's `id` query parameter; each value is
# passed to DesKey() to decrypt the queried name and encrypt the reply.
TENCENT_KEYS = {
'146': b'K@ahDRdl',
'338': b']YPNNMwW',
'3092': b'LkgBm3xj',
}
async def resolve(request: web.Request) -> Tuple[str, List[str], int]:
    """Resolve the hostname named in the request's query string.

    The name is taken from the ``host`` query parameter, falling back to
    ``dn`` when ``host`` is absent or empty.

    Returns:
        A ``(host, ips, ttl)`` tuple, where ``ips`` and ``ttl`` come from the
        module-level ``resolver``.

    Raises:
        web.HTTPBadRequest: no name could be read from the query, or the
            name does not contain a dot.
        web.HTTPNotFound: the resolver raised ``DnsRecordDoesNotExist``.
    """
    params = request.query
    host = params.get('host') or params.get('dn')
    if host is None:
        raise web.HTTPBadRequest(text='missing query')
    if '.' not in host:
        # Dot-less names are rejected up front instead of being looked up.
        logging.info('not a domain: %s', host)
        raise web.HTTPBadRequest(text='not a domain')
    try:
        addresses, ttl = await resolver.query(host)
    except DnsRecordDoesNotExist:
        logging.info('NXDOMAIN: %s', host)
        raise web.HTTPNotFound(text='NXDOMAIN')
    return host, addresses, ttl
async def handle_aliyun(request: web.Request) -> web.Response:
    """Resolve the queried name and reply with a JSON object.

    The response body has the keys ``host``, ``ips`` and ``ttl``. Any HTTP
    error raised by ``resolve`` propagates unchanged.
    """
    host, ips, ttl = await resolve(request)
    payload = {'host': host, 'ips': ips, 'ttl': ttl}
    return web.json_response(payload)
async def handle_tencent(request: web.Request) -> web.Response:
    """Answer a plaintext or DES-encrypted HTTP DNS query.

    Query parameters read:
      * ``id``: selects a key from ``TENCENT_KEYS``. When absent, the request
        is treated as plaintext and delegated to ``resolve``.
      * ``host`` / ``dn``: the name to resolve. When ``id`` is given this is
        the hex encoding of the DES-encrypted name.
      * ``ttl``: when equal to ``'1'``, every address is emitted as
        ``<ip>,<ttl>``.

    Returns:
        A text response with the addresses joined by ``;``. When a key was
        selected, the body is DES-encrypted and hex-encoded.

    Raises:
        web.HTTPBadRequest: the name is missing, or (encrypted mode) it is not
            valid hex, not a whole number of DES blocks, or does not decrypt
            to UTF-8 text.
        web.HTTPNotFound: the key id is unknown, or the name does not exist.
    """
    key_id = request.query.get('id')
    ttl_enabled = request.query.get('ttl') == '1'
    if key_id is None:
        _, ips, ttl = await resolve(request)
        cipher = None
    else:
        key = TENCENT_KEYS.get(key_id)
        if key is None:
            logging.info('missing key: %s', key_id)
            raise web.HTTPNotFound(text='Missing keys')
        cipher = DesKey(key)

        # A missing name is a client error, not a server-side KeyError.
        encrypted = request.query.get('host') or request.query.get('dn')
        if encrypted is None:
            raise web.HTTPBadRequest(text='missing query')
        try:
            ciphertext = bytes.fromhex(encrypted)
            # DES operates on 8-byte blocks; anything else cannot be a valid
            # ciphertext, so reject it here rather than inside the cipher.
            if not ciphertext or len(ciphertext) % 8:
                raise ValueError('not a whole number of DES blocks')
            host = cipher.decrypt(ciphertext, padding=True).decode('utf8')
        except ValueError:
            # Covers malformed hex, bad length and UnicodeDecodeError.
            logging.info('bad encrypted query: %s', encrypted)
            raise web.HTTPBadRequest(text='bad encrypted query')

        # Mirror resolve(): a non-existent name is a 404, not a 500.
        try:
            ips, ttl = await resolver.query(host)
        except DnsRecordDoesNotExist:
            logging.info('NXDOMAIN: %s', host)
            raise web.HTTPNotFound(text='NXDOMAIN')

    if ttl_enabled:
        resp = ';'.join(f'{ip},{ttl}' for ip in ips)
    else:
        resp = ';'.join(ips)
    if cipher is not None:
        # Encrypted requests get an encrypted, hex-encoded reply.
        resp = cipher.encrypt(resp.encode('utf8'), padding=True).hex()
    return web.Response(text=resp)
async def handle_zhihu(request: web.Request) -> web.Response:
    """Resolve one name or a comma-separated batch and reply as JSON.

    The ``host`` query parameter holds the name(s). Without a comma the
    request is delegated to ``resolve`` and the ``dns`` field of the reply is
    a single object; with commas it is a list with one object per name. Each
    object has the keys ``host``, ``ips``, ``ttl`` and ``origin_ttl`` (the
    latter two carry the same value).

    Raises:
        web.HTTPBadRequest: raised by ``resolve`` when no usable name is
            present in the query.
        web.HTTPNotFound: a queried name does not exist.
    """
    # Use .get() so a missing parameter becomes resolve()'s 400 response
    # instead of an unhandled KeyError.
    hosts_param = request.query.get('host', '')
    if ',' not in hosts_param:
        host, ips, ttl = await resolve(request)
        resp = dict(host=host, ips=ips, ttl=ttl, origin_ttl=ttl)
    else:
        resp = []
        for name in hosts_param.split(','):
            # Match the single-name path: NXDOMAIN is reported as a 404.
            try:
                ips, ttl = await resolver.query(name)
            except DnsRecordDoesNotExist:
                logging.info('NXDOMAIN: %s', name)
                raise web.HTTPNotFound(text='NXDOMAIN')
            resp.append(dict(host=name, ips=ips, ttl=ttl, origin_ttl=ttl))
    return web.json_response(dict(dns=resp))
class HostChooser:
    """Dispatch each request to the handler registered for its host."""

    def __init__(self):
        # Maps a host string to the coroutine function that serves it.
        self._host_handler = {}

    def add(self, host: str, handler):
        """Register ``handler`` for requests whose host equals ``host``."""
        self._host_handler[host] = handler

    async def do_route(self, request: web.Request):
        """Await and return the result of the handler for ``request.host``.

        Raises:
            web.HTTPNotFound: no handler is registered for that host.
        """
        handler = self._host_handler.get(request.host)
        if handler is None:
            logging.info('Unknown host: %s', request.host)
            raise web.HTTPNotFound()
        return await handler(request)
class DNSResolver:
    """Lazily-initialised wrapper around ``aiodnsresolver`` for A lookups."""

    # Both are filled in on the first call to query().
    _resolve = None
    _loop = None

    async def query(self, host: str) -> Tuple[List[str], int]:
        """Look up the A records of ``host``.

        Returns:
            ``(ips, ttl)``: the addresses as strings, and the whole number of
            seconds until the earliest ``expires_at`` among them, measured
            against the event loop clock and never below zero.
        """
        if self._resolve is None:
            # Resolver() returns a pair; only its first element is used.
            resolve_fn, _ = Resolver()
            self._resolve = resolve_fn
        if self._loop is None:
            self._loop = asyncio.get_event_loop()

        records = await self._resolve(host, TYPES.A)
        earliest_expiry = min(record.expires_at for record in records)
        remaining = earliest_expiry - self._loop.time()
        ttl = int(max(0, remaining))
        return [str(record) for record in records], ttl
# Module-level wiring: one application, one shared resolver, and a host-based
# dispatcher that receives every GET request regardless of path.
app = web.Application()
resolver = DNSResolver()
hosts = HostChooser()
app.add_routes([web.get('/{path:.*}', hosts.do_route)])

# Host value -> handler, registered in the order listed.
_HOST_HANDLERS = (
    ('203.107.1.33', handle_aliyun),
    ('203.107.1.34', handle_aliyun),
    ('203.107.1.65', handle_aliyun),
    ('203.107.1.66', handle_aliyun),
    ('119.29.29.29', handle_tencent),
    ('118.89.204.198', handle_zhihu),
)
for _host, _handler in _HOST_HANDLERS:
    hosts.add(_host, _handler)
if __name__ == '__main__':
    # Log at INFO overall, but only warnings and above from aiodnsresolver.
    logging.basicConfig(level=logging.INFO)
    dns_logger = logging.getLogger('aiodnsresolver')
    dns_logger.setLevel(logging.WARNING)
    # Serve on 127.0.0.80:8011.
    web.run_app(app, host='127.0.0.80', port=8011)
aiodnsresolver==0.0.151
aiohttp==3.7.3
async-timeout==3.0.1
attrs==20.3.0
chardet==3.0.4
des==1.0.5
idna==2.10
multidict==5.0.2
typing-extensions==3.7.4.3
yarl==1.6.3
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment