Created
May 20, 2012 16:21
-
-
Save barbuza/2758667 to your computer and use it in GitHub Desktop.
Fake Python DNS server for use with the Mac /etc/resolver system
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import re | |
import socket | |
class UnsupportedQuery(Exception):
    """Raised when a request holds no standard query with a domain name.

    Derives from ``Exception`` (not ``BaseException``) so that it is an
    ordinary application error and is not grouped with interpreter-level
    signals such as ``KeyboardInterrupt``.
    """
class DomainNotFound(Exception):
    """Raised when no configured rule matches the requested domain name.

    Derives from ``Exception`` (not ``BaseException``) so that it is an
    ordinary application error and is not grouped with interpreter-level
    signals such as ``KeyboardInterrupt``.
    """
class MicroDNS(object):
    """Tiny UDP DNS responder that answers queries from regex rules.

    Each rule maps a regular expression to an IPv4 address. The first rule
    whose pattern matches the queried domain name supplies the address that
    is returned in a single A record. Requests that cannot be answered
    (malformed packets, non-standard opcodes, unknown domains) are replied
    to with an empty datagram.
    """

    # Size of the fixed DNS header that precedes the question section.
    _HEADER_SIZE = 12
    # Longest label allowed in a name; larger length bytes are compression
    # pointers or reserved values, which this server does not handle.
    _MAX_LABEL = 63

    def __init__(self, bind):
        """Remember the ``(host, port)`` to bind; start with no rules."""
        self._bind = bind
        self._rules = []

    def _read_question_name(self, request):
        """Walk the first question name of *request*.

        Returns:
            A ``(labels, end)`` tuple: the raw label byte strings and the
            offset just past the terminating zero-length label.

        Raises:
            UnsupportedQuery: if the packet ends before the name does or a
                length byte exceeds 63.
        """
        data = bytearray(request)
        size = len(data)
        labels = []
        offset = self._HEADER_SIZE
        while True:
            if offset >= size:
                raise UnsupportedQuery()
            length = data[offset]
            if length == 0:
                return labels, offset + 1
            if length > self._MAX_LABEL:
                raise UnsupportedQuery()
            start = offset + 1
            offset = start + length
            if offset > size:
                # The label claims more bytes than the packet contains.
                raise UnsupportedQuery()
            labels.append(bytes(data[start:offset]))

    def _parse_query(self, request):
        """Return the dotted domain name asked for in *request*.

        Raises:
            UnsupportedQuery: if the packet is too short, is not a standard
                query (opcode 0), or carries an empty or malformed name.
        """
        data = bytearray(request)
        if len(data) < self._HEADER_SIZE:
            raise UnsupportedQuery()
        # The opcode occupies bits 3-6 of the third header byte.
        opcode = (data[2] >> 3) & 15
        if opcode != 0:
            raise UnsupportedQuery()
        labels, _ = self._read_question_name(data)
        if not labels:
            raise UnsupportedQuery()
        return ".".join(label.decode("latin-1") for label in labels)

    def _format_response(self, request, ip):
        """Build the raw reply to *request* carrying one A record for *ip*.

        Args:
            request: The raw query packet.
            ip: The address as 4 packed bytes (as stored by ``add_rule``).

        Only the first question is echoed back, so additional records in
        the query (for example an EDNS OPT record) do not end up in front
        of the answer. The answer is always typed A/IN, whatever record
        type was asked for.

        Raises:
            UnsupportedQuery: if the question name cannot be parsed.
        """
        packet = bytes(bytearray(request))
        _, name_end = self._read_question_name(packet)
        # The name is followed by 2 bytes QTYPE and 2 bytes QCLASS.
        question = packet[self._HEADER_SIZE:name_end + 4]
        return b"".join([
            packet[:2],           # transaction ID copied from the query
            b"\x81\x80",          # flags: response, RD + RA set, RCODE 0
            b"\x00\x01",          # QDCOUNT: the single echoed question
            b"\x00\x01",          # ANCOUNT: the single answer below
            b"\x00\x00\x00\x00",  # NSCOUNT and ARCOUNT
            question,
            b"\xc0\x0c",          # answer name: pointer to offset 12
            b"\x00\x01\x00\x01",  # TYPE A, CLASS IN
            b"\x00\x00\x00\x3c",  # TTL: 60 seconds
            b"\x00\x04",          # RDLENGTH: 4 address bytes
            ip,
        ])

    def _find_ip(self, domain):
        """Return the packed address of the first rule matching *domain*.

        Raises:
            DomainNotFound: if no rule matches.
        """
        for pattern, ip in self._rules:
            if pattern.match(domain):
                return ip
        raise DomainNotFound()

    def _handle_request(self, request):
        """Return the reply bytes for *request*, or ``b""`` if unanswerable."""
        try:
            domain = self._parse_query(request)
            ip = self._find_ip(domain)
            return self._format_response(request, ip)
        except (UnsupportedQuery, DomainNotFound):
            return b""

    def add_rule(self, pattern, ip):
        """Register a rule resolving domains that match *pattern* to *ip*.

        Args:
            pattern: A regular expression string, or an already compiled
                pattern object (anything exposing ``match``).
            ip: Dotted-quad IPv4 address such as ``"127.0.0.1"``.

        Raises:
            ValueError: if *ip* is not four integers in the range 0-255.
        """
        if not hasattr(pattern, "match"):
            pattern = re.compile(pattern)
        octets = [int(part) for part in ip.split(".")]
        # The answer record hard-codes RDLENGTH 4, so anything other than
        # exactly four octets would yield a corrupt response.
        if len(octets) != 4 or not all(0 <= octet <= 255 for octet in octets):
            raise ValueError("not a dotted-quad IPv4 address: %r" % (ip,))
        self._rules.append([pattern, bytes(bytearray(octets))])

    def serve_forever(self):
        """Answer queries on the bound UDP address until interrupted.

        Ctrl-C (``KeyboardInterrupt``) stops the loop quietly. The socket is
        closed on every exit path, including a failed ``bind``.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            sock.bind(self._bind)
            while True:
                request, addr = sock.recvfrom(1024)
                sock.sendto(self._handle_request(request), addr)
        except KeyboardInterrupt:
            pass
        finally:
            sock.close()
if __name__ == "__main__":
    # Demo setup: listen on the loopback interface and point ``test.py``
    # plus every name beneath it at 127.0.0.1.
    dns = MicroDNS(("127.0.0.1", 55500))
    for rule in (r"^test\.py$", r"^.+\.test\.py$"):
        dns.add_rule(rule, "127.0.0.1")
    dns.serve_forever()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment