-
-
Save snehesht/29247f737f983fe24ecffe0f9188b3d2 to your computer and use it in GitHub Desktop.
make afrinic/apnic/arin/ripe/lacnic delegated-latest ipv4 and ipv6 list script
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# vim:fileencoding=utf-8 | |
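""" Build per-registry IPv4/IPv6 network lists from RIR delegated-extended-latest files.
For each selected registry (afrinic, apnic, arin, ripe, lacnic) the statistics
file is downloaded and the sorted files "<registry>-ipv4-latest.txt" and
"<registry>-ipv6-latest.txt" are written to the output directory.
"""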
from datetime import datetime | |
from argparse import ArgumentParser | |
import pprint | |
import time | |
import warnings | |
import os, sys, io | |
import signal | |
import urllib | |
import urllib.request | |
import tempfile | |
import os.path | |
import re | |
import ipaddress | |
import traceback | |
from subprocess import Popen, PIPE, STDOUT | |
from concurrent.futures import ThreadPoolExecutor, as_completed | |
from multiprocessing import cpu_count | |
__author__ = 'holly'
__version__ = '1.0'

DESCRIPTION = 'this is description'

# Registry name -> URL of its "delegated-<registry>-extended-latest" file.
DELEGATED_LATEST_MAP = {
    'afrinic': 'http://ftp.afrinic.net/pub/stats/afrinic/delegated-afrinic-extended-latest',
    'apnic': 'http://ftp.apnic.net/pub/stats/apnic/delegated-apnic-extended-latest',
    'arin': 'http://ftp.arin.net/pub/stats/arin/delegated-arin-extended-latest',
    'ripe': 'http://ftp.ripe.net/pub/stats/ripencc/delegated-ripencc-extended-latest',
    'lacnic': 'http://ftp.lacnic.net/pub/stats/lacnic/delegated-lacnic-extended-latest',
}

# Maximum age, in seconds, compared against the downloaded file's mtime.
DOWNLOAD_FILE_CACHE = 86400  # 1day

# NOTE: the command line is parsed at import time; the functions below read
# the resulting module-level ``args``.
parser = ArgumentParser(description=DESCRIPTION)
parser.add_argument('--version', action='version', version='%(prog)s ' + __version__)
# ``choices`` rejects misspelled registry names up front; without it an
# unknown name is silently ignored and the script does nothing.
parser.add_argument('--registry', '-r', action='store', nargs='*', choices=sorted(DELEGATED_LATEST_MAP.keys()), default=sorted(DELEGATED_LATEST_MAP.keys()), help='delegated registries(afrinic|apnic|arin|ripe|lacnic)')
parser.add_argument('--output-dir', '-d', action='store', default=os.getcwd(), help='new file output directory')
parser.add_argument('--force', '-f', action='store_true', help='force execution')
args = parser.parse_args()
class NetworkData(object):
    """Parse one line of an RIR delegated-extended statistics file.

    A record looks like::

        afrinic|KE|ipv4|213.150.96.0|8192|20100301|allocated
        arin|US|ipv6|2001:400::|32|19990803|allocated|04f048163e37eef4...

    Only ``ipv4``/``ipv6`` records whose status is ``assigned`` or
    ``allocated`` and whose country code is neither empty nor ``*`` are
    analyzed. ``str(obj)`` yields the generated output lines (each terminated
    by a newline), or an empty string when the line was not analyzed.
    """

    # Regular expression separating the fields of a record. Raw string so the
    # backslash is not interpreted as an (invalid) string escape.
    SPLIT_SEP = r'\|'
    # Output line: country code, start/prefix length, start/netmask.
    LINE_FORMAT = "{cc}\t{start}/{cidr}\t{start}/{mask}"
    # IPv4 address count (as string) -> prefix length and dotted netmask,
    # for prefix lengths 1..32 in that order.
    NETWORK_CIDR_MAP = {
        str(1 << (32 - prefix)): {
            "cidr": str(prefix),
            "mask": str(ipaddress.IPv4Network((0, prefix)).netmask),
        }
        for prefix in range(1, 33)
    }

    def __init__(self, line):
        """Store *line* and analyze it immediately."""
        self.__line = line
        self.__rir = None
        self.__cc = None
        self.__iptype = None
        self.__start = None
        self.__value = None
        self.__status = None
        self.__mask = None
        self.__lines = []
        # make line string
        self.__initialize()

    def analyzed(self):
        """Return True when the line produced at least one output line."""
        return bool(self.__lines)

    def make_lines(self, iptype, cc, start, value):
        """Return the formatted output lines for one record.

        Args:
            iptype: ``"ipv4"`` or ``"ipv6"``; anything else yields ``[]``.
            cc: country code placed in the first output column.
            start: first address of the range.
            value: for ipv6 the prefix length, used verbatim for both the
                cidr and mask columns; for ipv4 the number of addresses.

        Returns:
            A list of ``LINE_FORMAT`` strings. An ipv4 range that is not a
            single aligned CIDR block is split into the minimal list of
            aligned blocks covering exactly the range. Malformed ipv4
            records (bad address, non-numeric or non-positive count, range
            past the end of the address space) yield an empty list.
        """
        line_format = self.LINE_FORMAT

        if iptype == "ipv6":
            print("[iptype:ipv6 cc:{0} start:{1} allocated:{2} range:{1}/{2}] valid allocated. OK".format(cc, start, value))
            return [line_format.format(cc=cc, start=start, cidr=value, mask=value)]
        if iptype != "ipv4":
            return []

        try:
            first = ipaddress.IPv4Address(str(start))
            allocated = int(value)
            if allocated < 1:
                raise ValueError("address count must be positive")
            # Raises AddressValueError (a ValueError) past 255.255.255.255.
            last = first + (allocated - 1)
        except ValueError as e:
            print("[iptype:ipv4 cc:{0} start:{1} allocated:{2}] invalid record ({3}). skip".format(cc, start, value, e))
            return []

        # summarize_address_range only ever yields properly aligned networks,
        # so a power-of-two count on a misaligned start is split as well
        # instead of being emitted as an invalid "start/prefix".
        networks = list(ipaddress.summarize_address_range(first, last))

        if len(networks) == 1:
            print("[iptype:ipv4 cc:{0} start:{1} allocated:{2} range:{1}/{3}] valid allocated. OK".format(cc, start, value, networks[0].prefixlen))
        else:
            print("[iptype:ipv4 cc:{0} start:{1} allocated:{2}] invalid allocated... start adhoc calclation".format(cc, start, value))
            for network in networks:
                print(" calclate network segment", network)

        return [
            line_format.format(
                cc=cc,
                start=str(network.network_address),
                cidr=str(network.prefixlen),
                mask=str(network.netmask),
            )
            for network in networks
        ]

    @property
    def iptype(self):
        """Address family of the analyzed record, or None if it was skipped."""
        return self.__iptype

    def __initialize(self):
        """Split the stored line and, if it is a usable record, build its lines."""
        line = self.__line.strip()
        if line.startswith("#"):
            return
        fields = re.split(self.SPLIT_SEP, line)
        if len(fields) < 7:
            # header summary lines and truncated records
            return
        rir, cc, iptype, start, value, _date, status = fields[:7]
        if cc in ("*", ""):
            return
        if iptype not in ("ipv4", "ipv6") or status not in ("assigned", "allocated"):
            return
        lines = self.make_lines(iptype, cc, start, value)
        # set property
        self.__rir = rir
        self.__cc = cc
        self.__iptype = iptype
        self.__start = start
        self.__value = value
        self.__status = status
        self.__lines.extend(lines)

    def __str__(self):
        if not self.analyzed():
            return ""
        return "\n".join(self.__lines) + "\n"
def is_download_real_file(download_file):
    """Tell whether *download_file* has to be fetched.

    Returns True when ``--force`` was given, when the file does not exist,
    or when its modification time is more than DOWNLOAD_FILE_CACHE seconds
    in the past; otherwise False.
    """
    # Forced run or nothing cached yet: always download.
    if args.force or not os.path.exists(download_file):
        return True
    now = time.time()
    age = now - os.path.getmtime(download_file)
    return age > DOWNLOAD_FILE_CACHE
def download_delegated_latest(delegated_latest_url, download_file):
    """Fetch *delegated_latest_url* and store it as text in *download_file*.

    Each response line is decoded as UTF-8, stripped of surrounding
    whitespace and written followed by a newline.

    The data is first written to ``download_file + ".part"`` and only moved
    over *download_file* once the whole response has been read. An
    interrupted transfer therefore never leaves a truncated file with a
    fresh modification time behind, and an existing *download_file* is left
    untouched on failure.

    Raises:
        Whatever ``urllib.request.urlopen`` or the file operations raise
        (e.g. ``urllib.error.HTTPError``, ``OSError``).
    """
    req = urllib.request.Request(delegated_latest_url)
    partial_file = download_file + ".part"
    try:
        with urllib.request.urlopen(req) as res, open(partial_file, "w") as f:
            for line in res:
                print(line.decode("utf-8").strip(), file=f)
        # Atomic within one filesystem: readers see the old or the new file.
        os.replace(partial_file, download_file)
    except BaseException:
        # Drop the incomplete file (it may not exist if urlopen itself failed).
        try:
            os.unlink(partial_file)
        except OSError:
            pass
        raise
def make_list(rir, delegated_latest_url):
    """Create the sorted ipv4/ipv6 list files for one registry.

    Does nothing when *rir* was not selected with ``--registry``. Otherwise
    the delegated file is downloaded into the system temp directory (unless
    a recent copy exists, in which case the function returns without
    regenerating anything), every line is parsed with ``NetworkData`` and
    the resulting lines are piped through the external ``sort`` command
    into ``<rir>-ipv4-latest.txt`` / ``<rir>-ipv6-latest.txt`` inside the
    output directory.

    Raises:
        RuntimeError: when a ``sort`` process exits with a non-zero status.
        Any exception raised by the download, by parsing or by file/process
        handling is propagated after the output files and ``sort``
        processes have been cleaned up.
    """
    if rir not in args.registry:
        return

    print("{0}: make_list start".format(rir))

    popen_args = ["sort", "-k", "1,1", "-k", "2n"]
    download_file = os.path.join(tempfile.gettempdir(), rir) + ".txt.tmp"

    if is_download_real_file(download_file):
        print("{0}: {1} download start".format(rir, delegated_latest_url))
        download_delegated_latest(delegated_latest_url, download_file)
        print("{0}: {1} download end".format(rir, delegated_latest_url))
    else:
        print("{0}: download skip. exit".format(rir))
        return

    new_ipv4_file = os.path.join(args.output_dir, "{0}-ipv4-latest.txt".format(rir))
    new_ipv6_file = os.path.join(args.output_dir, "{0}-ipv6-latest.txt".format(rir))

    # iptype -> output path, open output file ('f') and sort process ('proc')
    targets = {
        'ipv4': {'new_file': new_ipv4_file},
        'ipv6': {'new_file': new_ipv6_file},
    }

    try:
        for target in targets.values():
            target['f'] = open(target['new_file'], "w")
            target['proc'] = Popen(popen_args, stdin=PIPE, stdout=target['f'], stderr=STDOUT, universal_newlines=True)
            print("{0}: create new file start({1})".format(rir, target['new_file']))

        with open(download_file, "r") as f:
            for line in f:
                # afrinic|KE|ipv4|213.150.96.0|8192|20100301|allocated
                # arin|US|ipv6|2001:400::|32|19990803|allocated|04f048163e37eef48d891498545eefc0
                data = NetworkData(line)
                if not data.analyzed():
                    continue
                targets[data.iptype]['proc'].stdin.write(str(data))
    finally:
        # Always close the pipes, reap the children and close the output
        # files, even when opening or parsing failed half-way.
        for target in targets.values():
            try:
                if 'proc' in target:
                    target['proc'].communicate()
            finally:
                if 'f' in target:
                    target['f'].close()

    for target in targets.values():
        returncode = target['proc'].returncode
        if returncode != 0:
            # stderr of sort is redirected into the output file itself
            raise RuntimeError("{0}: sort exited with status {1} ({2})".format(rir, returncode, target['new_file']))
        print("{0}: create new file end({1})".format(rir, target['new_file']))

    print("{0}: make_list end".format(rir))
def main():
    """Run make_list for every known registry in a thread pool.

    One task is submitted per entry of DELEGATED_LATEST_MAP. Failures are
    reported per registry and do not stop the other tasks. The process
    exits with status 0 when every task finished without an exception and
    with status 1 otherwise.
    """
    failed = []

    with ThreadPoolExecutor(max_workers=cpu_count()) as executor:
        futures = {
            executor.submit(make_list, rir, delegated_latest_url): rir
            for rir, delegated_latest_url in DELEGATED_LATEST_MAP.items()
        }
        for future in as_completed(futures):
            rir = futures[future]
            try:
                future.result()
            except urllib.error.HTTPError as e:
                failed.append(rir)
                print('{0}: http error'.format(rir))
                print(e.read().decode("utf-8").strip())
            except Exception as e:
                # Top-level boundary: report and continue with the others.
                failed.append(rir)
                print(type(e))
                print(traceback.format_exc())
                print('{0}: generated an exception: {1}'.format(rir, e))
            else:
                print('{0}: completed'.format(rir))

    # Signal failure to the caller instead of always reporting success.
    sys.exit(1 if failed else 0)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment