Created
January 23, 2026 04:40
-
-
Save utternerd/38e54496ad589ed0a7ae2158e0655d31 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import requests | |
| import sys | |
| import ipaddress | |
| import logging | |
| from akamai.edgegrid import EdgeGridAuth, EdgeRc | |
| from urllib.parse import urljoin | |
| from requests.adapters import HTTPAdapter | |
| from urllib3.util.retry import Retry | |
# --- Configuration ---
# Remote plain-text sources; each non-blank line is treated as an IP or CIDR.
IP_LIST_URLS = [
    "https://example.com/list1.txt",
    "https://example.com/list2.txt",
]
NETWORK_LIST_ID = "YOUR_LIST_ID"
ENVIRONMENT = "STAGING"  # or "PRODUCTION"
NOTIFICATION_EMAIL = "admin@example.com"

# Sanity check thresholds
MIN_EXPECTED_IPS = 10  # the run aborts when fewer IPs than this are retrieved
MAX_CHANGE_PERCENT = 50  # a warning is logged above this percentage of change

# Logging: timestamp, level and message at INFO and above.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Shared session for downloading the source lists. HTTPS requests are retried
# (3 attempts in total budget, backoff factor 1) on the listed status codes.
retry_strategy = Retry(
    total=3,
    backoff_factor=1,
    status_forcelist=[429, 500, 502, 503, 504],
)
adapter = HTTPAdapter(max_retries=retry_strategy)
http = requests.Session()
http.mount("https://", adapter)
def is_valid_ip(ip_str):
    """Return True if ``ip_str`` is a valid IPv4/IPv6 address or CIDR block.

    A value containing ``/`` is parsed as a network with ``strict=False``, so
    a CIDR whose host bits are set (e.g. ``10.0.0.5/24``) is accepted. Any
    other value is parsed as a single address. No whitespace stripping is
    done here.

    Args:
        ip_str: Candidate entry to validate.

    Returns:
        bool: True when the value parses; False otherwise, including for
        non-string input such as ``None``.
    """
    # Reject non-strings up front: `'/' in None` would raise TypeError, and
    # the ipaddress constructors also accept ints and packed bytes, which are
    # not meaningful list entries.
    if not isinstance(ip_str, str):
        return False
    try:
        if "/" in ip_str:
            ipaddress.ip_network(ip_str, strict=False)
        else:
            ipaddress.ip_address(ip_str)
    except ValueError:
        return False
    return True
def get_remote_ips():
    """Download every URL in ``IP_LIST_URLS`` and return the union of valid entries.

    Each response body is split into lines; blank lines are dropped and the
    remaining ones are kept only when ``is_valid_ip`` accepts them. The number
    of rejected lines per URL is logged as a warning.

    Returns:
        set: Unique address/CIDR strings collected from all sources.

    Exits:
        Calls ``sys.exit(1)`` as soon as any request raises a
        ``requests.exceptions.RequestException`` (which includes the error
        raised by ``raise_for_status``), so a partial result is never returned.
    """
    collected = set()
    for source_url in IP_LIST_URLS:
        try:
            # The 10-second timeout keeps a stalled server from hanging the run.
            response = http.get(source_url, timeout=10)
            response.raise_for_status()

            # Strip each line and discard the blank ones.
            entries = []
            for raw_line in response.text.splitlines():
                entry = raw_line.strip()
                if entry:
                    entries.append(entry)

            # Keep only entries that parse as an IP or CIDR.
            accepted = list(filter(is_valid_ip, entries))
            rejected_count = len(entries) - len(accepted)
            if rejected_count:
                logger.warning(f"Ignored {rejected_count} invalid IPs in {source_url}")

            collected.update(accepted)
            logger.info(f"Fetched {len(accepted)} valid IPs from {source_url}")
        except requests.exceptions.RequestException as fetch_error:
            # One failed source aborts the whole run: continuing would push
            # a partial/incomplete list to Akamai.
            logger.critical(f"Failed to fetch {source_url}: {fetch_error}")
            sys.exit(1)
    return collected
def main():
    """Synchronise the Akamai network list with the configured remote IP lists.

    Steps, in order:
      1. Load EdgeGrid credentials from the ``default`` section of ``~/.edgerc``.
      2. Fetch and validate the remote lists via ``get_remote_ips``.
      3. Abort if fewer than ``MIN_EXPECTED_IPS`` entries were retrieved.
      4. Read the current list from Akamai and log a warning when the
         proportion of changed entries exceeds ``MAX_CHANGE_PERCENT``.
      5. If the sets differ, upload the new list, read it back to verify it,
         and request activation on ``ENVIRONMENT``.

    Returns:
        None. Returns early, without calling the update endpoints, when the
        remote and Akamai lists already match.

    Exits:
        Calls ``sys.exit(1)`` on any authentication, fetch, sanity-check,
        API, or verification failure.
    """
    # requests applies no timeout by default; without one a stalled Akamai
    # connection would block the script indefinitely.
    api_timeout = 30  # seconds, applied to every Akamai API call below

    # Load Akamai Auth
    try:
        edgerc = EdgeRc('~/.edgerc')
        session = requests.Session()
        session.auth = EdgeGridAuth.from_edgerc(edgerc, 'default')
        baseurl = f"https://{edgerc.get('default', 'host')}"
        logger.info("Akamai authentication configured successfully")
    except Exception as e:
        logger.error(f"Auth Error: {e}")
        sys.exit(1)

    # 1. Fetch data from remote sources
    logger.info("Fetching IP lists from remote sources...")
    new_ips = get_remote_ips()
    logger.info(f"Total unique IPs retrieved: {len(new_ips)}")

    # Sanity check: Minimum IP count
    if len(new_ips) < MIN_EXPECTED_IPS:
        logger.error(f"Only {len(new_ips)} IPs retrieved (minimum expected: {MIN_EXPECTED_IPS})")
        logger.error("This may indicate a problem with the source lists. Aborting.")
        sys.exit(1)

    # 2. Get current state from Akamai
    logger.info(f"Fetching current network list from Akamai (ID: {NETWORK_LIST_ID})...")
    try:
        url = urljoin(baseurl, f"/network-list/v2/network-lists/{NETWORK_LIST_ID}")
        resp = session.get(url, params={"includeElements": "true"}, timeout=api_timeout)
        resp.raise_for_status()
        current_data = resp.json()
        current_ips = set(current_data.get('list', []))
        logger.info(f"Current Akamai list contains {len(current_ips)} IPs")
    except Exception as e:
        logger.error(f"Failed to reach Akamai: {e}")
        sys.exit(1)

    added = new_ips - current_ips
    removed = current_ips - new_ips

    # Sanity check: Detect large changes
    if current_ips:
        # Measure churn as entries added plus entries removed, not as the
        # difference in list size: swapping N entries for N different ones
        # leaves the size unchanged but is still a large change.
        change_pct = (len(added) + len(removed)) / len(current_ips) * 100
        if change_pct > MAX_CHANGE_PERCENT:
            logger.warning(f"LARGE CHANGE DETECTED: {change_pct:.1f}% change ({len(current_ips)} → {len(new_ips)} IPs)")
            logger.warning("Proceeding anyway, but you may want to investigate this change.")

    # 3. Compare lists
    if new_ips == current_ips:
        logger.info("Lists match exactly. No action needed.")
        return

    # Show what's changing (samples are sorted so repeated runs log the same entries)
    logger.info("Changes detected:")
    logger.info(f" Adding {len(added)} IPs")
    logger.info(f" Removing {len(removed)} IPs")
    if added:
        sample_additions = sorted(added)[:5]
        logger.info(f" Sample additions: {sample_additions}")
    if removed:
        sample_removals = sorted(removed)[:5]
        logger.info(f" Sample removals: {sample_removals}")

    # 4. Update Akamai Network List
    logger.info(f"Updating Akamai network list with {len(new_ips)} IPs...")
    try:
        # NOTE(review): confirm against the Network Lists API v2 reference that
        # PUT .../elements accepts a JSON body that replaces the whole list.
        upd_url = urljoin(baseurl, f"/network-list/v2/network-lists/{NETWORK_LIST_ID}/elements")
        # Sorted so the request payload is deterministic between runs.
        update_resp = session.put(upd_url, json={"list": sorted(new_ips)}, timeout=api_timeout)
        update_resp.raise_for_status()
        logger.info("Update request successful")

        # 5. Verify the update was applied correctly
        logger.info("Verifying update was applied...")
        verify_resp = session.get(url, params={"includeElements": "true"}, timeout=api_timeout)
        verify_resp.raise_for_status()
        updated_ips = set(verify_resp.json().get('list', []))
        if updated_ips != new_ips:
            logger.error("Update verification FAILED!")
            logger.error(f"Expected {len(new_ips)} IPs, but Akamai has {len(updated_ips)} IPs")
            logger.error("Aborting activation to prevent propagating incorrect data")
            # SystemExit is not an Exception subclass, so the handlers below
            # do not intercept this exit.
            sys.exit(1)
        logger.info("Update verified successfully")

        # 6. Activate the changes
        logger.info(f"Activating network list to {ENVIRONMENT} environment...")
        act_url = urljoin(baseurl, f"/network-list/v2/network-lists/{NETWORK_LIST_ID}/environments/{ENVIRONMENT}/activate")
        activate_resp = session.post(
            act_url,
            json={
                "notificationRecipients": [NOTIFICATION_EMAIL],
                "comments": "Automated sync from IP list sources",
            },
            timeout=api_timeout,
        )
        activate_resp.raise_for_status()
        logger.info(f"SUCCESS: Network list updated and activation initiated to {ENVIRONMENT}")
    except requests.exceptions.HTTPError as e:
        logger.error(f"Akamai API Error (HTTP {e.response.status_code}): {e.response.text}")
        logger.error(f"Request URL: {e.response.url}")
        sys.exit(1)
    except Exception as e:
        # Covers timeouts, connection errors and malformed JSON responses.
        logger.error(f"Unexpected error during update/activation: {e}")
        sys.exit(1)
# Run the sync only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment