-
-
Save slavanap/f8d691e460ffa5da731cb6e5c5e58659 to your computer and use it in GitHub Desktop.
Add RKN rules to reroute traffic
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
import re | |
import sys | |
from ipaddress import IPv4Network | |
iterbytes = lambda b: (b[i:i+1] for i in range(len(b))) | |
def get_ips(): | |
from urllib.request import urlopen | |
dump = urlopen("https://raw.githubusercontent.com/zapret-info/z-i/master/dump.csv") | |
dump.readline() | |
prog = re.compile(r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}(/\d{1,2})?") | |
ips = set( IPv4Network(ip.strip()) for line in dump for ip in str(line).split(';', 1)[0].split('|') if prog.match(ip.strip()) is not None ) | |
dump.close() | |
return ips | |
class Zyxel: | |
def __init__(self, ip, login, passwd): | |
import telnetlib | |
self.tn = telnetlib.Telnet(ip) | |
self.tn.read_until(b"Login: ") | |
self.tn.write(login.encode('utf-8') + b"\n") | |
self.tn.read_until(b"Password: ") | |
self.tn.write(passwd.encode('utf-8') + b"\n") | |
self.tn.read_until(b"(config)> ") | |
def command(self, line): | |
line = line.encode('utf-8') | |
self.tn.write(line + b"\n") | |
result = self.tn.read_until(b"\r\n(config)> ") \ | |
.rstrip(b"\r\r\n(config)> ") \ | |
.strip(b'\x1b[K'.join(iterbytes(line)) + b"\r\n") \ | |
.replace(b"\r\n", b"\n") | |
return result.decode("utf-8") | |
def clean_routes(self, new_routes, gateway, iface): | |
prog = re.compile(r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}(/\d{1,2})?\s+%s\s+%s\s+\d+\s*" % (gateway, iface)) | |
routes = [ list(filter(None, s.split(' '))) for s in self.command("show ip route").split('\n') if prog.match(s) is not None ] | |
count = 0 | |
print("### Cleaning {} routes from router table".format(len(routes)), file=sys.stderr) | |
for route in routes: | |
ip_route = IPv4Network(route[0]) | |
if ip_route in new_routes: | |
new_routes.remove(ip_route) | |
else: | |
result = self.command("no ip route %s %s" % (ip_route, route[1])) | |
print(result) | |
count += 1 | |
if count % 100 == 0: | |
print("{}/{} routes done.". format(count, len(routes)), file=sys.stderr) | |
def add_routes(self, new_routes, gateway, iface): | |
count = 0 | |
print("### Adding new {} routes to router table".format(len(new_routes)), file=sys.stderr) | |
for ip in sorted(new_routes): | |
result = self.command("ip route %s %s %s" % (ip, gateway, iface)) | |
print(result) | |
count += 1 | |
if count % 100 == 0: | |
print("{}/{} routes done.". format(count, len(new_routes)), file=sys.stderr) | |
def set_routes(self, new_routes, gateway, iface): | |
self.clean_routes(new_routes, gateway, iface) | |
self.add_routes(new_routes, gateway, iface) | |
def close(self): | |
self.tn.write(b"exit\n") | |
self.tn.read_all() | |
self.tn = None | |
def minimize_subnets(ips, limit): | |
from ipaddress import collapse_addresses | |
ips = list(collapse_addresses(ips)) | |
nets = list(ips) | |
# Fold | |
while len(nets) > limit: | |
prefix = max(n.prefixlen for n in nets) | |
sys.stdout.write("/{} ".format(prefix)) | |
sys.stdout.flush() | |
prefix_nets = [n for n in nets if n.prefixlen == prefix] | |
nets.extend(n.supernet() for n in prefix_nets) | |
nets = list(collapse_addresses(nets)) | |
print() | |
# Unfold | |
dnets = dict((n, []) for n in nets) | |
for ip in ips: | |
ip_net = ip | |
while ip_net.prefixlen > 0: | |
if ip_net in dnets: | |
dnets[ip_net].append(ip) | |
break | |
ip_net = ip_net.supernet() | |
assert ip_net.prefixlen > 0 | |
nets = set(nets) | |
for net, net_ips in sorted(dnets.items(), key=lambda x: len(x[1])): | |
if len(nets) + len(net_ips) > limit: | |
break | |
nets.remove(net) | |
nets.update(net_ips) | |
return set(nets) | |
# | |
# Usage: | |
# ./r.py <your_zyxel_password> | |
# | |
# more setting below: | |
# | |
if __name__ == "__main__": | |
# load blocked ips | |
ips = get_ips() | |
print("Got {} ip records.".format(len(ips)), file=sys.stderr) | |
# Add Tor subnet (optionally) | |
ips.add(IPv4Network("10.192.0.0/10")) | |
# create limited iptables ruleset | |
ips = minimize_subnets(ips, 2750) | |
print("Simplified them to {} records.".format(len(ips)), file=sys.stderr) | |
# update ruleset on the router | |
zyxel = Zyxel(ip="192.168.0.2", login="admin", passwd=sys.argv[1]) | |
zyxel.set_routes(ips, gateway="192.168.0.100", iface="Home") | |
zyxel.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment