Skip to content

Instantly share code, notes, and snippets.

@jkoelker
Last active December 16, 2015 03:44
Show Gist options
  • Save jkoelker/fe106630e441e95e0a8b to your computer and use it in GitHub Desktop.
Save jkoelker/fe106630e441e95e0a8b to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import netaddr
import socket
import subprocess
import tempfile
ASN_QUERY = '-i origin -T route %s\r\n'
def _whois(asn):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(10)
s.connect(('whois.radb.net', 43))
s.send((ASN_QUERY % asn).encode('idna'))
response = b''
chunk = s.recv(4096)
while chunk:
response = response + chunk
chunk = s.recv(4096)
return response
def _fetch_routes(asn, summarize=True):
whois_data = _whois(asn)
routes = [netaddr.IPNetwork(l.split(b'route:')[-1].strip())
for l in whois_data.split(b'\n') if l[:6] == b'route:']
if summarize:
return netaddr.IPSet(routes)
return routes
def _fetch_ipset(asn, summarize=True):
try:
ipset_data = subprocess.check_output(('ipset', 'save', '%s' % asn))
except subprocess.CalledProcessError as e:
if e.returncode == 1:
args = ('ipset', 'create', '%s' % asn, 'hash:net')
subprocess.check_output(args)
return _fetch_ipset(asn)
return netaddr.IPSet()
prefix_match = 'add %s' % asn
prefix_len = len(prefix_match)
ipsets = [netaddr.IPNetwork(l.split()[-1])
for l in ipset_data.split('\n')
if l[:prefix_len] == prefix_match]
if summarize:
return netaddr.IPSet(ipsets)
return ipsets
def _update_ipset(asn, adds, deletes):
restore_data = ['add %s %s/%s' % (asn, a.network, a.prefixlen)
for a in adds.iter_cidrs()]
restore_data.extend(['del %s %s/%s' % (asn, d.network, d.prefixlen)
for d in deletes.iter_cidrs()])
with tempfile.NamedTemporaryFile() as f:
f.write('\n'.join(restore_data) + '\n')
f.flush()
args = ('ipset', 'restore', '-file', f.name)
ret = subprocess.check_call(args)
if ret == 0:
return True
return False
def _flush_delta(asn):
routes = _fetch_routes(asn)
ipsets = _fetch_ipset(asn)
adds = routes - ipsets
deletes = ipsets - routes & ipsets
return _update_ipset(asn, adds, deletes)
if __name__ == '__main__':
import sys
rets = [_flush_delta(asn) for asn in sys.argv[1:]]
if all(rets):
sys.exit(0)
sys.exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment