|
#!/usr/bin/env python3 |
|
|
|
import argparse |
|
import requests |
|
import json |
|
import sys |
|
import time |
|
from datetime import datetime |
|
|
|
# Import Rich components at module level |
|
try: |
|
from rich.console import Console |
|
from rich.table import Table |
|
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TimeElapsedColumn, TaskProgressColumn |
|
from rich.panel import Panel |
|
from rich.text import Text |
|
from rich import box |
|
from rich.rule import Rule |
|
RICH_AVAILABLE = True |
|
except ImportError: |
|
print("For a better visual experience, install rich: pip install rich") |
|
RICH_AVAILABLE = False |
|
|
|
# Initialize rich console if available |
|
console = Console() if RICH_AVAILABLE else None |
|
|
|
def log(message, level="info", use_rich=True): |
|
"""Log messages with rich formatting if available""" |
|
if not RICH_AVAILABLE or not use_rich: |
|
print(message) |
|
return |
|
|
|
timestamp = datetime.now().strftime("%H:%M:%S") |
|
|
|
if level == "info": |
|
console.print(f"[grey]{timestamp}[/grey] [blue]INFO[/blue]: {message}") |
|
elif level == "success": |
|
console.print(f"[grey]{timestamp}[/grey] [green]SUCCESS[/green]: {message}") |
|
elif level == "warning": |
|
console.print(f"[grey]{timestamp}[/grey] [yellow]WARNING[/yellow]: {message}") |
|
elif level == "error": |
|
console.print(f"[grey]{timestamp}[/grey] [red]ERROR[/red]: {message}") |
|
elif level == "debug": |
|
console.print(f"[grey]{timestamp}[/grey] [magenta]DEBUG[/magenta]: {message}") |
|
elif level == "header": |
|
console.print() |
|
console.print(Panel(Text(message, justify="center"), border_style="blue")) |
|
elif level == "subheader": |
|
console.print(Text(f"\n{message}", style="blue bold")) |
|
|
|
def display_zones_table(zones): |
|
"""Display zones in a nicely formatted table""" |
|
if not RICH_AVAILABLE or not zones: |
|
return |
|
|
|
table = Table(title="Cloudflare Zones", box=box.ROUNDED) |
|
table.add_column("Zone Name", style="cyan") |
|
table.add_column("Zone ID", style="magenta") |
|
table.add_column("Status", style="green") |
|
|
|
for zone in zones: |
|
table.add_row( |
|
zone.get('name', 'N/A'), |
|
zone.get('id', 'N/A'), |
|
zone.get('status', 'N/A') |
|
) |
|
|
|
console.print(table) |
|
|
|
def display_rule_comparison(zone_name, old_expression, new_expression): |
|
"""Display a comparison between old and new expressions""" |
|
if not RICH_AVAILABLE: |
|
return |
|
|
|
console.print(Panel( |
|
f"[bold cyan]Zone:[/bold cyan] {zone_name}\n\n" |
|
f"[bold yellow]Current Expression:[/bold yellow]\n{old_expression}\n\n" |
|
f"[bold green]New Expression:[/bold green]\n{new_expression}", |
|
title="WAF Rule Update Preview", |
|
border_style="blue" |
|
)) |
|
|
|
def parse_arguments(): |
|
parser = argparse.ArgumentParser(description='Update Cloudflare WAF Custom Rules across multiple zones') |
|
parser.add_argument('--token', required=True, help='Cloudflare API Token with appropriate permissions') |
|
parser.add_argument('--description', default='Block Attacks', help='Description of the rule to update') |
|
parser.add_argument('--expression', default='(http.request.uri.path contains ".env") or (http.request.uri.path contains "wlwmanifest.xml") or (http.request.uri.path contains "xmlrpc.php") or (http.request.uri.path contains "setup-config.php") or (http.request.uri.path contains "wp-login.php")', |
|
help='WAF expression to update (default is to block common attack patterns)') |
|
parser.add_argument('--dry-run', action='store_true', help='Show what would be updated without making changes') |
|
parser.add_argument('--force-recreate', action='store_true', help='Force delete and recreate instead of attempting to update') |
|
parser.add_argument('--create-if-missing', action='store_true', help='Create the rule if it doesn\'t exist') |
|
parser.add_argument('--zone', help='Specific zone name to update (updates all zones if not specified)') |
|
parser.add_argument('--debug', action='store_true', help='Enable debug output for troubleshooting') |
|
parser.add_argument('--api-key', help='Use Global API Key instead of API Token (less secure, but may be needed)') |
|
parser.add_argument('--email', help='Cloudflare account email, required if using API Key') |
|
return parser.parse_args() |
|
|
|
def get_zones(token, target_zone=None, debug=False): |
|
headers = { |
|
"Authorization": f"Bearer {token}", |
|
"Content-Type": "application/json" |
|
} |
|
|
|
url = "https://api.cloudflare.com/client/v4/zones" |
|
|
|
if debug: |
|
log(f"Making request to {url}", level="debug") |
|
# We don't log the actual headers with token for security |
|
log(f"Using Bearer token authentication", level="debug") |
|
|
|
try: |
|
response = requests.get(url, headers=headers) |
|
|
|
if debug: |
|
log(f"Response status code: {response.status_code}", level="debug") |
|
log(f"Response content type: {response.headers.get('Content-Type', 'N/A')}", level="debug") |
|
|
|
if response.status_code != 200: |
|
log(f"Error fetching zones: {response.status_code}", level="error") |
|
log(f"Response: {response.text}", level="error") |
|
if response.status_code == 401: |
|
log("Authentication failed. Please check your API token.", level="error") |
|
log("Make sure the token has 'Zone:Read' permissions.", level="error") |
|
elif response.status_code == 403: |
|
log("Authorization failed. Your token does not have permission to list zones.", level="error") |
|
sys.exit(1) |
|
|
|
data = response.json() |
|
|
|
if debug and RICH_AVAILABLE: |
|
console.print("[magenta]DEBUG: API response summary:[/magenta]") |
|
console.print(f"Success: {data.get('success')}") |
|
console.print(f"Result count: {len(data.get('result', []))}") |
|
console.print(f"Errors count: {len(data.get('errors', []))}") |
|
|
|
if not data.get('success', False): |
|
log(f"API returned an error: {data.get('errors', 'Unknown error')}", level="error") |
|
log(f"Message: {data.get('messages', 'No message provided')}", level="error") |
|
sys.exit(1) |
|
|
|
zones = data.get('result', []) |
|
|
|
if debug and not zones: |
|
log("No zones were returned by the API.", level="debug") |
|
log("This could mean your account has no zones or your token lacks permission.", level="debug") |
|
|
|
# Filter for a specific zone if requested |
|
if target_zone: |
|
zones = [zone for zone in zones if zone['name'] == target_zone] |
|
if not zones: |
|
log(f"Zone '{target_zone}' not found", level="error") |
|
sys.exit(1) |
|
|
|
return zones |
|
|
|
except requests.exceptions.RequestException as e: |
|
log(f"Network error when connecting to Cloudflare API: {e}", level="error") |
|
sys.exit(1) |
|
except json.JSONDecodeError: |
|
log("Invalid JSON response from Cloudflare API.", level="error") |
|
if debug: |
|
log(f"Raw response: {response.text}", level="debug") |
|
sys.exit(1) |
|
|
|
def get_zones_with_api_key(api_key, email, target_zone=None, debug=False): |
|
headers = { |
|
"X-Auth-Email": email, |
|
"X-Auth-Key": api_key, |
|
"Content-Type": "application/json" |
|
} |
|
|
|
url = "https://api.cloudflare.com/client/v4/zones" |
|
|
|
if debug: |
|
log(f"Making request to {url} using Global API Key", level="debug") |
|
|
|
try: |
|
response = requests.get(url, headers=headers) |
|
|
|
if debug: |
|
log(f"Response status code: {response.status_code}", level="debug") |
|
|
|
if response.status_code != 200: |
|
log(f"Error fetching zones: {response.status_code}", level="error") |
|
log(f"Response: {response.text}", level="error") |
|
if response.status_code == 401: |
|
log("Authentication failed. Please check your API Key and Email.", level="error") |
|
elif response.status_code == 403: |
|
log("Authorization failed. Your API Key does not have permission to list zones.", level="error") |
|
sys.exit(1) |
|
|
|
data = response.json() |
|
|
|
if debug and not data.get('result', []): |
|
log("DEBUG: API returned a response but no zones were found.", level="debug") |
|
log(f"DEBUG: Success status: {data.get('success')}", level="debug") |
|
|
|
if not data.get('success', False): |
|
log(f"API returned an error: {data.get('errors', 'Unknown error')}", level="error") |
|
sys.exit(1) |
|
|
|
zones = data.get('result', []) |
|
|
|
# Filter for a specific zone if requested |
|
if target_zone: |
|
zones = [zone for zone in zones if zone['name'] == target_zone] |
|
if not zones: |
|
log(f"Zone '{target_zone}' not found", level="error") |
|
sys.exit(1) |
|
|
|
return zones |
|
|
|
except requests.exceptions.RequestException as e: |
|
log(f"Network error when connecting to Cloudflare API: {e}", level="error") |
|
sys.exit(1) |
|
except json.JSONDecodeError: |
|
log("Invalid JSON response from Cloudflare API.", level="error") |
|
if debug: |
|
log(f"DEBUG: Raw response: {response.text}", level="debug") |
|
sys.exit(1) |
|
|
|
def get_waf_rulesets(token, zone_id, debug=False): |
|
headers = { |
|
"Authorization": f"Bearer {token}", |
|
"Content-Type": "application/json" |
|
} |
|
|
|
url = f"https://api.cloudflare.com/client/v4/zones/{zone_id}/rulesets/phases/http_request_firewall_custom/entrypoint" |
|
|
|
if debug: |
|
log(f"Making request to {url}", level="debug") |
|
|
|
try: |
|
response = requests.get(url, headers=headers) |
|
|
|
if debug: |
|
log(f"Response status: {response.status_code}", level="debug") |
|
|
|
if response.status_code != 200: |
|
log(f"Error fetching WAF rulesets for zone {zone_id}: {response.status_code}", level="error") |
|
log(response.text, level="error") |
|
return None |
|
|
|
data = response.json() |
|
if not data.get('success'): |
|
log(f"API returned an error for zone {zone_id}: {data.get('errors')}", level="error") |
|
return None |
|
|
|
if debug and 'result' in data: |
|
if RICH_AVAILABLE: |
|
ruleset = data['result'] |
|
if 'rules' in ruleset: |
|
log(f"Found {len(ruleset['rules'])} rules in the ruleset", level="debug") |
|
# DEBUG: Log the descriptions of all rules |
|
for idx, rule in enumerate(ruleset['rules']): |
|
log(f"Rule {idx+1} description: '{rule.get('description', 'NO DESCRIPTION')}'", level="debug") |
|
else: |
|
log("Ruleset contains no rules", level="debug") |
|
|
|
return data.get('result') |
|
|
|
except requests.exceptions.RequestException as e: |
|
log(f"Network error when fetching WAF rulesets: {e}", level="error") |
|
return None |
|
except json.JSONDecodeError: |
|
log("Invalid JSON response when fetching WAF rulesets", level="error") |
|
return None |
|
|
|
def get_waf_rulesets_with_api_key(api_key, email, zone_id, debug=False): |
|
"""Get WAF rulesets using the Global API Key authentication""" |
|
headers = { |
|
"X-Auth-Email": email, |
|
"X-Auth-Key": api_key, |
|
"Content-Type": "application/json" |
|
} |
|
|
|
url = f"https://api.cloudflare.com/client/v4/zones/{zone_id}/rulesets/phases/http_request_firewall_custom/entrypoint" |
|
|
|
if debug: |
|
log(f"Making request to {url} using Global API Key", level="debug") |
|
|
|
try: |
|
response = requests.get(url, headers=headers) |
|
|
|
if debug: |
|
log(f"Response status: {response.status_code}", level="debug") |
|
|
|
if response.status_code != 200: |
|
log(f"Error fetching WAF rulesets for zone {zone_id}: {response.status_code}", level="error") |
|
log(response.text, level="error") |
|
return None |
|
|
|
data = response.json() |
|
if not data.get('success'): |
|
log(f"API returned an error for zone {zone_id}: {data.get('errors')}", level="error") |
|
return None |
|
|
|
if debug and 'result' in data: |
|
if RICH_AVAILABLE: |
|
ruleset = data['result'] |
|
if 'rules' in ruleset: |
|
log(f"Found {len(ruleset['rules'])} rules in the ruleset", level="debug") |
|
# DEBUG: Log the descriptions of all rules |
|
for idx, rule in enumerate(ruleset['rules']): |
|
log(f"Rule {idx+1} description: '{rule.get('description', 'NO DESCRIPTION')}'", level="debug") |
|
else: |
|
log("Ruleset contains no rules", level="debug") |
|
|
|
return data.get('result') |
|
|
|
except requests.exceptions.RequestException as e: |
|
log(f"Network error when fetching WAF rulesets: {e}", level="error") |
|
return None |
|
except json.JSONDecodeError: |
|
log("Invalid JSON response when fetching WAF rulesets", level="error") |
|
return None |
|
|
|
def find_rule_by_description(ruleset_data, description): |
|
if not ruleset_data or 'rules' not in ruleset_data: |
|
return None |
|
|
|
for rule in ruleset_data.get('rules', []): |
|
if rule.get('description') == description: |
|
return rule |
|
|
|
return None |
|
|
|
def update_rule(auth_headers, zone_id, ruleset_id, rule, description, expression, force_recreate=False, dry_run=False, debug=False): |
|
"""Update a rule using provided authentication headers""" |
|
rule_id = rule['id'] |
|
action = rule.get('action', 'block') |
|
|
|
if debug: |
|
log(f"Updating rule with description: '{description}'", level="debug") |
|
log(f"Current rule data: {json.dumps(rule)}", level="debug") |
|
|
|
# Try to update using PATCH first (unless force_recreate is specified) |
|
if not force_recreate: |
|
success = update_rule_patch(auth_headers, zone_id, ruleset_id, rule_id, expression, action, description, dry_run, debug) |
|
if success: |
|
return True |
|
log("PATCH update failed, falling back to delete-and-create approach", level="warning") |
|
|
|
# If PATCH fails or force_recreate is specified, delete and recreate |
|
if not delete_rule(auth_headers, zone_id, ruleset_id, rule_id, dry_run, debug): |
|
return False |
|
|
|
return create_rule(auth_headers, zone_id, ruleset_id, description, expression, action, dry_run, debug) |
|
|
|
def update_rule_patch(auth_headers, zone_id, ruleset_id, rule_id, expression, action="block", description=None, dry_run=False, debug=False): |
|
"""Update a rule using PATCH method with provided authentication headers""" |
|
if dry_run: |
|
if RICH_AVAILABLE: |
|
panel_content = f"[cyan]Zone ID:[/cyan] {zone_id}\n" \ |
|
f"[cyan]Ruleset ID:[/cyan] {ruleset_id}\n" \ |
|
f"[cyan]Rule ID:[/cyan] {rule_id}\n" \ |
|
f"[cyan]New Expression:[/cyan] {expression}\n" \ |
|
f"[cyan]Action:[/cyan] {action}" |
|
|
|
if description: |
|
panel_content += f"\n[cyan]Description:[/cyan] {description}" |
|
|
|
console.print(Panel( |
|
panel_content, |
|
title="[yellow bold]DRY RUN: Would PATCH rule[/yellow bold]", |
|
border_style="yellow" |
|
)) |
|
else: |
|
log(f"[DRY RUN] Would patch rule {rule_id} in ruleset {ruleset_id} for zone {zone_id}") |
|
log(f"New expression: {expression}") |
|
log(f"Action: {action}") |
|
if description: |
|
log(f"Description: {description}") |
|
return True |
|
|
|
# Cloudflare requires the action parameter when patching rules |
|
data = { |
|
"expression": expression, |
|
"action": action |
|
} |
|
|
|
# IMPORTANT: Include the description in the PATCH request to maintain it |
|
if description: |
|
data["description"] = description |
|
|
|
if debug: |
|
log(f"PATCH request data: {json.dumps(data)}", level="debug") |
|
|
|
log("Updating rule via PATCH...") |
|
response = requests.patch( |
|
f"https://api.cloudflare.com/client/v4/zones/{zone_id}/rulesets/{ruleset_id}/rules/{rule_id}", |
|
headers=auth_headers, |
|
json=data |
|
) |
|
|
|
if debug: |
|
log(f"PATCH response status: {response.status_code}", level="debug") |
|
log(f"PATCH response body: {response.text}", level="debug") |
|
|
|
if response.status_code != 200: |
|
log(f"Error patching WAF rule for zone {zone_id}: {response.status_code}", level="error") |
|
log(response.text, level="error") |
|
return False |
|
|
|
data = response.json() |
|
if not data.get('success', False): |
|
log(f"API returned an error when patching rule: {data.get('errors')}", level="error") |
|
return False |
|
|
|
return True |
|
|
|
def delete_rule(auth_headers, zone_id, ruleset_id, rule_id, dry_run=False, debug=False): |
|
"""Delete a rule with provided authentication headers""" |
|
if dry_run: |
|
if RICH_AVAILABLE: |
|
console.print(Panel( |
|
f"[cyan]Zone ID:[/cyan] {zone_id}\n" |
|
f"[cyan]Ruleset ID:[/cyan] {ruleset_id}\n" |
|
f"[cyan]Rule ID:[/cyan] {rule_id}", |
|
title="[yellow bold]DRY RUN: Would DELETE rule[/yellow bold]", |
|
border_style="yellow" |
|
)) |
|
else: |
|
log(f"[DRY RUN] Would delete rule {rule_id} from ruleset {ruleset_id} for zone {zone_id}") |
|
return True |
|
|
|
log("Deleting rule...") |
|
response = requests.delete( |
|
f"https://api.cloudflare.com/client/v4/zones/{zone_id}/rulesets/{ruleset_id}/rules/{rule_id}", |
|
headers=auth_headers |
|
) |
|
|
|
if debug: |
|
log(f"DELETE response status: {response.status_code}", level="debug") |
|
log(f"DELETE response body: {response.text}", level="debug") |
|
|
|
if response.status_code != 200: |
|
log(f"Error deleting WAF rule for zone {zone_id}: {response.status_code}", level="error") |
|
log(response.text, level="error") |
|
return False |
|
|
|
data = response.json() |
|
if not data.get('success', False): |
|
log(f"API returned an error when deleting rule: {data.get('errors')}", level="error") |
|
return False |
|
|
|
return True |
|
|
|
def create_rule(auth_headers, zone_id, ruleset_id, description, expression, action="block", dry_run=False, debug=False): |
|
"""Create a rule with provided authentication headers""" |
|
if dry_run: |
|
if RICH_AVAILABLE: |
|
console.print(Panel( |
|
f"[cyan]Zone ID:[/cyan] {zone_id}\n" |
|
f"[cyan]Ruleset ID:[/cyan] {ruleset_id}\n" |
|
f"[cyan]Description:[/cyan] {description}\n" |
|
f"[cyan]Expression:[/cyan] {expression}\n" |
|
f"[cyan]Action:[/cyan] {action}", |
|
title="[yellow bold]DRY RUN: Would CREATE rule[/yellow bold]", |
|
border_style="yellow" |
|
)) |
|
else: |
|
log(f"[DRY RUN] Would create rule in ruleset {ruleset_id} for zone {zone_id}") |
|
log(f"Description: {description}") |
|
log(f"Expression: {expression}") |
|
log(f"Action: {action}") |
|
return True |
|
|
|
data = { |
|
"description": description, |
|
"expression": expression, |
|
"action": action |
|
} |
|
|
|
if debug: |
|
log(f"Creating rule with description: '{description}'", level="debug") |
|
log(f"Create request data: {json.dumps(data)}", level="debug") |
|
|
|
log("Creating new rule...") |
|
response = requests.post( |
|
f"https://api.cloudflare.com/client/v4/zones/{zone_id}/rulesets/{ruleset_id}/rules", |
|
headers=auth_headers, |
|
json=data |
|
) |
|
|
|
if debug: |
|
log(f"CREATE response status: {response.status_code}", level="debug") |
|
log(f"CREATE response body: {response.text}", level="debug") |
|
|
|
if response.status_code != 200: |
|
log(f"Error creating WAF rule for zone {zone_id}: {response.status_code}", level="error") |
|
log(response.text, level="error") |
|
return False |
|
|
|
data = response.json() |
|
if not data.get('success', False): |
|
log(f"API returned an error when creating rule: {data.get('errors')}", level="error") |
|
return False |
|
|
|
# Verify the created rule has the correct description |
|
if debug and data.get('result'): |
|
created_rule = data.get('result') |
|
log(f"Created rule with description: '{created_rule.get('description')}'", level="debug") |
|
if created_rule.get('description') != description: |
|
log(f"WARNING: Created rule has description '{created_rule.get('description')}' instead of '{description}'", level="warning") |
|
|
|
return True |
|
|
|
def make_fixed_width_text(text, width=45): |
|
"""Format text to a fixed width by padding or truncating""" |
|
if len(text) > width: |
|
return text[:width-3] + "..." |
|
else: |
|
return text.ljust(width) |
|
|
|
def main(): |
|
args = parse_arguments() |
|
|
|
if RICH_AVAILABLE: |
|
console.print(Panel.fit( |
|
"[bold blue]Cloudflare WAF Custom Rules Updater[/bold blue]", |
|
border_style="cyan", |
|
title="v1.0" |
|
)) |
|
else: |
|
print("\n=== Cloudflare WAF Custom Rules Updater ===\n") |
|
|
|
# Determine authentication method and headers |
|
if args.api_key and args.email: |
|
log(f"Using Global API Key authentication with email: {args.email}") |
|
headers = { |
|
"X-Auth-Email": args.email, |
|
"X-Auth-Key": args.api_key, |
|
"Content-Type": "application/json" |
|
} |
|
auth_method = "Global API Key" |
|
auth_token = None |
|
elif args.api_key: |
|
log("When using --api-key, you must also provide --email", level="error") |
|
sys.exit(1) |
|
else: |
|
log(f"Using API Token authentication") |
|
headers = { |
|
"Authorization": f"Bearer {args.token}", |
|
"Content-Type": "application/json" |
|
} |
|
auth_method = "API Token" |
|
auth_token = args.token |
|
|
|
if args.debug: |
|
log(f"Authentication method: {auth_method}", level="debug") |
|
# Don't log actual headers with credentials |
|
log(f"Headers include appropriate authentication", level="debug") |
|
log(f"Target rule description: '{args.description}'", level="debug") |
|
|
|
# Fetch zones |
|
log("Fetching zones from Cloudflare...", level="info") |
|
|
|
if args.api_key and args.email: |
|
zones = get_zones_with_api_key(args.api_key, args.email, args.zone, args.debug) |
|
else: |
|
zones = get_zones(args.token, args.zone, args.debug) |
|
|
|
if not zones: |
|
log("No zones found. Please check your permissions.", level="error") |
|
return |
|
|
|
# Show success message with zone count |
|
log(f"Found {len(zones)} zones", level="success") |
|
|
|
# If we're using Rich, display a pretty header |
|
if RICH_AVAILABLE: |
|
console.print(Rule(style="cyan")) |
|
console.print("[bold cyan]Starting WAF Rule Updates[/bold cyan]") |
|
|
|
# Initialize counters |
|
total_success = 0 |
|
total_error = 0 |
|
total_skipped = 0 |
|
|
|
# Create a progress bar |
|
if RICH_AVAILABLE: |
|
with Progress( |
|
SpinnerColumn(), |
|
TextColumn("[bold cyan]{task.description}"), |
|
BarColumn(complete_style="green", finished_style="green", bar_width=40), |
|
TaskProgressColumn(), |
|
TextColumn("({task.completed}/{task.total})"), |
|
TimeElapsedColumn(), |
|
console=console, |
|
expand=False, |
|
transient=True |
|
) as progress: |
|
|
|
# Add task for overall processing |
|
task = progress.add_task(make_fixed_width_text("Initializing..."), total=len(zones)) |
|
|
|
# Process each zone |
|
for zone_index, zone in enumerate(zones): |
|
zone_id = zone['id'] |
|
zone_name = zone['name'] |
|
|
|
# Use the helper function to set the text to a fixed width |
|
progress.update(task, description=make_fixed_width_text(f"Processing: {zone_name}")) |
|
|
|
# Get WAF rulesets |
|
if args.api_key and args.email: |
|
waf_ruleset = get_waf_rulesets_with_api_key(args.api_key, args.email, zone_id, args.debug) |
|
else: |
|
waf_ruleset = get_waf_rulesets(args.token, zone_id, args.debug) |
|
|
|
if not waf_ruleset: |
|
log(f"Zone {zone_name}: No WAF rulesets found, skipping...", level="warning") |
|
total_skipped += 1 |
|
progress.advance(task) |
|
continue |
|
|
|
rule = find_rule_by_description(waf_ruleset, args.description) |
|
|
|
if rule: |
|
# Debug info if needed |
|
if args.debug: |
|
log(f"Found '{args.description}' rule in zone {zone_name}", level="debug") |
|
|
|
# Prepare auth headers |
|
if args.api_key and args.email: |
|
auth_headers = { |
|
"X-Auth-Email": args.email, |
|
"X-Auth-Key": args.api_key, |
|
"Content-Type": "application/json" |
|
} |
|
else: |
|
auth_headers = { |
|
"Authorization": f"Bearer {args.token}", |
|
"Content-Type": "application/json" |
|
} |
|
|
|
# Update progress description |
|
progress.update(task, description=make_fixed_width_text(f"Updating rule for: {zone_name}")) |
|
|
|
success = update_rule( |
|
auth_headers, |
|
zone_id, |
|
waf_ruleset['id'], |
|
rule, |
|
args.description, |
|
args.expression, |
|
args.force_recreate, |
|
args.dry_run, |
|
args.debug |
|
) |
|
|
|
# Log the result WITHOUT stopping the progress bar |
|
if success: |
|
update_type = 'DRY RUN' if args.dry_run else 'Updated' |
|
log(f"Zone {zone_name}: [bold green]✓[/bold green] {update_type} '{args.description}' rule", level="success") |
|
total_success += 1 |
|
else: |
|
log(f"Zone {zone_name}: [bold red]✗[/bold red] Failed to update '{args.description}' rule", level="error") |
|
total_error += 1 |
|
else: |
|
# Rule not found |
|
if args.create_if_missing: |
|
log(f"Zone {zone_name}: Rule '{args.description}' not found, creating new rule...", level="info") |
|
|
|
# Update progress description |
|
progress.update(task, description=make_fixed_width_text(f"Creating rule for: {zone_name}")) |
|
|
|
# Prepare auth headers |
|
if args.api_key and args.email: |
|
auth_headers = { |
|
"X-Auth-Email": args.email, |
|
"X-Auth-Key": args.api_key, |
|
"Content-Type": "application/json" |
|
} |
|
else: |
|
auth_headers = { |
|
"Authorization": f"Bearer {args.token}", |
|
"Content-Type": "application/json" |
|
} |
|
|
|
success = create_rule( |
|
auth_headers, |
|
zone_id, |
|
waf_ruleset['id'], |
|
args.description, |
|
args.expression, |
|
"block", |
|
args.dry_run, |
|
args.debug |
|
) |
|
|
|
# Log the result without stopping progress |
|
if success: |
|
create_type = 'DRY RUN' if args.dry_run else 'Created' |
|
log(f"Zone {zone_name}: [bold green]✓[/bold green] {create_type} new '{args.description}' rule", level="success") |
|
total_success += 1 |
|
else: |
|
log(f"Zone {zone_name}: [bold red]✗[/bold red] Failed to create '{args.description}' rule", level="error") |
|
else: |
|
log(f"Zone {zone_name}: [bold yellow]⚠[/bold yellow] Rule '{args.description}' not found (use --create-if-missing to create)", level="warning") |
|
total_skipped += 1 |
|
|
|
# Advance progress |
|
progress.advance(task) |
|
|
|
# After the last zone, update the progress bar with "Completed" |
|
if zone_index == len(zones) - 1: |
|
progress.update(task, description=make_fixed_width_text("[bold green]Processing completed[/bold green]")) |
|
|
|
else: |
|
# Simple, non-rich version |
|
print(f"Found {len(zones)} zones") |
|
print("\n=== Starting WAF Rule Updates ===\n") |
|
|
|
# Process each zone |
|
for zone_index, zone in enumerate(zones): |
|
zone_id = zone['id'] |
|
zone_name = zone['name'] |
|
|
|
print(f"Processing zone {zone_index+1}/{len(zones)}: {zone_name}") |
|
|
|
# Get WAF rulesets |
|
if args.api_key and args.email: |
|
waf_ruleset = get_waf_rulesets_with_api_key(args.api_key, args.email, zone_id, args.debug) |
|
else: |
|
waf_ruleset = get_waf_rulesets(args.token, zone_id, args.debug) |
|
|
|
if not waf_ruleset: |
|
print(f" No WAF rulesets found or error occurred, skipping...") |
|
total_skipped += 1 |
|
continue |
|
|
|
rule = find_rule_by_description(waf_ruleset, args.description) |
|
|
|
if rule: |
|
print(f" Found '{args.description}' rule") |
|
|
|
# Prepare auth headers |
|
if args.api_key and args.email: |
|
auth_headers = { |
|
"X-Auth-Email": args.email, |
|
"X-Auth-Key": args.api_key, |
|
"Content-Type": "application/json" |
|
} |
|
else: |
|
auth_headers = { |
|
"Authorization": f"Bearer {args.token}", |
|
"Content-Type": "application/json" |
|
} |
|
|
|
success = update_rule( |
|
auth_headers, |
|
zone_id, |
|
waf_ruleset['id'], |
|
rule, |
|
args.description, |
|
args.expression, |
|
args.force_recreate, |
|
args.dry_run, |
|
args.debug |
|
) |
|
|
|
if success: |
|
update_type = 'simulated update of' if args.dry_run else 'updated' |
|
print(f" ✓ Successfully {update_type} rule") |
|
total_success += 1 |
|
else: |
|
print(f" ✗ Failed to update rule") |
|
total_error += 1 |
|
else: |
|
print(f" '{args.description}' rule not found") |
|
|
|
if args.create_if_missing: |
|
print(f" Creating rule...") |
|
|
|
# Prepare auth headers |
|
if args.api_key and args.email: |
|
auth_headers = { |
|
"X-Auth-Email": args.email, |
|
"X-Auth-Key": args.api_key, |
|
"Content-Type": "application/json" |
|
} |
|
else: |
|
auth_headers = { |
|
"Authorization": f"Bearer {args.token}", |
|
"Content-Type": "application/json" |
|
} |
|
|
|
success = create_rule( |
|
auth_headers, |
|
zone_id, |
|
waf_ruleset['id'], |
|
args.description, |
|
args.expression, |
|
"block", |
|
args.dry_run, |
|
args.debug |
|
) |
|
|
|
if success: |
|
create_type = 'simulated creation of' if args.dry_run else 'created' |
|
print(f" ✓ Successfully {create_type} rule") |
|
total_success += 1 |
|
else: |
|
print(f" ✗ Failed to create rule") |
|
total_error += 1 |
|
else: |
|
print(f" ⚠ Skipping creation (use --create-if-missing to create)") |
|
total_skipped += 1 |
|
|
|
# Final summary |
|
if RICH_AVAILABLE: |
|
console.print() |
|
console.print(Rule(style="cyan")) |
|
console.print(Panel( |
|
f"[green bold]{total_success}[/green bold] zone(s) successfully updated\n" |
|
f"[red bold]{total_error}[/red bold] zone(s) failed\n" |
|
f"[yellow bold]{total_skipped}[/yellow bold] zone(s) skipped", |
|
title="[bold cyan]Operation Summary[/bold cyan]", |
|
border_style="cyan", |
|
expand=False |
|
)) |
|
else: |
|
print("\n=== Operation Summary ===") |
|
print(f"✓ Success: {total_success} zones") |
|
print(f"✗ Error: {total_error} zones") |
|
print(f"⚠ Skipped: {total_skipped} zones") |
|
|
|
# Exit with code based on success/failure |
|
if total_error > 0: |
|
sys.exit(1) |
|
|
|
|
|
if __name__ == "__main__": |
|
main() |