Skip to content

Instantly share code, notes, and snippets.

@utternerd
Created January 23, 2026 04:40
Show Gist options
  • Select an option

  • Save utternerd/38e54496ad589ed0a7ae2158e0655d31 to your computer and use it in GitHub Desktop.

Select an option

Save utternerd/38e54496ad589ed0a7ae2158e0655d31 to your computer and use it in GitHub Desktop.
import requests
import sys
import ipaddress
import logging
from akamai.edgegrid import EdgeGridAuth, EdgeRc
from urllib.parse import urljoin
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# --- Configuration ---
IP_LIST_URLS = ["https://example.com/list1.txt", "https://example.com/list2.txt"]
NETWORK_LIST_ID = 'YOUR_LIST_ID'
ENVIRONMENT = 'STAGING' # or 'PRODUCTION'
NOTIFICATION_EMAIL = 'admin@example.com'
# Sanity check thresholds
MIN_EXPECTED_IPS = 10 # Minimum number of IPs expected
MAX_CHANGE_PERCENT = 50 # Alert if more than 50% change
# Set up logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Set up a retry strategy for network resilience
retry_strategy = Retry(
total=3,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
http = requests.Session()
http.mount("https://", adapter)
def is_valid_ip(ip_str):
"""Check if string is a valid IP or CIDR."""
try:
if '/' in ip_str:
ipaddress.ip_network(ip_str, strict=False)
else:
ipaddress.ip_address(ip_str)
return True
except ValueError:
return False
def get_remote_ips():
"""Fetch and validate IPs from all configured sources."""
combined_ips = set()
for url in IP_LIST_URLS:
try:
# timeout=10 prevents the script from hanging forever
resp = http.get(url, timeout=10)
resp.raise_for_status()
lines = [line.strip() for line in resp.text.splitlines() if line.strip()]
# Validate each line
valid_lines = [l for l in lines if is_valid_ip(l)]
if len(valid_lines) != len(lines):
logger.warning(f"Ignored {len(lines) - len(valid_lines)} invalid IPs in {url}")
combined_ips.update(valid_lines)
logger.info(f"Fetched {len(valid_lines)} valid IPs from {url}")
except requests.exceptions.RequestException as e:
# CRITICAL: If any list fails to download, we stop everything
# to prevent updating Akamai with a partial/incomplete list.
logger.critical(f"Failed to fetch {url}: {e}")
sys.exit(1)
return combined_ips
def main():
# Load Akamai Auth
try:
edgerc = EdgeRc('~/.edgerc')
session = requests.Session()
session.auth = EdgeGridAuth.from_edgerc(edgerc, 'default')
baseurl = f"https://{edgerc.get('default', 'host')}"
logger.info("Akamai authentication configured successfully")
except Exception as e:
logger.error(f"Auth Error: {e}")
sys.exit(1)
# 1. Fetch data from remote sources
logger.info("Fetching IP lists from remote sources...")
new_ips = get_remote_ips()
logger.info(f"Total unique IPs retrieved: {len(new_ips)}")
# Sanity check: Minimum IP count
if len(new_ips) < MIN_EXPECTED_IPS:
logger.error(f"Only {len(new_ips)} IPs retrieved (minimum expected: {MIN_EXPECTED_IPS})")
logger.error("This may indicate a problem with the source lists. Aborting.")
sys.exit(1)
# 2. Get current state from Akamai
logger.info(f"Fetching current network list from Akamai (ID: {NETWORK_LIST_ID})...")
try:
url = urljoin(baseurl, f"/network-list/v2/network-lists/{NETWORK_LIST_ID}")
resp = session.get(url, params={"includeElements": "true"})
resp.raise_for_status()
current_data = resp.json()
current_ips = set(current_data.get('list', []))
logger.info(f"Current Akamai list contains {len(current_ips)} IPs")
except Exception as e:
logger.error(f"Failed to reach Akamai: {e}")
sys.exit(1)
# Sanity check: Detect large changes
if current_ips:
change_pct = abs(len(new_ips) - len(current_ips)) / len(current_ips) * 100
if change_pct > MAX_CHANGE_PERCENT:
logger.warning(f"LARGE CHANGE DETECTED: {change_pct:.1f}% change ({len(current_ips)} → {len(new_ips)} IPs)")
logger.warning("Proceeding anyway, but you may want to investigate this change.")
# 3. Compare lists
if new_ips == current_ips:
logger.info("Lists match exactly. No action needed.")
return
# Show what's changing
added = new_ips - current_ips
removed = current_ips - new_ips
logger.info(f"Changes detected:")
logger.info(f" Adding {len(added)} IPs")
logger.info(f" Removing {len(removed)} IPs")
if added:
sample_additions = list(added)[:5]
logger.info(f" Sample additions: {sample_additions}")
if removed:
sample_removals = list(removed)[:5]
logger.info(f" Sample removals: {sample_removals}")
# 4. Update Akamai Network List
logger.info(f"Updating Akamai network list with {len(new_ips)} IPs...")
try:
upd_url = urljoin(baseurl, f"/network-list/v2/network-lists/{NETWORK_LIST_ID}/elements")
update_resp = session.put(upd_url, json={"list": list(new_ips)})
update_resp.raise_for_status()
logger.info("Update request successful")
# 5. Verify the update was applied correctly
logger.info("Verifying update was applied...")
verify_resp = session.get(url, params={"includeElements": "true"})
verify_resp.raise_for_status()
updated_ips = set(verify_resp.json().get('list', []))
if updated_ips != new_ips:
logger.error(f"Update verification FAILED!")
logger.error(f"Expected {len(new_ips)} IPs, but Akamai has {len(updated_ips)} IPs")
logger.error("Aborting activation to prevent propagating incorrect data")
sys.exit(1)
logger.info("Update verified successfully")
# 6. Activate the changes
logger.info(f"Activating network list to {ENVIRONMENT} environment...")
act_url = urljoin(baseurl, f"/network-list/v2/network-lists/{NETWORK_LIST_ID}/environments/{ENVIRONMENT}/activate")
activate_resp = session.post(act_url, json={
"notificationRecipients": [NOTIFICATION_EMAIL],
"comments": "Automated sync from IP list sources"
})
activate_resp.raise_for_status()
logger.info(f"SUCCESS: Network list updated and activation initiated to {ENVIRONMENT}")
except requests.exceptions.HTTPError as e:
logger.error(f"Akamai API Error (HTTP {e.response.status_code}): {e.response.text}")
logger.error(f"Request URL: {e.response.url}")
sys.exit(1)
except Exception as e:
logger.error(f"Unexpected error during update/activation: {e}")
sys.exit(1)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment