Created
April 13, 2021 22:25
-
-
Save jathanism/3d0085fc2e919262c123f781593be157 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
From 3e282e34a0387ea80ff2500c936d107c369c08ba Mon Sep 17 00:00:00 2001 | |
From: jathanism <[email protected]> | |
Date: Tue, 13 Apr 2021 13:19:27 -0700 | |
Subject: [PATCH] Extend `IPAddress.objects.string_search` to support IPv6 | |
--- | |
nautobot/ipam/querysets.py | 83 +++++++++++++++++++++++---- | |
nautobot/ipam/tests/test_filters.py | 10 ++++ | |
nautobot/ipam/tests/test_querysets.py | 44 ++++++++++++++ | |
3 files changed, 126 insertions(+), 11 deletions(-) | |
diff --git a/nautobot/ipam/querysets.py b/nautobot/ipam/querysets.py | |
index a1a44399..d347b6d8 100644 | |
--- a/nautobot/ipam/querysets.py | |
+++ b/nautobot/ipam/querysets.py | |
@@ -1,3 +1,4 @@ | |
+import re | |
import uuid | |
import netaddr | |
@@ -169,6 +170,12 @@ class IPAddressQuerySet(RestrictedQuerySet): | |
6: IPV6_BYTE_LENGTH, | |
} | |
+ # Match a string ending in "::" | |
+ RE_COLON = re.compile(".*::$") | |
+ | |
+ # Match string from "0000" to "ffff" with no trailing ":" | |
+ RE_HEXTET = re.compile("^[a-f0-9]{4}$") | |
+ | |
@staticmethod | |
def _get_broadcast(network): | |
return network.broadcast if network.broadcast else network.network | |
@@ -203,22 +210,76 @@ class IPAddressQuerySet(RestrictedQuerySet): | |
Attempts to parse a (potentially incomplete) IPAddress and return an IPNetwork. | |
eg: '10.10' should be interpreted as netaddr.IPNetwork('10.10.0.0/16') | |
""" | |
+ version = 4 | |
+ | |
try: | |
- # disregard netmask | |
+ # Disregard netmask | |
search = search.split("/")[0] | |
- # get non-empty octets from search string | |
- octets = search.split(".") | |
- octets = list(filter(lambda o: o, octets)) | |
- prefix_len = 8 * len(octets) | |
- | |
- # create an netaddr.IPNetwork to search within | |
- octets.extend(["0" for _ in range(len(octets), 4)]) | |
- ip = f"{octets[0]}.{octets[1]}.{octets[2]}.{octets[3]}/{prefix_len}" | |
- return netaddr.IPNetwork(ip) | |
+ # Attempt to quickly assess v6 | |
+ if ":" in search: | |
+ version = 6 | |
+ | |
+ # (IPv6) If the value ends with a single ":" (not "::"), append another ":" so it ends with "::". | |
+ if search.endswith(":") and not self.RE_COLON.match(search): | |
+ search += ":" | |
+ # (IPv6) If the value is numeric and > 255, append "::" | |
+ # (IPv6) If the value is a hextet (e.g. "fe80"), append "::" | |
+ elif any( | |
+ [ | |
+ search.isdigit() and int(search) > 255, | |
+ self.RE_HEXTET.match(search), | |
+ ] | |
+ ): | |
+ search += "::" | |
+ version = 6 | |
+ | |
+ call_map = { | |
+ 4: self._parse_ipv4, | |
+ 6: self._parse_ipv6, | |
+ } | |
+ return call_map[version](search) | |
except netaddr.core.AddrFormatError: | |
- return netaddr.IPNetwork("0/32") | |
+ ver_map = {4: "0/32", 6: "::/128"} | |
+ return netaddr.IPNetwork(ver_map[version]) | |
+ | |
+ def _parse_ipv6(self, value): | |
+ """IPv6 addresses are 8, 16-bit fields.""" | |
+ | |
+ # Split the search string into hextets (empty ones may be filtered out below) | |
+ hextets = value.split(":") | |
+ | |
+ # Before we normalize, check whether the final hextet consists only of digits. | |
+ if hextets[-1].isdigit(): | |
+ fill_zeroes = False # Leave "" in there. | |
+ prefix_len = 128 # Force /128 | |
+ else: | |
+ fill_zeroes = True # Replace "" w/ "0" | |
+ hextets = list(filter(lambda h: h, hextets)) | |
+ prefix_len = 16 * len(hextets) | |
+ | |
+ # Create a netaddr.IPNetwork to search within | |
+ if fill_zeroes: | |
+ hextets.extend(["0" for _ in range(len(hextets), 8)]) | |
+ | |
+ network = ":".join(hextets) | |
+ ip = f"{network}/{prefix_len}" | |
+ return netaddr.IPNetwork(ip) | |
+ | |
+ def _parse_ipv4(self, value): | |
+ """IPv4 addresses are 4, 8-bit fields.""" | |
+ | |
+ # Get non-empty octets from search string | |
+ octets = value.split(".") | |
+ octets = list(filter(lambda o: o, octets)) | |
+ prefix_len = 8 * len(octets) | |
+ | |
+ # Create a netaddr.IPNetwork to search within | |
+ octets.extend(["0" for _ in range(len(octets), 4)]) | |
+ network = ".".join(octets) | |
+ ip = f"{network}/{prefix_len}" | |
+ return netaddr.IPNetwork(ip) | |
def ip_family(self, family): | |
try: | |
diff --git a/nautobot/ipam/tests/test_filters.py b/nautobot/ipam/tests/test_filters.py | |
index 2f610515..92dfd741 100644 | |
--- a/nautobot/ipam/tests/test_filters.py | |
+++ b/nautobot/ipam/tests/test_filters.py | |
@@ -770,6 +770,16 @@ class IPAddressTestCase(TestCase): | |
"11.": 0, | |
"11.0": 0, | |
"10.10.10.0/24": 0, | |
+ "2001": 5, | |
+ "2001:": 5, | |
+ "2001::": 5, | |
+ "2001:db8:": 5, | |
+ "2001:db8::": 5, | |
+ "2001:db8::/64": 5, | |
+ "2001:db8::2": 1, | |
+ "2001:db8:0:2": 0, | |
+ "fe80": 0, | |
+ "fe80::": 0, | |
"foo.bar": 0, | |
} | |
diff --git a/nautobot/ipam/tests/test_querysets.py b/nautobot/ipam/tests/test_querysets.py | |
index eac9ac5c..8164c98c 100644 | |
--- a/nautobot/ipam/tests/test_querysets.py | |
+++ b/nautobot/ipam/tests/test_querysets.py | |
@@ -115,6 +115,50 @@ class IPAddressQuerySet(TestCase): | |
address = self.queryset.net_in(["10.0.0.1/24"])[0] | |
self.assertEqual(self.queryset.filter(address="10.0.0.1/24")[0], address) | |
+ def test_string_search_parse_as_network_string(self): | |
+ """ | |
+ Tests that the parsing underlying `string_search` behaves as expected. | |
+ """ | |
+ tests = { | |
+ "10": "10.0.0.0/8", | |
+ "10.": "10.0.0.0/8", | |
+ "10.0": "10.0.0.0/16", | |
+ "10.0.0.4": "10.0.0.4/32", | |
+ "10.0.0": "10.0.0.0/24", | |
+ "10.0.0.4/24": "10.0.0.4/32", | |
+ "10.0.0.4/24": "10.0.0.4/32", | |
+ "2001": "2001::/16", | |
+ "2001:": "2001::/16", | |
+ "2001::": "2001::/16", | |
+ "2001:db8:": "2001:db8::/32", | |
+ "2001:0db8::": "2001:db8::/32", | |
+ "2001:db8:abcd:0012::0/64": "2001:db8:abcd:12::/128", | |
+ "2001:db8::1/65": "2001:db8::1/128", | |
+ "fe80": "fe80::/16", | |
+ "fe80::": "fe80::/16", | |
+ "fe80::46b:a212:1132:3615": "fe80::46b:a212:1132:3615/128", | |
+ } | |
+ | |
+ for test, expected in tests.items(): | |
+ self.assertEqual(str(self.queryset._parse_as_network_string(test)), expected) | |
+ | |
+ def test_string_search(self): | |
+ search_terms = { | |
+ "10": 5, | |
+ "10.0.0.1": 2, | |
+ "10.0.0.1/24": 2, | |
+ "10.0.0.1/25": 2, | |
+ "10.0.0.2": 1, | |
+ "11": 0, | |
+ "2001": 3, | |
+ "2001::": 3, | |
+ "2001:db8::": 3, | |
+ "2001:db8::1": 1, | |
+ "fe80::": 0, | |
+ } | |
+ for term, cnt in search_terms.items(): | |
+ self.assertEqual(self.queryset.string_search(term).count(), cnt) | |
+ | |
class PrefixQuerysetTestCase(TestCase): | |
queryset = Prefix.objects.all() | |
-- | |
2.30.2 | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment