Last active
September 15, 2017 21:28
-
-
Save Riebart/45c7e02433ad903d0460341cc7111af6 to your computer and use it in GitHub Desktop.
Collect a list of IPv4 addresses into contiguous range blocks.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
Given a list of IP addresses (or IP address ranges), one per line (with ranges given as "IP1-IP2"), | |
return a list of IP address ranges that represent the smallest possible list of IP address ranges | |
that does not contain any addresses not represented in the input list. | |
Only supports IPv4. | |
""" | |
import sys | |
import struct | |
import json | |
def ip2int(ip): | |
""" | |
Convert an IPv4 address to a 32-bit unsigned interger. | |
""" | |
return struct.unpack(">L", | |
''.join([chr(int(c)) for c in ip.split(".")]))[0] | |
def parse_line(l): | |
""" | |
Given an input line, determine whether it is a single address or a range, and return either an | |
unsigned 32-bit integer, or a list of two unsigned 32-bit integers representing the start and | |
end of the range. | |
""" | |
l = l.replace(" ", "") | |
if "-" in l: | |
r = [ip2int(ip) for ip in l.split("-")] | |
if r[0] == r[1]: | |
return r[0] | |
else: | |
return r | |
else: | |
return ip2int(l) | |
def int2ip(ip): | |
""" | |
Given an IPv4 address represented as a 32-bit unsigned integer, return the dotted-decmal | |
representation. | |
""" | |
return ".".join([str(ord(c)) for c in struct.pack(">L", ip)]) | |
def merge_adjacent(ipr1, ipr2): | |
""" | |
Return the result of merging ipr1 and ipr2 if they are adjacent, and return return None if they | |
are not adjacent | |
Assumes that ipr1 starts before or at ipr2. | |
""" | |
if isinstance(ipr1, list) and isinstance(ipr2, list): | |
# List intersection only happens if the end of the first is immediately before (or later) | |
# the start of teh second list. The intersection ends at the max of the two intervals and | |
# starts where the first one starts. | |
# | |
# This handles when the the second interval is contained in the first. | |
if ipr1[1] >= ipr2[0] - 1: | |
return [[ipr1[0], max(ipr2[1], ipr1[1])]] | |
elif isinstance(ipr1, list) and isinstance(ipr2, int): | |
# If the first one is a list, it may contain the second point entirely or the second point | |
# may represent an extension of the end of the interval. | |
if ipr1[0] <= ipr2 <= ipr1[1]: | |
return [ipr1] | |
elif ipr1[1] == ipr2 - 1: | |
return [[ipr1[0], ipr2]] | |
elif isinstance(ipr1, int) and isinstance(ipr2, list): | |
# If the second is a list, the first may represent an extension of the beginning of the | |
# range | |
if ipr2[0] == ipr1 + 1: | |
return [[ipr1, ipr2[1]]] | |
elif isinstance(ipr1, int) and isinstance(ipr2, int): | |
# If they are both integers, then they need to be adjacent to be a list, otherwise they are | |
# compressed to a single point. | |
if ipr1 == ipr2: | |
return [ipr1] | |
elif ipr1 == ipr2 - 1: | |
return [[ipr1, ipr2]] | |
else: | |
return None | |
def __main(): | |
# Parse the input lines into ints | |
ips = [parse_line(l) for l in sys.stdin] | |
# Sort the input addresses so that they are ordered by address, using the start of the range for | |
# any input ranges. | |
ips.sort(key=lambda ip: ip[0] if isinstance(ip, list) else ip) | |
# Store any complete compressed ranges here | |
output_ranges = [] | |
# Store any currently active ranges (ranges that have been encountered, but where the end of the | |
# is still after the last address (or the start of a range) that we have processed. | |
active_ranges = [] | |
# If there's no IPs in the list, obviously just give up. | |
if len(ips) == 0: | |
print json.dumps(ips) | |
active_ranges.append(ips[0]) | |
for ip in ips[1:]: | |
new_ars = [] | |
merged = False | |
for ar in active_ranges: | |
# For each active range, determine if there is adjacency with the current IP | |
merge = merge_adjacent(ar, ip) | |
if merge is None: | |
new_ars.append(ar) | |
else: | |
new_ars += merge | |
merged = True | |
break | |
if not merged: | |
new_ars.append(ip) | |
active_ranges = sorted( | |
new_ars, key=lambda a: a[0] if isinstance(a, list) else a) | |
ranges = [ | |
"-".join([int2ip(ip) for ip in r]) | |
if isinstance(r, list) else int2ip(r) for r in active_ranges | |
] | |
print "\n".join(ranges) | |
def generate_ranges(nranges, range_max_size=768): | |
import random | |
ranges = [ | |
(lambda s=random.randint(0, 2**32-1-range_max_size): | |
[s, s+random.randint(0, range_max_size-1)])() for _ in xrange(nranges)] | |
print "\n".join([ | |
"-".join([int2ip(ip) for ip in r]) | |
if isinstance(r, list) else int2ip(r) for r in ranges | |
]) | |
if __name__ == "__main__": | |
__main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment