Skip to content

Instantly share code, notes, and snippets.

@slavanap
Last active May 9, 2018 22:32
Show Gist options
  • Save slavanap/f8d691e460ffa5da731cb6e5c5e58659 to your computer and use it in GitHub Desktop.
Save slavanap/f8d691e460ffa5da731cb6e5c5e58659 to your computer and use it in GitHub Desktop.
Add RKN rules to reroute traffic
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import re
import sys
from ipaddress import IPv4Network
def iterbytes(b):
    """Return an iterator over the length-1 slices of *b*.

    Slicing (rather than indexing) keeps the element type equal to the
    container type, so a ``bytes`` input yields one-byte ``bytes`` objects
    instead of integers.
    """
    size = len(b)
    return (b[start:start + 1] for start in range(size))
def get_ips(url="https://raw.githubusercontent.com/zapret-info/z-i/master/dump.csv"):
    """Download a blocklist dump and return the IPv4 networks it lists.

    The first line of the response is skipped.  On every following line only
    the text before the first ``;`` is examined; it is split on ``|`` and
    each piece that looks like a dotted-quad address (optionally followed by
    ``/prefix``) is converted to an ``IPv4Network``.

    Args:
        url: Location of the dump.  Defaults to the address the script has
            always used; any URL scheme ``urllib`` can open is accepted.

    Returns:
        A ``set`` of ``IPv4Network`` objects.  Entries that are not valid
        IPv4 addresses/networks are skipped; entries with host bits set
        (for example ``8.8.8.8/24``) are normalised to their network.
    """
    from urllib.request import urlopen

    entry_re = re.compile(r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}(/\d{1,2})?")
    ips = set()
    # ``with`` closes the response even when parsing raises.
    with urlopen(url) as dump:
        dump.readline()  # skip the first line (presumably a header)
        for line in dump:
            # Lines arrive as bytes.  Decode only the address field: calling
            # str() on bytes produces the "b'...'" repr, whose leading "b'"
            # made the first address of every line fail the pattern match.
            field = line.split(b";", 1)[0].decode("ascii", errors="ignore")
            for entry in field.split("|"):
                entry = entry.strip()
                # fullmatch rejects trailing garbage that match() would let
                # through to IPv4Network.
                if entry_re.fullmatch(entry) is None:
                    continue
                try:
                    ips.add(IPv4Network(entry, strict=False))
                except ValueError:
                    # Looks like an address but is not one (octet above
                    # 255, prefix above 32): skip instead of aborting.
                    continue
    return ips
class Zyxel:
    """Telnet session with a Zyxel router sitting at its ``(config)> `` prompt.

    The constructor logs in; ``command`` sends one CLI line and returns the
    router's textual answer; ``set_routes`` synchronises the router's static
    routes for one gateway/interface pair with a wanted set of networks.

    The object can be used as a context manager, in which case ``close`` is
    called on exit.

    NOTE: ``telnetlib`` was deprecated in Python 3.11 and removed in 3.13
    (PEP 594).  It is imported lazily in ``__init__`` so the class itself can
    still be defined on newer interpreters.
    """

    def __init__(self, ip, login, passwd):
        """Connect to *ip* and log in with *login* / *passwd*."""
        import telnetlib

        self.tn = telnetlib.Telnet(ip)
        self.tn.read_until(b"Login: ")
        self.tn.write(login.encode('utf-8') + b"\n")
        self.tn.read_until(b"Password: ")
        self.tn.write(passwd.encode('utf-8') + b"\n")
        self.tn.read_until(b"(config)> ")

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()

    def command(self, line):
        """Send *line* to the router and return its output as text.

        The echoed command and the trailing prompt are removed, and ``\\r\\n``
        line endings are converted to ``\\n``.
        """
        encoded = line.encode('utf-8')
        self.tn.write(encoded + b"\n")
        raw = self.tn.read_until(b"\r\n(config)> ")

        # Remove the exact prompt, then trailing line breaks.  rstrip() with
        # the prompt text as a *character set* also ate legitimate trailing
        # output made of those characters.
        if raw.endswith(b"(config)> "):
            raw = raw[:-len(b"(config)> ")]
        raw = raw.rstrip(b"\r\n")

        # The first line is expected to be the echoed command with ESC[K
        # sequences between characters (as the original code assumed).  Drop
        # it only when it really is the echo; strip() with the command's
        # characters as a set used to chew into the answer itself.
        first, _sep, rest = raw.partition(b"\r\n")
        if first.replace(b"\x1b[K", b"").strip() == encoded:
            raw = rest

        text = raw.strip(b"\r\n").replace(b"\r\n", b"\n")
        return text.decode("utf-8", errors="replace")

    def clean_routes(self, new_routes, gateway, iface):
        """Delete router routes via *gateway*/*iface* that are not wanted.

        Routes already on the router that are also in *new_routes* are left
        alone and **removed from** *new_routes* (it is modified in place), so
        that what remains in it is exactly what still has to be added.
        """
        # gateway/iface are literal text, not patterns: escape them so that
        # e.g. the dots of an IP address do not match arbitrary characters.
        route_re = re.compile(
            r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}(/\d{1,2})?\s+%s\s+%s\s+\d+\s*"
            % (re.escape(gateway), re.escape(iface))
        )
        routes = [
            row.split()
            for row in self.command("show ip route").split('\n')
            if route_re.match(row) is not None
        ]
        print("### Cleaning {} routes from router table".format(len(routes)), file=sys.stderr)
        for count, route in enumerate(routes, 1):
            ip_route = IPv4Network(route[0])
            if ip_route in new_routes:
                new_routes.remove(ip_route)
            else:
                # route[1] is the gateway column as printed by the router.
                result = self.command("no ip route %s %s" % (ip_route, route[1]))
                print(result)
            if count % 100 == 0:
                print("{}/{} routes done.". format(count, len(routes)), file=sys.stderr)

    def add_routes(self, new_routes, gateway, iface):
        """Add a static route via *gateway*/*iface* for every network given."""
        print("### Adding new {} routes to router table".format(len(new_routes)), file=sys.stderr)
        for count, ip in enumerate(sorted(new_routes), 1):
            result = self.command("ip route %s %s %s" % (ip, gateway, iface))
            print(result)
            if count % 100 == 0:
                print("{}/{} routes done.". format(count, len(new_routes)), file=sys.stderr)

    def set_routes(self, new_routes, gateway, iface):
        """Make the router's routes via *gateway*/*iface* equal *new_routes*.

        Works on a private copy, so the caller's collection is not modified.
        """
        pending = set(new_routes)
        self.clean_routes(pending, gateway, iface)
        self.add_routes(pending, gateway, iface)

    def close(self):
        """Log out and release the connection; safe to call more than once."""
        if self.tn is None:
            return
        try:
            self.tn.write(b"exit\n")
            self.tn.read_all()
        finally:
            # Close the socket explicitly instead of waiting for the object
            # to be garbage-collected.
            self.tn.close()
            self.tn = None
def minimize_subnets(ips, limit):
    """Reduce *ips* to at most *limit* networks that still cover all of them.

    Works in two passes:

    * Fold: while there are too many networks, widen every network that has
      the longest prefix to its supernet and merge the result.  The prefix
      being widened is written to stdout as progress output.
    * Unfold: folding usually overshoots, so folded networks are swapped back
      for the original networks they cover, fewest members first, for as
      long as the total stays within *limit*.

    Args:
        ips: Iterable of ``IPv4Network`` objects.
        limit: Maximum number of networks to return; must be at least 1.

    Returns:
        A ``set`` of networks whose union contains every network in *ips*.

    Raises:
        ValueError: If *limit* is smaller than 1 (the fold pass could never
            terminate, since ``0.0.0.0/0`` is its own supernet).
    """
    from ipaddress import collapse_addresses

    if limit < 1:
        raise ValueError("limit must be at least 1, got {!r}".format(limit))

    originals = list(collapse_addresses(ips))
    nets = list(originals)

    # Fold
    while len(nets) > limit:
        prefix = max(n.prefixlen for n in nets)
        sys.stdout.write("/{} ".format(prefix))
        sys.stdout.flush()
        # Build the list of supernets first; extending a list from a
        # generator over that same list would never finish.
        nets.extend([n.supernet() for n in nets if n.prefixlen == prefix])
        nets = list(collapse_addresses(nets))
    print()

    # Unfold
    members = {net: [] for net in nets}
    for ip in originals:
        covering = ip
        # Walk up the supernet chain, *including* /0: the fold pass may have
        # collapsed everything into 0.0.0.0/0.
        while covering not in members:
            if covering.prefixlen == 0:
                raise RuntimeError("{} is not covered by any folded network".format(ip))
            covering = covering.supernet()
        members[covering].append(ip)

    result = set(nets)
    for net, net_ips in sorted(members.items(), key=lambda item: len(item[1])):
        # Swapping one folded network for its members changes the size by
        # len(net_ips) - 1.  Candidates are sorted by member count and the
        # result never shrinks, so nothing later can fit either.
        if len(result) - 1 + len(net_ips) > limit:
            break
        result.remove(net)
        result.update(net_ips)
    return result
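#
# Usage:
#   ./r.py <your_zyxel_password>
#
# More settings (router address, login, gateway, interface name and the
# route limit) are hard-coded in the script entry point below.
#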
def main(argv=None):
    """Fetch the blocklist, shrink it and push it to the router as routes.

    Args:
        argv: Command-line arguments including the program name; defaults to
            ``sys.argv``.  ``argv[1]`` is the router password.  (Passing a
            password on the command line exposes it to other local users via
            the process list.)

    Returns:
        Process exit status: 0 on success, 2 when the password is missing.
    """
    argv = sys.argv if argv is None else argv
    if len(argv) < 2:
        prog = argv[0] if argv else "r.py"
        print("Usage: {} <your_zyxel_password>".format(prog), file=sys.stderr)
        return 2

    # load blocked ips
    ips = get_ips()
    print("Got {} ip records.".format(len(ips)), file=sys.stderr)
    # Add Tor subnet (optionally)
    ips.add(IPv4Network("10.192.0.0/10"))
    # reduce the list to at most 2750 networks (one static route each)
    ips = minimize_subnets(ips, 2750)
    print("Simplified them to {} records.".format(len(ips)), file=sys.stderr)
    # update the static routes on the router
    zyxel = Zyxel(ip="192.168.0.2", login="admin", passwd=argv[1])
    try:
        zyxel.set_routes(ips, gateway="192.168.0.100", iface="Home")
    finally:
        # Log out even when the update fails half-way.
        zyxel.close()
    return 0


if __name__ == "__main__":
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment