Skip to content

Instantly share code, notes, and snippets.

@ties
Last active November 6, 2018 10:57
Show Gist options
  • Save ties/f2e195e9583b67d4c88ad385eb08c054 to your computer and use it in GitHub Desktop.
Save ties/f2e195e9583b67d4c88ad385eb08c054 to your computer and use it in GitHub Desktop.
"""
Quite quick, quite dirty search script for RIPTE db
"""
import aiohttp
import asyncio
import json
import itertools
import netaddr
from typing import List
from contextlib import asynccontextmanager
from typing import Optional
class RIPEDBClient(object):
session: aiohttp.ClientSession
base_url: str = 'http://rest.db.ripe.net/search'
def __init__(self, session: aiohttp.ClientSession):
self.session = session
@staticmethod
@asynccontextmanager
async def get_instance() -> 'RIPEDBClient':
headers = {
'Accept': 'application/json',
}
try:
async with aiohttp.ClientSession(headers=headers) as session:
yield RIPEDBClient(session)
finally:
await asyncio.sleep(0.250)
async def reverse_lookup_search(
self,
query_string: str=None,
inverse_attribute: str=None,
type_filter: Optional[str]=None):
params = {
'source': 'ripe',
'query-string': query_string,
'flags': 'no-referenced',
'inverse-attribute': inverse_attribute,
}
if type_filter:
params['type-filter'] = type_filter
async with self.session.get(self.base_url, params=params) as resp:
assert resp.status == 200
return await resp.json()
def network_from_ripe(entry) -> List[netaddr.IPRange]:
for attr in entry['attributes']['attribute']:
if attr['name'] == 'inetnum':
begin, end = attr['value'].split(' - ')
ip_range = netaddr.IPRange(begin, end)
return list(ip_range.cidrs())
return []
async def stats_for(rc, maintainer):
body = await rc.reverse_lookup_search(maintainer, 'mnt-by', 'inetnum')
networks = itertools.chain.from_iterable([network_from_ripe(entry) for entry in body['objects']['object']])
merged = netaddr.cidr_merge(networks)
ip_merged = netaddr.IPSet(merged)
# Print all IP sets
#print(ip_merged.iter_cidrs())
print("{:<20} {:>4} subnets {:>8} ips".format("{}:".format(maintainer), len(merged), len(ip_merged)))
async def main():
async with RIPEDBClient.get_instance() as rc:
await stats_for(rc, 'SN-LIR-MNT')
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment