Last active
December 16, 2015 03:44
-
-
Save jkoelker/fe106630e441e95e0a8b to your computer and use it in GitHub Desktop.
created by https://github.com/tr3buchet/gister
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
import netaddr | |
import socket | |
import subprocess | |
import tempfile | |
ASN_QUERY = '-i origin -T route %s\r\n' | |
def _whois(asn): | |
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
s.settimeout(10) | |
s.connect(('whois.radb.net', 43)) | |
s.send((ASN_QUERY % asn).encode('idna')) | |
response = b'' | |
chunk = s.recv(4096) | |
while chunk: | |
response = response + chunk | |
chunk = s.recv(4096) | |
return response | |
def _fetch_routes(asn, summarize=True): | |
whois_data = _whois(asn) | |
routes = [netaddr.IPNetwork(l.split(b'route:')[-1].strip()) | |
for l in whois_data.split(b'\n') if l[:6] == b'route:'] | |
if summarize: | |
return netaddr.IPSet(routes) | |
return routes | |
def _fetch_ipset(asn, summarize=True): | |
try: | |
ipset_data = subprocess.check_output(('ipset', 'save', '%s' % asn)) | |
except subprocess.CalledProcessError as e: | |
if e.returncode == 1: | |
args = ('ipset', 'create', '%s' % asn, 'hash:net') | |
subprocess.check_output(args) | |
return _fetch_ipset(asn) | |
return netaddr.IPSet() | |
prefix_match = 'add %s' % asn | |
prefix_len = len(prefix_match) | |
ipsets = [netaddr.IPNetwork(l.split()[-1]) | |
for l in ipset_data.split('\n') | |
if l[:prefix_len] == prefix_match] | |
if summarize: | |
return netaddr.IPSet(ipsets) | |
return ipsets | |
def _update_ipset(asn, adds, deletes): | |
restore_data = ['add %s %s/%s' % (asn, a.network, a.prefixlen) | |
for a in adds.iter_cidrs()] | |
restore_data.extend(['del %s %s/%s' % (asn, d.network, d.prefixlen) | |
for d in deletes.iter_cidrs()]) | |
with tempfile.NamedTemporaryFile() as f: | |
f.write('\n'.join(restore_data) + '\n') | |
f.flush() | |
args = ('ipset', 'restore', '-file', f.name) | |
ret = subprocess.check_call(args) | |
if ret == 0: | |
return True | |
return False | |
def _flush_delta(asn): | |
routes = _fetch_routes(asn) | |
ipsets = _fetch_ipset(asn) | |
adds = routes - ipsets | |
deletes = ipsets - routes & ipsets | |
return _update_ipset(asn, adds, deletes) | |
if __name__ == '__main__': | |
import sys | |
rets = [_flush_delta(asn) for asn in sys.argv[1:]] | |
if all(rets): | |
sys.exit(0) | |
sys.exit(1) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment