Skip to content

Instantly share code, notes, and snippets.

@Rafiot
Created April 18, 2025 21:26
Show Gist options
  • Select an option

  • Save Rafiot/aa1aac452c811ccddfee99c4be7b8d44 to your computer and use it in GitHub Desktop.

Select an option

Save Rafiot/aa1aac452c811ccddfee99c4be7b8d44 to your computer and use it in GitHub Desktop.
Async DNS resolution with dnspython and python_socks
#!/usr/bin/env python3
import asyncio
import dns
import socket
# Async
from dns.asyncresolver import Resolver
from python_socks.async_.asyncio import Proxy
from dns._asyncio_backend import _maybe_wait_for, StreamSocket, Backend
class Socks5Backend(Backend):
def __init__(self, proxy):
super().__init__()
self.proxy = proxy
def name(self):
return "asyncio socks5"
async def make_socket(
self,
af,
socktype,
proto=0,
source=None,
destination=None,
timeout=None,
ssl_context=None,
server_hostname=None,
):
if socktype == socket.SOCK_STREAM:
if destination is None:
# This shouldn't happen, but we check to make code analysis software
# happier.
raise ValueError("destination required for stream sockets")
sock = await self.proxy.connect(dest_host=destination[0], dest_port=destination[1])
(r, w) = await _maybe_wait_for(
asyncio.open_connection(
None,
None,
sock=sock,
ssl=ssl_context,
family=af,
proto=proto,
local_addr=source,
server_hostname=server_hostname,
),
timeout,
)
return StreamSocket(af, r, w)
raise NotImplementedError(
"unsupported socket " + f"type {socktype}"
) # pragma: no cover
async def main():
proxy = Proxy.from_url('socks5://127.0.0.1:25345')
ar = Resolver(configure=False)
# ar.nameservers = ['1.1.1.1'] # 'https://cloudflare-dns.com/dns-query', '10.2.0.1']
ar.nameservers = ['10.2.0.1']
backend = Socks5Backend(proxy)
response = await ar.resolve('google.nl', dns.rdatatype.RdataType.A, tcp=True, backend=backend)
print('Async python_socks:', response.response.answer)
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment