Created
April 24, 2023 17:16
-
-
Save jsenecal/236dcabaa18a7c576ac54425b58d43e2 to your computer and use it in GitHub Desktop.
NETBOX REGRWS
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# system | |
import os | |
from typing import Literal | |
from unidecode import unidecode | |
import googlemaps | |
# netbox | |
from dcim.models.sites import Site | |
# django | |
from django.conf import settings | |
from django.contrib.contenttypes.models import ContentType | |
from django.forms import PasswordInput | |
from extras.choices import CustomFieldTypeChoices, CustomFieldVisibilityChoices | |
from extras.models import CustomField | |
from extras.scripts import BooleanVar, MultiObjectVar, ObjectVar, Script, StringVar | |
from ipam.choices import PrefixStatusChoices | |
from ipam.models.ip import Aggregate, Prefix | |
# regrws | |
from regrws.api import Api | |
from regrws.api import constants as regrws_constants | |
from regrws.models import Customer, Error, Net | |
from regrws.models.nested import IPVersionEnum, Iso31661, MultiLineElement | |
from regrws.models.net import NetBlock | |
from regrws.models.tickets import TicketRequest | |
from tenancy.models.tenants import Tenant | |
# Primary key of the tenant pre-selected in the script form; it is also used to
# build the default aggregate selection.
DEFAULT_TENANT_PK = 53
class ArinScript:
    """Shared inputs and ARIN lookup helpers for ARIN-related scripts.

    The helpers call ``self.log_failure`` / ``self.log_warning``, which this
    class does not define; it is meant to be mixed into a class that also
    derives from ``Script``.
    """

    # ARIN API key, rendered with a password widget.
    api_key = StringVar(widget=PasswordInput)

    # NOTE(review): the original indentation was lost. Both fields below are
    # read with ``data.get(...)`` by the script's ``run`` method, so they are
    # assumed to be declared only when ``settings.DEBUG`` is on -- confirm.
    if settings.DEBUG:
        api_url = StringVar(
            default=regrws_constants.BASE_URL_DEFAULT,
        )
        debug = BooleanVar(description="If checked, the script will wait for a debugger to be attached to the worker")

    def __init__(self, *args, **kwargs):
        """Initialise the lazily populated clients and caches to empty values."""
        super().__init__(*args, **kwargs)
        self._regrws = None
        self._countries = None
        self._gmaps = None
        self._commit = False

    def get_markdown_for_obj(self, obj):
        """Return a Markdown link to ``obj`` with its string form as code-styled text."""
        return f"[`{obj}`]({obj.get_absolute_url()})"

    def find_net_from_prefix_or_aggregate(self, prefix_or_aggregate: Aggregate | Prefix):
        """Look up the ARIN net covering exactly the object's address range.

        Args:
            prefix_or_aggregate: Object whose ``prefix`` attribute gives the range.

        Returns:
            A ``(result, found)`` tuple. ``found`` is True only when the API
            returned something other than an ``Error`` or ``None``; ``result``
            is the raw API return value in every case.
        """
        obj_markdown = self.get_markdown_for_obj(prefix_or_aggregate)
        network = prefix_or_aggregate.prefix

        # Index -1 is the last address of the block for every prefix length and
        # IP version. The previous code special-cased only a prefix length of
        # 31, which mis-handled IPv6 /31 blocks and passed ``None`` for /32,
        # /127 and /128, where netaddr reports no broadcast address.
        arin_net_or_err: Net | Error = self._regrws.net.find_net(  # type: ignore
            network.network, network[-1]
        )

        found = False
        if isinstance(arin_net_or_err, Error):
            # "Not found" is an expected outcome and is reported by the caller.
            if arin_net_or_err.code != "E_OBJECT_NOT_FOUND":
                self.log_failure(f"Error finding net for {obj_markdown}: {arin_net_or_err}")
        elif arin_net_or_err is None:
            self.log_warning(f"Unable to process ARIN's response for {obj_markdown}")
        else:
            found = True
        return arin_net_or_err, found
class ReassignSimple(ArinScript, Script):
    """Script that reassigns tenant prefixes using the Reassign-Simple method."""

    class Meta:  # pylint: disable=too-few-public-methods
        """Script attributes (display name, description, default commit state)."""

        name = "Reassign Simple"
        description = "Reassign prefixes assigned to tenants using the Reassign-Simple Method"
        commit_default = False

    # Tenant used to narrow the aggregate choices below.
    tenant = ObjectVar(
        model=Tenant,
        required=True,
        default=DEFAULT_TENANT_PK,
    )

    # Aggregates whose prefixes are processed; the selector is filtered to the
    # "arin" RIR and the tenant chosen above.
    aggregates = MultiObjectVar(
        model=Aggregate,
        description="Run on prefixes within these aggregates",
        required=True,
        query_params={"rir": "arin", "tenant_id": "$tenant"},
        default=Aggregate.objects.filter(rir__slug="arin", tenant_id=DEFAULT_TENANT_PK),
    )
def process_prefix(self, prefix: Prefix, aggregate_net: Net):
    """Synchronise one prefix with ARIN and record the resulting handles.

    The prefix is skipped (with a warning) when it has no tenant, no site, a
    site without a ``gmaps_placeid`` custom field, or is an IPv6 prefix longer
    than /64. Otherwise the matching ARIN net is reconciled or created, and
    the net handle, net name and customer handle are saved on the prefix.

    Args:
        prefix: The prefix to process.
        aggregate_net: ARIN net of the aggregate containing the prefix.
    """
    label = self.get_markdown_for_obj(prefix)
    self.log_info(f"Processing prefix {label}")

    # --- preconditions -------------------------------------------------
    if prefix.tenant is None:
        self.log_warning(f"Prefix {label} has no Tenant, skipping")
        return

    site = prefix.site
    if site is None:
        self.log_warning(f"Prefix {label} has no Site, skipping")
        return
    if site.cf.get("gmaps_placeid") is None:
        self.log_warning(f"Site {self.get_markdown_for_obj(site)} has no Google Maps PlaceID, skipping")
        return

    network = prefix.prefix
    if network.version == 6 and network.prefixlen > 64:
        self.log_warning(
            f"[{label}] Network reassignments and reallocations in v6 must be /64 or greater, skipping"
        )
        return

    # --- reconcile with ARIN -------------------------------------------
    arin_net, found = self.find_net_from_prefix_or_aggregate(prefix)
    if not found:
        self.log_info("No Net found for: " + label)
        outcome = self.create_objects_from_prefix(prefix, aggregate_net)
        if not isinstance(outcome, TicketRequest):
            return
        arin_net = outcome.net
    else:
        self.log_info("Found Net for : " + label)
        assert isinstance(arin_net, Net)
        outcome = self.process_net(arin_net, aggregate_net, prefix)
        if isinstance(outcome, Net):
            arin_net = outcome
        elif isinstance(outcome, TicketRequest):
            arin_net = outcome.net
        else:
            # Nothing usable came back; leave the prefix untouched.
            return

    # --- persist the ARIN identifiers on the prefix --------------------
    assert isinstance(arin_net, Net)
    prefix.custom_field_data["arin_net_handle"] = arin_net.handle
    prefix.custom_field_data["arin_net_name"] = arin_net.net_name
    prefix.custom_field_data["arin_customer_handle"] = arin_net.customer_handle
    prefix.full_clean()
    prefix.save()
def process_aggregate(self, aggregate):
    """Process every active prefix inside ``aggregate``.

    The aggregate's own ARIN net is looked up first; when it cannot be found a
    failure is logged and nothing else happens. Otherwise each active child
    prefix, plus any active prefix equal to the aggregate itself, is handed to
    ``process_prefix`` together with the aggregate's net.
    """
    label = self.get_markdown_for_obj(aggregate)
    self.log_info(f"Processing aggregate {label}")

    aggregate_net, found = self.find_net_from_prefix_or_aggregate(aggregate)
    if not found:
        self.log_failure(f"Unable to find net for {label}")
        return

    children = aggregate.get_child_prefixes()
    same_range = Prefix.objects.filter(prefix=aggregate.prefix)
    active_prefixes = (children | same_range).filter(status=PrefixStatusChoices.STATUS_ACTIVE)

    if aggregate_net is None:
        return
    for prefix in active_prefixes:
        self.process_prefix(prefix, aggregate_net)  # type: ignore
@staticmethod
def parse_gmaps_address_components(address_components) -> dict[str, str]:
    """Flatten Google Maps address components into a small address dict.

    Args:
        address_components: Iterable of mappings with ``types``,
            ``long_name`` and ``short_name`` keys.

    Returns:
        A dict with any of the keys ``street_number``, ``postal_code``,
        ``street``, ``region``, ``city`` and ``country``. ``region`` holds the
        component's short name; every other key holds the long name. Values
        are passed through ``unidecode``. Keys with no matching component are
        absent.
    """
    # Accepted component types per output key, most preferred first.
    accepted_types = {
        "street_number": ["street_number"],
        "postal_code": ["postal_code"],
        "street": ["street_address", "route"],
        "region": [
            "administrative_area_level_1",
            "administrative_area_level_2",
            "administrative_area_level_3",
            "administrative_area_level_4",
            "administrative_area_level_5",
        ],
        "city": [
            "locality",
            "sublocality",
            "sublocality_level_1",
            "sublocality_level_2",
            "sublocality_level_3",
            "sublocality_level_4",
        ],
        "country": ["country"],
    }
    short_name_keys = ("region",)

    parsed: dict[str, str] = {}
    best_rank: dict[str, int] = {}
    for component in address_components:
        component_types = component["types"]
        for key, candidates in accepted_types.items():
            rank = next(
                (position for position, candidate in enumerate(candidates) if candidate in component_types),
                None,
            )
            if rank is None:
                continue
            # A less preferred type never replaces a more preferred one, so the
            # result no longer depends on the order of the response. On equal
            # rank the later component still wins, as it did before.
            if key in best_rank and rank > best_rank[key]:
                continue
            best_rank[key] = rank
            name_field = "short_name" if key in short_name_keys else "long_name"
            parsed[key] = unidecode(component[name_field])
    return parsed
def generate_customer(self, tenant: Tenant, site: Site):
    """Build a ``Customer`` for ``tenant`` from the site's Google Maps place.

    Args:
        tenant: Tenant whose name becomes the customer name.
        site: Site whose ``gmaps_placeid`` custom field identifies the place.

    Returns:
        A ``Customer``, or ``None`` (after logging a failure) when the place
        lookup does not report ``OK``, the country is not one of
        ``self.countries``, or the street, city, region or postal code is
        missing from the parsed address.
    """
    site_markdown = self.get_markdown_for_obj(site)
    site_placeid = site.cf.get("gmaps_placeid")
    query = self._gmaps.place(site_placeid, language="en-US")  # type: ignore
    if query["status"] != "OK":
        self.log_failure(f"Unable to find Google Maps PlaceID {site_placeid} for {site_markdown}")
        return None

    # Parse the address components from Google Maps results
    gmaps_address_dict = self.parse_gmaps_address_components(query["result"]["address_components"])

    # Check if we have a known Iso31661 for this site country. A missing
    # country component is treated like an unknown country instead of raising.
    country_name = gmaps_address_dict.get("country", "")
    iso3166_1 = next(
        (iso for known_name, iso in self.countries.items() if known_name in country_name),
        None,
    )
    if iso3166_1 is None:
        self.log_failure(f"Unable to find Iso31661 information for {site_markdown} ({site.physical_address})")
        return None

    if "street" not in gmaps_address_dict:
        self.log_failure(f"Unable to find street address for {site_markdown} ({site.physical_address})")
        return None

    # The remaining fields are read unconditionally below; report what is
    # missing rather than letting a KeyError abort the whole run.
    missing = [key for key in ("city", "region", "postal_code") if not gmaps_address_dict.get(key)]
    if missing:
        self.log_failure(f"Unable to find {', '.join(missing)} for {site_markdown} ({site.physical_address})")
        return None

    street = gmaps_address_dict["street"]
    street_number = gmaps_address_dict.get("street_number")
    lines = [f"{street_number} {street}" if street_number else street]
    street_address = [MultiLineElement(number=number, line=line) for number, line in enumerate(lines, start=1)]
    comments = [MultiLineElement(number=1, line="MetroOptic Customer")]

    return Customer(
        customer_name=unidecode(tenant.name),
        iso3166_1=iso3166_1,
        street_address=street_address,
        city=gmaps_address_dict["city"],
        iso3166_2=gmaps_address_dict["region"],
        postal_code=gmaps_address_dict["postal_code"],
        private_customer=False,
        comments=comments,
    )
@staticmethod
def get_netblock_from_prefix(
    prefix: Prefix,
    type: Literal[
        "A", "AF", "AP", "AR", "AV", "DA", "FX", "IR", "IU",
        "LN", "LX", "PV", "PX", "RD", "RN", "RV", "RX", "S",
    ] = "A",
) -> NetBlock:
    """Describe ``prefix`` as a ``NetBlock`` of the given block type.

    The block starts at the prefix's network address and is sized by its
    prefix length; the end address is left unset.
    """
    network = prefix.prefix
    return NetBlock(
        start_address=network.network,
        end_address=None,
        cidr_length=network.prefixlen,
        type=type,
    )
@staticmethod | |
def get_net_name_from_prefix(prefix: Prefix): | |
prefix_str = str(prefix.prefix).replace(".", "-").replace("/", "-") | |
assert prefix.tenant is not None | |
tenant_str = prefix.tenant.slug.upper() | |
net_name = f"{tenant_str}-{prefix_str}" | |
return net_name | |
def generate_net(
    self, prefix: Prefix, aggregate_net: Net, customer_handle: str | None = None, org_handle: str | None = None
):
    """Build the ``Net`` describing ``prefix`` as a child of ``aggregate_net``.

    The net carries a single block of type ``"S"``, the generated net name,
    the aggregate net's handle as parent, the given customer and org handles,
    and an empty list of POC links.
    """
    return Net(
        version=IPVersionEnum(prefix.family),
        net_name=self.get_net_name_from_prefix(prefix),
        parent_net_handle=aggregate_net.handle,
        net_blocks=[self.get_netblock_from_prefix(prefix, type="S")],
        customer_handle=customer_handle,
        org_handle=org_handle,
        poc_links=[],
    )
def create_objects_from_prefix(self, prefix: Prefix, aggregate_net: Net):
    """Create the ARIN customer (when needed) and net for ``prefix``.

    When the tenant has no org handle, a customer is required: a new one is
    created if the prefix has no customer handle, otherwise the stored handle
    is checked against ARIN and discarded (then this method re-runs) if it is
    invalid or belongs to a differently named customer. ARIN is only written
    to when ``self._commit`` is true.

    Args:
        prefix: Prefix to reassign; it must have a tenant and a site.
        aggregate_net: ARIN net the new net is reassigned from.

    Returns:
        Whatever ``aggregate_net.reassign`` returned when committing, or
        ``None`` for a dry run or when the customer could not be prepared.
    """
    prefix_markdown = self.get_markdown_for_obj(prefix)

    # related objects
    prefix_tenant: Tenant | None = prefix.tenant
    prefix_site: Site | None = prefix.site
    assert prefix_tenant is not None
    assert prefix_site is not None

    # customfields
    # Blank values count as "no handle". The restart path below stores "" as
    # the customer handle; without this normalisation the retry would look up
    # the empty handle, fail, and restart again without end.
    org_handle = prefix_tenant.cf.get("arin_org_handle") or None
    customer_handle = prefix.cf.get("arin_customer_handle") or None

    # Create Customer if necessary
    if org_handle is None:
        if customer_handle is None:
            self.log_info(
                f"Tenant {self.get_markdown_for_obj(prefix_tenant)} has no Org Handle; Creating Customer for {prefix_markdown}"
            )
            customer_or_none = self.generate_customer(prefix_tenant, prefix_site)
            if customer_or_none is None:
                self.log_failure(f"Unable to generate customer for {prefix_markdown}")
                return None
            customer_dict = customer_or_none.dict()

            if self._commit:
                customer_or_err: Customer | Error | None = self._regrws.customer.create_for_net(  # type: ignore
                    aggregate_net, **customer_dict
                )
                if isinstance(customer_or_err, Error):
                    self.log_failure(f"Error creating customer for {prefix_markdown}: {customer_or_err}")
                    return None
                if customer_or_err is None:
                    self.log_warning(f"Unable to process ARIN's response for {prefix_markdown}")
                    return None

                self.log_success(f"Created Customer `{customer_or_err.handle}` for {prefix_markdown}")
                customer_handle = customer_or_err.handle
                prefix.custom_field_data["arin_customer_handle"] = customer_handle
        else:
            restart = False
            customer_or_err = self._regrws.customer.from_handle(customer_handle)  # type: ignore
            if isinstance(customer_or_err, Error):
                self.log_failure(f"Error getting customer for {prefix_markdown}: {customer_or_err}")
                restart = True
            elif customer_or_err is None:
                self.log_warning(f"Unable to process ARIN's response for {prefix_markdown} ")
                restart = True

            if isinstance(customer_or_err, Customer):
                self.log_success(f"Found Customer `{customer_or_err.handle}` for {prefix_markdown}")
                if customer_or_err.customer_name != unidecode(prefix_tenant.name):
                    self.log_failure(
                        f"Customer `{customer_or_err.handle}` is not associated with {prefix_tenant.name}"
                    )
                    restart = True

            if restart:
                # Forget the stored handle and start over so that a fresh
                # customer is created on the next pass.
                self.log_info(f"Customer handle is invalid for {prefix_markdown}")
                prefix.custom_field_data["arin_customer_handle"] = ""
                prefix.full_clean()
                del prefix.cf  # drop the cached view so the retry sees the cleared value
                prefix.save()
                return self.create_objects_from_prefix(prefix, aggregate_net)

    # Create Net
    self.log_info(f"Creating net for {prefix_markdown}")
    net = self.generate_net(prefix, aggregate_net, customer_handle, org_handle)

    if not self._commit:
        # Dry run: the net was only built locally, nothing is sent to ARIN.
        return None

    arin_ticketrequest_or_err = aggregate_net.reassign(net)  # type: ignore
    if isinstance(arin_ticketrequest_or_err, Error):
        self.log_failure(f"Error creating net for {prefix_markdown}: {arin_ticketrequest_or_err}")
    elif arin_ticketrequest_or_err is None:
        self.log_warning(f"Unable to process ARIN's response for {prefix_markdown}")
    else:
        assert arin_ticketrequest_or_err.net is not None
        self.log_success(f"Created net `{arin_ticketrequest_or_err.net.handle}` for {prefix_markdown}")
    return arin_ticketrequest_or_err
def process_net(self, net: Net, aggregate_net: Net, prefix: Prefix):
    """Reconcile an existing ARIN net with the handles recorded in NetBox.

    The net is up to date unless its customer handle or org handle is set and
    differs from the prefix's ``arin_customer_handle`` / the tenant's
    ``arin_org_handle`` custom field.

    Args:
        net: ARIN net found for the prefix's range.
        aggregate_net: ARIN net of the containing aggregate.
        prefix: The prefix being processed; it must have a tenant.

    Returns:
        ``net`` when it is up to date, the result of
        ``create_objects_from_prefix`` when the net had to be (re)created, or
        ``None`` when it cannot be handled or its removal failed.
    """
    self.log_info(f"Processing existing net `{net.handle} ({net.net_name})`")
    arin_customer_handle = prefix.cf.get("arin_customer_handle")
    prefix_tenant: Tenant | None = prefix.tenant
    assert prefix_tenant is not None
    arin_org_handle = prefix_tenant.cf.get("arin_org_handle")
    assert net.net_blocks is not None

    customer_differs = net.customer_handle is not None and net.customer_handle != arin_customer_handle
    org_differs = net.org_handle is not None and net.org_handle != arin_org_handle
    if not (customer_differs or org_differs):
        self.log_success(f"Net `{net.handle}` is up to date")
        return net

    if all(block.type == "A" for block in net.net_blocks):
        # Only type "A" blocks: reassign using the found net itself as parent.
        return self.create_objects_from_prefix(prefix, net)

    if all(block.type == "S" for block in net.net_blocks):
        self.log_warning(
            f"Net `{net.handle}` was reassigned to a different object, trying to remove arin reassignment"
        )
        # Removing is a write to ARIN, so it only happens when committing,
        # like every other write in this script.
        if self._commit:
            removal = net.remove()
            if isinstance(removal, Error):
                self.log_failure(f"Error removing net `{net.handle}`: {removal}")
                return None
        return self.create_objects_from_prefix(prefix, aggregate_net)

    self.log_failure(f"Net `{net.handle}` has not been reassigned, cannot remove safely. Skipping...")
    return None
def create_arin_customfields(self):
    """Ensure the ARIN custom fields exist and are attached to their models.

    ``arin_org_handle`` is attached to ``Tenant``; ``arin_customer_handle``,
    ``arin_net_handle`` and ``arin_net_name`` are attached to ``Prefix``. A
    success message is logged for each field that had to be created.
    """
    prefix_ct = ContentType.objects.get_for_model(Prefix)
    tenant_ct = ContentType.objects.get_for_model(Tenant)

    # (field name, content type to attach, defaults used only on creation)
    field_specs = (
        (
            "arin_org_handle",
            tenant_ct,
            dict(
                type=CustomFieldTypeChoices.TYPE_TEXT,
                description="Represents a business, nonprofit corporation, or government entity in the ARIN database.",
                label="Org Handle",
                validation_regex="^[A-Z\\d-]+$",
                required=False,
                ui_visibility=CustomFieldVisibilityChoices.VISIBILITY_READ_WRITE,
            ),
        ),
        (
            "arin_customer_handle",
            prefix_ct,
            dict(
                type=CustomFieldTypeChoices.TYPE_TEXT,
                description="Customer handle for this Prefix/Tenant combination",
                label="Customer Handle",
                required=False,
                ui_visibility=CustomFieldVisibilityChoices.VISIBILITY_HIDDEN,
                validation_regex="^C[\\d]+$",
            ),
        ),
        (
            "arin_net_handle",
            prefix_ct,
            dict(
                type=CustomFieldTypeChoices.TYPE_TEXT,
                description="Handles for IPv4 networks start with NET-, and handles for IPv6 networks start with NET6-",
                label="Net Handle",
                required=False,
                validation_regex="^NET6?-[\\dA-Z\\-]+$",
            ),
        ),
        (
            "arin_net_name",
            prefix_ct,
            dict(
                type=CustomFieldTypeChoices.TYPE_TEXT,
                description="The name of the network",
                label="Net Name",
                required=False,
            ),
        ),
    )

    for field_name, content_type, defaults in field_specs:
        # Each field uses its own ``created`` flag. Previously the last field
        # discarded its flag and reported the one from the field before it.
        custom_field, created = CustomField.objects.get_or_create(name=field_name, defaults=defaults)
        custom_field.content_types.add(content_type)
        custom_field.save()
        if created:
            self.log_success(f"Created custom field {custom_field.name}")
@property
def countries(self) -> dict[str, Iso31661]:
    """Country name to ``Iso31661`` record; built on first access and then reused."""
    known = self._countries
    if known is not None:
        return known

    known = {
        "Canada": Iso31661(name="Canada", code2="CA", code3="CAN", e164=1),
        "United States": Iso31661(name="United States of America", code2="US", code3="USA", e164=1),
    }
    self._countries = known
    return known
def run(self, data, commit):
    """Script entry point: set up the clients, then process each aggregate.

    Args:
        data: Submitted form values. ``api_key`` is required; ``api_url``,
            ``aggregates`` and ``debug`` are optional.
        commit: Whether changes may be written to ARIN.

    Returns:
        A short summary string with the number of aggregates processed.
    """
    arin_key = data["api_key"]
    arin_url = data.get("api_url", "https://reg.arin.net/")
    selected = data.get("aggregates", Aggregate.objects.none())
    wait_for_debugger = data.get("debug")

    # The Google Maps key comes from the worker's environment.
    self._gmaps = googlemaps.Client(key=os.environ.get("GMAPS_APIKEY"))

    if wait_for_debugger:
        import debugpy  # pylint: disable=import-outside-toplevel

        debugpy.listen(("0.0.0.0", 5678))
        debugpy.wait_for_client()  # blocks execution until client is attached

    self.create_arin_customfields()
    self._regrws = Api(base_url=arin_url, api_key=arin_key)
    self._commit = commit

    for aggregate in selected:
        self.process_aggregate(aggregate)

    return f"Done, processed {selected.count()} aggregates"
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment