@knzcx
Last active April 13, 2025 20:29
Cloudflare WAF Custom Rules Updater

A Python utility script that updates Web Application Firewall (WAF) custom rules across multiple Cloudflare zones in a single run.

Overview

This tool provides an efficient way to manage WAF custom rules across your Cloudflare account. It can:

  • Search for WAF rules by description
  • Update existing rules with new expressions
  • Optionally create rules if they don't exist
  • Support both API Token and Global API Key authentication
  • Provide a dry-run mode to preview changes
  • Offer rich, colorful console output (when the rich package is installed)

Ideal for security teams that manage multiple domains and want to maintain consistent WAF rules across their infrastructure.

Requirements

  • Python 3.6+

  • Required packages:

    • requests
  • Optional (but recommended) packages:

    • rich (for improved visual display)

Installation

  1. Clone this repository or download the script file
  2. Install required dependencies:
pip install requests rich

Authentication

The script supports two authentication methods:

  1. API Token (Recommended):

    • More secure as it uses limited-scope permissions
    • Created in the Cloudflare dashboard under "My Profile > API Tokens"
    • Requires "Zone:Read" and "WAF:Edit" permissions
  2. Global API Key:

    • Legacy method, has full account access
    • Requires your account email
    • Less secure but sometimes necessary for certain operations
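The two methods differ only in the request headers they send. A minimal sketch of that choice, using the same header names as the script further down; `build_auth_headers` is a hypothetical helper for illustration, not a function in the script:

```python
from typing import Dict, Optional


def build_auth_headers(token: Optional[str] = None,
                       api_key: Optional[str] = None,
                       email: Optional[str] = None) -> Dict[str, str]:
    """Return Cloudflare API headers for whichever credentials were supplied."""
    if api_key:
        # The Global API Key is only usable together with the account email.
        if not email:
            raise ValueError("--api-key requires --email")
        return {
            "X-Auth-Email": email,
            "X-Auth-Key": api_key,
            "Content-Type": "application/json",
        }
    if token:
        # API Tokens are sent as a Bearer credential.
        return {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        }
    raise ValueError("provide a token, or an API key together with an email")
```

Like the script, this sketch prefers the Global API Key when both kinds of credentials are given.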

Usage

Basic Usage

Update a rule named "Block Attacks" with the default attack patterns:

python3 cf_waf_updater.py --token YOUR_API_TOKEN

Custom WAF Rule

Specify a custom rule description and expression:

python3 cf_waf_updater.py --token YOUR_API_TOKEN \
  --description "Block SQL Injection" \
  --expression "(http.request.uri.path contains \"/wp-admin\") or (http.request.uri.path contains \"union+select\")"

Create Missing Rules

Update existing rules and create them if they don't exist:

python3 cf_waf_updater.py --token YOUR_API_TOKEN \
  --description "Block SQL Injection" \
  --expression "(http.request.uri.path contains \"union+select\")" \
  --create-if-missing
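The per-zone decision behind this flag can be sketched as follows. This is an illustration of the logic as it appears in the script, not code taken from it; `plan_action` is a hypothetical name, and the ruleset is assumed to be the decoded `result` object of the custom-rules entrypoint:

```python
from typing import Dict, Optional


def plan_action(ruleset: Optional[Dict], description: str,
                create_if_missing: bool) -> str:
    """Return 'update', 'create', or 'skip' for one zone's ruleset."""
    if not ruleset:
        # The script skips zones whose ruleset could not be fetched.
        return "skip"
    rules = ruleset.get("rules", [])
    # Descriptions are compared exactly, including case and whitespace.
    if any(rule.get("description") == description for rule in rules):
        return "update"
    return "create" if create_if_missing else "skip"
```

Because the match is exact, a rule described as "block sql injection" is not treated as "Block SQL Injection"; with `--create-if-missing` that would produce a second rule rather than an update.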

Target a Specific Zone

Apply changes to only one domain:

python3 cf_waf_updater.py --token YOUR_API_TOKEN \
  --zone example.com
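The script requests the zone list once and filters it by name on the client, so on an account with more zones than fit in one API page a zone may not be found. A hedged sketch of how paging could be handled, assuming the list response carries a `result_info.total_pages` field; `iter_all_zones`, `find_zone`, and `fetch_page` are hypothetical names, not part of the script:

```python
from typing import Callable, Dict, Iterator, Optional


def iter_all_zones(fetch_page: Callable[[int], Dict]) -> Iterator[Dict]:
    """Yield every zone by walking the paginated list one page at a time.

    fetch_page(page) must return the decoded JSON body for that page.
    """
    page = 1
    while True:
        data = fetch_page(page)
        yield from data.get("result", [])
        # Assumption: the response reports the page count in result_info.
        total_pages = (data.get("result_info") or {}).get("total_pages", 1)
        if page >= total_pages:
            break
        page += 1


def find_zone(fetch_page: Callable[[int], Dict], name: str) -> Optional[Dict]:
    """Return the first zone whose name matches exactly, or None."""
    return next((z for z in iter_all_zones(fetch_page) if z.get("name") == name), None)
```

In practice `fetch_page` could wrap `requests.get(url, headers=headers, params={"page": page, "per_page": 50}).json()`, assuming the endpoint accepts those query parameters. Passing the fetcher in as a callable keeps the paging logic testable without network access.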

Dry Run Mode

Preview changes without making them:

python3 cf_waf_updater.py --token YOUR_API_TOKEN \
  --dry-run
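In dry-run mode the script still reads zones and rulesets, but each write step reports what it would do and returns before sending anything. A minimal sketch of that guard pattern; `patch_rule` and `send` are hypothetical names used only for illustration:

```python
from typing import Callable, Dict


def patch_rule(send: Callable[[Dict], bool], payload: Dict,
               dry_run: bool = False) -> bool:
    """Send the update unless dry_run is set; report success either way."""
    if dry_run:
        # Describe the change and return before any write request is made.
        print(f"[DRY RUN] Would patch rule with: {payload}")
        return True
    return send(payload)
```

Returning `True` from the dry-run path means such zones are counted as successful in the final summary, which matches how the script tallies them.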

Using Global API Key

For legacy authentication:

python3 cf_waf_updater.py --api-key YOUR_GLOBAL_API_KEY \
  --email YOUR_ACCOUNT_EMAIL

Debug Mode

Show detailed information for troubleshooting:

python3 cf_waf_updater.py --token YOUR_API_TOKEN \
  --debug

Command Line Arguments

Argument             Description
--token              Cloudflare API Token (recommended)
--api-key            Alternative: Use Global API Key instead of API Token
--email              Cloudflare account email (required if using API Key)
--description        Description of the rule to update (default: "Block Attacks")
--expression         WAF expression to update (default: common attack patterns)
--zone               Target a specific zone by name (default: update all zones)
--create-if-missing  Create the rule if it doesn't exist
--force-recreate     Force delete and recreate instead of updating
--dry-run            Show what would be updated without making changes
--debug              Enable detailed debug output

Default WAF Expression

If no expression is provided, the script uses a default expression that blocks common attack patterns:

(http.request.uri.path contains ".env") or 
(http.request.uri.path contains "wlwmanifest.xml") or 
(http.request.uri.path contains "xmlrpc.php") or 
(http.request.uri.path contains "setup-config.php") or 
(http.request.uri.path contains "wp-login.php")
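The default is a chain of `contains` clauses joined with `or`, so an equivalent expression can be generated from a plain list of path fragments. A small sketch; `build_path_expression` is a hypothetical helper, and it assumes the fragments contain no quotes or backslashes that would need escaping:

```python
from typing import Iterable, List


def build_path_expression(fragments: Iterable[str]) -> str:
    """Join one 'contains' clause per path fragment with 'or'."""
    # Assumption: fragments hold no double quotes or backslashes.
    clauses = [f'(http.request.uri.path contains "{fragment}")' for fragment in fragments]
    return " or ".join(clauses)


DEFAULT_FRAGMENTS: List[str] = [
    ".env",
    "wlwmanifest.xml",
    "xmlrpc.php",
    "setup-config.php",
    "wp-login.php",
]
expression = build_path_expression(DEFAULT_FRAGMENTS)
```

The resulting string can be passed to `--expression`; adding a pattern then only means appending to the list.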

Example Output (with rich package)

When the rich package is installed, the script provides colorful, informative output:

┌────────────────────────────────────────────┐
│   Cloudflare WAF Custom Rules Updater      │
└────────────────────────────────────────────┘

12:34:56 INFO: Using API Token authentication
12:34:57 INFO: Fetching zones from Cloudflare...
12:34:58 SUCCESS: Found 3 zones
────────────────────────────────────────────────
Starting WAF Rule Updates
12:34:59 SUCCESS: Zone example.com: ✓ Updated 'Block Attacks' rule
12:35:01 SUCCESS: Zone example.org: ✓ Updated 'Block Attacks' rule
12:35:02 SUCCESS: Zone example.net: ✓ Created new 'Block Attacks' rule
────────────────────────────────────────────────
┌─Operation Summary─────────────────────────────┐
│ 3 zone(s) successfully updated                │
│ 0 zone(s) failed                              │
│ 0 zone(s) skipped                             │
└────────────────────────────────────────────────┘

Error Handling

The script provides detailed error messages for common issues:

  • Authentication failures
  • Permission problems
  • Network connectivity issues
  • Invalid API responses

Exit codes:

  • 0: All operations successful
  • 1: One or more operations failed
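The checks behind these messages and exit codes are simple. The sketch below mirrors them with two hypothetical helpers (`describe_http_error` and `exit_code` are illustrative names, not functions in the script):

```python
def describe_http_error(status_code: int) -> str:
    """Map an HTTP status from the API to a short, human-readable hint."""
    if status_code == 401:
        return "Authentication failed: check the API token or key."
    if status_code == 403:
        return "Authorization failed: the credentials lack the required permission."
    return f"Unexpected response status: {status_code}"


def exit_code(total_error: int) -> int:
    """Return 1 if any zone failed, otherwise 0."""
    return 1 if total_error > 0 else 0
```

Skipped zones do not affect the exit code, so a run in which every zone is skipped still exits with 0.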

Best Practices

  1. Always start with --dry-run to preview changes
  2. Use API Tokens with minimal required permissions
  3. Test on a single zone first with --zone example.com
  4. Use --debug for troubleshooting

License

MIT License

Acknowledgements

#!/usr/bin/env python3
import argparse
import requests
import json
import sys
import time
from datetime import datetime

# Import Rich components at module level
try:
    from rich.console import Console
    from rich.table import Table
    from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TimeElapsedColumn, TaskProgressColumn
    from rich.panel import Panel
    from rich.text import Text
    from rich import box
    from rich.rule import Rule
    RICH_AVAILABLE = True
except ImportError:
    print("For a better visual experience, install rich: pip install rich")
    RICH_AVAILABLE = False

# Initialize rich console if available
console = Console() if RICH_AVAILABLE else None


def log(message, level="info", use_rich=True):
    """Log messages with rich formatting if available"""
    if not RICH_AVAILABLE or not use_rich:
        print(message)
        return
    timestamp = datetime.now().strftime("%H:%M:%S")
    if level == "info":
        console.print(f"[grey]{timestamp}[/grey] [blue]INFO[/blue]: {message}")
    elif level == "success":
        console.print(f"[grey]{timestamp}[/grey] [green]SUCCESS[/green]: {message}")
    elif level == "warning":
        console.print(f"[grey]{timestamp}[/grey] [yellow]WARNING[/yellow]: {message}")
    elif level == "error":
        console.print(f"[grey]{timestamp}[/grey] [red]ERROR[/red]: {message}")
    elif level == "debug":
        console.print(f"[grey]{timestamp}[/grey] [magenta]DEBUG[/magenta]: {message}")
    elif level == "header":
        console.print()
        console.print(Panel(Text(message, justify="center"), border_style="blue"))
    elif level == "subheader":
        console.print(Text(f"\n{message}", style="blue bold"))


def display_zones_table(zones):
    """Display zones in a nicely formatted table"""
    if not RICH_AVAILABLE or not zones:
        return
    table = Table(title="Cloudflare Zones", box=box.ROUNDED)
    table.add_column("Zone Name", style="cyan")
    table.add_column("Zone ID", style="magenta")
    table.add_column("Status", style="green")
    for zone in zones:
        table.add_row(
            zone.get('name', 'N/A'),
            zone.get('id', 'N/A'),
            zone.get('status', 'N/A')
        )
    console.print(table)


def display_rule_comparison(zone_name, old_expression, new_expression):
    """Display a comparison between old and new expressions"""
    if not RICH_AVAILABLE:
        return
    console.print(Panel(
        f"[bold cyan]Zone:[/bold cyan] {zone_name}\n\n"
        f"[bold yellow]Current Expression:[/bold yellow]\n{old_expression}\n\n"
        f"[bold green]New Expression:[/bold green]\n{new_expression}",
        title="WAF Rule Update Preview",
        border_style="blue"
    ))


def parse_arguments():
    parser = argparse.ArgumentParser(description='Update Cloudflare WAF Custom Rules across multiple zones')
    # --token is not marked required, so that --api-key with --email can be used on its own
    parser.add_argument('--token', help='Cloudflare API Token with appropriate permissions')
    parser.add_argument('--description', default='Block Attacks', help='Description of the rule to update')
    parser.add_argument('--expression', default='(http.request.uri.path contains ".env") or (http.request.uri.path contains "wlwmanifest.xml") or (http.request.uri.path contains "xmlrpc.php") or (http.request.uri.path contains "setup-config.php") or (http.request.uri.path contains "wp-login.php")',
                        help='WAF expression to update (default is to block common attack patterns)')
    parser.add_argument('--dry-run', action='store_true', help='Show what would be updated without making changes')
    parser.add_argument('--force-recreate', action='store_true', help='Force delete and recreate instead of attempting to update')
    parser.add_argument('--create-if-missing', action='store_true', help='Create the rule if it doesn\'t exist')
    parser.add_argument('--zone', help='Specific zone name to update (updates all zones if not specified)')
    parser.add_argument('--debug', action='store_true', help='Enable debug output for troubleshooting')
    parser.add_argument('--api-key', help='Use Global API Key instead of API Token (less secure, but may be needed)')
    parser.add_argument('--email', help='Cloudflare account email, required if using API Key')
    args = parser.parse_args()
    # At least one kind of credential must be supplied
    if not args.token and not args.api_key:
        parser.error('provide --token, or --api-key together with --email')
    return args


def get_zones(token, target_zone=None, debug=False):
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }
    url = "https://api.cloudflare.com/client/v4/zones"
    if debug:
        log(f"Making request to {url}", level="debug")
        # We don't log the actual headers with token for security
        log(f"Using Bearer token authentication", level="debug")
    try:
        response = requests.get(url, headers=headers)
        if debug:
            log(f"Response status code: {response.status_code}", level="debug")
            log(f"Response content type: {response.headers.get('Content-Type', 'N/A')}", level="debug")
        if response.status_code != 200:
            log(f"Error fetching zones: {response.status_code}", level="error")
            log(f"Response: {response.text}", level="error")
            if response.status_code == 401:
                log("Authentication failed. Please check your API token.", level="error")
                log("Make sure the token has 'Zone:Read' permissions.", level="error")
            elif response.status_code == 403:
                log("Authorization failed. Your token does not have permission to list zones.", level="error")
            sys.exit(1)
        data = response.json()
        if debug and RICH_AVAILABLE:
            console.print("[magenta]DEBUG: API response summary:[/magenta]")
            console.print(f"Success: {data.get('success')}")
            console.print(f"Result count: {len(data.get('result', []))}")
            console.print(f"Errors count: {len(data.get('errors', []))}")
        if not data.get('success', False):
            log(f"API returned an error: {data.get('errors', 'Unknown error')}", level="error")
            log(f"Message: {data.get('messages', 'No message provided')}", level="error")
            sys.exit(1)
        zones = data.get('result', [])
        if debug and not zones:
            log("No zones were returned by the API.", level="debug")
            log("This could mean your account has no zones or your token lacks permission.", level="debug")
        # Filter for a specific zone if requested
        if target_zone:
            zones = [zone for zone in zones if zone['name'] == target_zone]
            if not zones:
                log(f"Zone '{target_zone}' not found", level="error")
                sys.exit(1)
        return zones
    except requests.exceptions.RequestException as e:
        log(f"Network error when connecting to Cloudflare API: {e}", level="error")
        sys.exit(1)
    except json.JSONDecodeError:
        log("Invalid JSON response from Cloudflare API.", level="error")
        if debug:
            log(f"Raw response: {response.text}", level="debug")
        sys.exit(1)


def get_zones_with_api_key(api_key, email, target_zone=None, debug=False):
    headers = {
        "X-Auth-Email": email,
        "X-Auth-Key": api_key,
        "Content-Type": "application/json"
    }
    url = "https://api.cloudflare.com/client/v4/zones"
    if debug:
        log(f"Making request to {url} using Global API Key", level="debug")
    try:
        response = requests.get(url, headers=headers)
        if debug:
            log(f"Response status code: {response.status_code}", level="debug")
        if response.status_code != 200:
            log(f"Error fetching zones: {response.status_code}", level="error")
            log(f"Response: {response.text}", level="error")
            if response.status_code == 401:
                log("Authentication failed. Please check your API Key and Email.", level="error")
            elif response.status_code == 403:
                log("Authorization failed. Your API Key does not have permission to list zones.", level="error")
            sys.exit(1)
        data = response.json()
        if debug and not data.get('result', []):
            log("DEBUG: API returned a response but no zones were found.", level="debug")
            log(f"DEBUG: Success status: {data.get('success')}", level="debug")
        if not data.get('success', False):
            log(f"API returned an error: {data.get('errors', 'Unknown error')}", level="error")
            sys.exit(1)
        zones = data.get('result', [])
        # Filter for a specific zone if requested
        if target_zone:
            zones = [zone for zone in zones if zone['name'] == target_zone]
            if not zones:
                log(f"Zone '{target_zone}' not found", level="error")
                sys.exit(1)
        return zones
    except requests.exceptions.RequestException as e:
        log(f"Network error when connecting to Cloudflare API: {e}", level="error")
        sys.exit(1)
    except json.JSONDecodeError:
        log("Invalid JSON response from Cloudflare API.", level="error")
        if debug:
            log(f"DEBUG: Raw response: {response.text}", level="debug")
        sys.exit(1)


def get_waf_rulesets(token, zone_id, debug=False):
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }
    url = f"https://api.cloudflare.com/client/v4/zones/{zone_id}/rulesets/phases/http_request_firewall_custom/entrypoint"
    if debug:
        log(f"Making request to {url}", level="debug")
    try:
        response = requests.get(url, headers=headers)
        if debug:
            log(f"Response status: {response.status_code}", level="debug")
        if response.status_code != 200:
            log(f"Error fetching WAF rulesets for zone {zone_id}: {response.status_code}", level="error")
            log(response.text, level="error")
            return None
        data = response.json()
        if not data.get('success'):
            log(f"API returned an error for zone {zone_id}: {data.get('errors')}", level="error")
            return None
        if debug and 'result' in data:
            if RICH_AVAILABLE:
                ruleset = data['result']
                if 'rules' in ruleset:
                    log(f"Found {len(ruleset['rules'])} rules in the ruleset", level="debug")
                    # DEBUG: Log the descriptions of all rules
                    for idx, rule in enumerate(ruleset['rules']):
                        log(f"Rule {idx+1} description: '{rule.get('description', 'NO DESCRIPTION')}'", level="debug")
                else:
                    log("Ruleset contains no rules", level="debug")
        return data.get('result')
    except requests.exceptions.RequestException as e:
        log(f"Network error when fetching WAF rulesets: {e}", level="error")
        return None
    except json.JSONDecodeError:
        log("Invalid JSON response when fetching WAF rulesets", level="error")
        return None


def get_waf_rulesets_with_api_key(api_key, email, zone_id, debug=False):
    """Get WAF rulesets using the Global API Key authentication"""
    headers = {
        "X-Auth-Email": email,
        "X-Auth-Key": api_key,
        "Content-Type": "application/json"
    }
    url = f"https://api.cloudflare.com/client/v4/zones/{zone_id}/rulesets/phases/http_request_firewall_custom/entrypoint"
    if debug:
        log(f"Making request to {url} using Global API Key", level="debug")
    try:
        response = requests.get(url, headers=headers)
        if debug:
            log(f"Response status: {response.status_code}", level="debug")
        if response.status_code != 200:
            log(f"Error fetching WAF rulesets for zone {zone_id}: {response.status_code}", level="error")
            log(response.text, level="error")
            return None
        data = response.json()
        if not data.get('success'):
            log(f"API returned an error for zone {zone_id}: {data.get('errors')}", level="error")
            return None
        if debug and 'result' in data:
            if RICH_AVAILABLE:
                ruleset = data['result']
                if 'rules' in ruleset:
                    log(f"Found {len(ruleset['rules'])} rules in the ruleset", level="debug")
                    # DEBUG: Log the descriptions of all rules
                    for idx, rule in enumerate(ruleset['rules']):
                        log(f"Rule {idx+1} description: '{rule.get('description', 'NO DESCRIPTION')}'", level="debug")
                else:
                    log("Ruleset contains no rules", level="debug")
        return data.get('result')
    except requests.exceptions.RequestException as e:
        log(f"Network error when fetching WAF rulesets: {e}", level="error")
        return None
    except json.JSONDecodeError:
        log("Invalid JSON response when fetching WAF rulesets", level="error")
        return None


def find_rule_by_description(ruleset_data, description):
    if not ruleset_data or 'rules' not in ruleset_data:
        return None
    for rule in ruleset_data.get('rules', []):
        if rule.get('description') == description:
            return rule
    return None


def update_rule(auth_headers, zone_id, ruleset_id, rule, description, expression, force_recreate=False, dry_run=False, debug=False):
    """Update a rule using provided authentication headers"""
    rule_id = rule['id']
    action = rule.get('action', 'block')
    if debug:
        log(f"Updating rule with description: '{description}'", level="debug")
        log(f"Current rule data: {json.dumps(rule)}", level="debug")
    # Try to update using PATCH first (unless force_recreate is specified)
    if not force_recreate:
        success = update_rule_patch(auth_headers, zone_id, ruleset_id, rule_id, expression, action, description, dry_run, debug)
        if success:
            return True
        log("PATCH update failed, falling back to delete-and-create approach", level="warning")
    # If PATCH fails or force_recreate is specified, delete and recreate
    if not delete_rule(auth_headers, zone_id, ruleset_id, rule_id, dry_run, debug):
        return False
    return create_rule(auth_headers, zone_id, ruleset_id, description, expression, action, dry_run, debug)


def update_rule_patch(auth_headers, zone_id, ruleset_id, rule_id, expression, action="block", description=None, dry_run=False, debug=False):
    """Update a rule using PATCH method with provided authentication headers"""
    if dry_run:
        if RICH_AVAILABLE:
            panel_content = f"[cyan]Zone ID:[/cyan] {zone_id}\n" \
                            f"[cyan]Ruleset ID:[/cyan] {ruleset_id}\n" \
                            f"[cyan]Rule ID:[/cyan] {rule_id}\n" \
                            f"[cyan]New Expression:[/cyan] {expression}\n" \
                            f"[cyan]Action:[/cyan] {action}"
            if description:
                panel_content += f"\n[cyan]Description:[/cyan] {description}"
            console.print(Panel(
                panel_content,
                title="[yellow bold]DRY RUN: Would PATCH rule[/yellow bold]",
                border_style="yellow"
            ))
        else:
            log(f"[DRY RUN] Would patch rule {rule_id} in ruleset {ruleset_id} for zone {zone_id}")
            log(f"New expression: {expression}")
            log(f"Action: {action}")
            if description:
                log(f"Description: {description}")
        return True
    # Cloudflare requires the action parameter when patching rules
    data = {
        "expression": expression,
        "action": action
    }
    # IMPORTANT: Include the description in the PATCH request to maintain it
    if description:
        data["description"] = description
    if debug:
        log(f"PATCH request data: {json.dumps(data)}", level="debug")
    log("Updating rule via PATCH...")
    response = requests.patch(
        f"https://api.cloudflare.com/client/v4/zones/{zone_id}/rulesets/{ruleset_id}/rules/{rule_id}",
        headers=auth_headers,
        json=data
    )
    if debug:
        log(f"PATCH response status: {response.status_code}", level="debug")
        log(f"PATCH response body: {response.text}", level="debug")
    if response.status_code != 200:
        log(f"Error patching WAF rule for zone {zone_id}: {response.status_code}", level="error")
        log(response.text, level="error")
        return False
    data = response.json()
    if not data.get('success', False):
        log(f"API returned an error when patching rule: {data.get('errors')}", level="error")
        return False
    return True


def delete_rule(auth_headers, zone_id, ruleset_id, rule_id, dry_run=False, debug=False):
    """Delete a rule with provided authentication headers"""
    if dry_run:
        if RICH_AVAILABLE:
            console.print(Panel(
                f"[cyan]Zone ID:[/cyan] {zone_id}\n"
                f"[cyan]Ruleset ID:[/cyan] {ruleset_id}\n"
                f"[cyan]Rule ID:[/cyan] {rule_id}",
                title="[yellow bold]DRY RUN: Would DELETE rule[/yellow bold]",
                border_style="yellow"
            ))
        else:
            log(f"[DRY RUN] Would delete rule {rule_id} from ruleset {ruleset_id} for zone {zone_id}")
        return True
    log("Deleting rule...")
    response = requests.delete(
        f"https://api.cloudflare.com/client/v4/zones/{zone_id}/rulesets/{ruleset_id}/rules/{rule_id}",
        headers=auth_headers
    )
    if debug:
        log(f"DELETE response status: {response.status_code}", level="debug")
        log(f"DELETE response body: {response.text}", level="debug")
    if response.status_code != 200:
        log(f"Error deleting WAF rule for zone {zone_id}: {response.status_code}", level="error")
        log(response.text, level="error")
        return False
    data = response.json()
    if not data.get('success', False):
        log(f"API returned an error when deleting rule: {data.get('errors')}", level="error")
        return False
    return True


def create_rule(auth_headers, zone_id, ruleset_id, description, expression, action="block", dry_run=False, debug=False):
    """Create a rule with provided authentication headers"""
    if dry_run:
        if RICH_AVAILABLE:
            console.print(Panel(
                f"[cyan]Zone ID:[/cyan] {zone_id}\n"
                f"[cyan]Ruleset ID:[/cyan] {ruleset_id}\n"
                f"[cyan]Description:[/cyan] {description}\n"
                f"[cyan]Expression:[/cyan] {expression}\n"
                f"[cyan]Action:[/cyan] {action}",
                title="[yellow bold]DRY RUN: Would CREATE rule[/yellow bold]",
                border_style="yellow"
            ))
        else:
            log(f"[DRY RUN] Would create rule in ruleset {ruleset_id} for zone {zone_id}")
            log(f"Description: {description}")
            log(f"Expression: {expression}")
            log(f"Action: {action}")
        return True
    data = {
        "description": description,
        "expression": expression,
        "action": action
    }
    if debug:
        log(f"Creating rule with description: '{description}'", level="debug")
        log(f"Create request data: {json.dumps(data)}", level="debug")
    log("Creating new rule...")
    response = requests.post(
        f"https://api.cloudflare.com/client/v4/zones/{zone_id}/rulesets/{ruleset_id}/rules",
        headers=auth_headers,
        json=data
    )
    if debug:
        log(f"CREATE response status: {response.status_code}", level="debug")
        log(f"CREATE response body: {response.text}", level="debug")
    if response.status_code != 200:
        log(f"Error creating WAF rule for zone {zone_id}: {response.status_code}", level="error")
        log(response.text, level="error")
        return False
    data = response.json()
    if not data.get('success', False):
        log(f"API returned an error when creating rule: {data.get('errors')}", level="error")
        return False
    # Verify the created rule has the correct description
    if debug and data.get('result'):
        created_rule = data.get('result')
        log(f"Created rule with description: '{created_rule.get('description')}'", level="debug")
        if created_rule.get('description') != description:
            log(f"WARNING: Created rule has description '{created_rule.get('description')}' instead of '{description}'", level="warning")
    return True


def make_fixed_width_text(text, width=45):
    """Format text to a fixed width by padding or truncating"""
    if len(text) > width:
        return text[:width-3] + "..."
    else:
        return text.ljust(width)


def main():
    args = parse_arguments()
    if RICH_AVAILABLE:
        console.print(Panel.fit(
            "[bold blue]Cloudflare WAF Custom Rules Updater[/bold blue]",
            border_style="cyan",
            title="v1.0"
        ))
    else:
        print("\n=== Cloudflare WAF Custom Rules Updater ===\n")

    # Determine authentication method and headers
    if args.api_key and args.email:
        log(f"Using Global API Key authentication with email: {args.email}")
        headers = {
            "X-Auth-Email": args.email,
            "X-Auth-Key": args.api_key,
            "Content-Type": "application/json"
        }
        auth_method = "Global API Key"
        auth_token = None
    elif args.api_key:
        log("When using --api-key, you must also provide --email", level="error")
        sys.exit(1)
    else:
        log(f"Using API Token authentication")
        headers = {
            "Authorization": f"Bearer {args.token}",
            "Content-Type": "application/json"
        }
        auth_method = "API Token"
        auth_token = args.token
    if args.debug:
        log(f"Authentication method: {auth_method}", level="debug")
        # Don't log actual headers with credentials
        log(f"Headers include appropriate authentication", level="debug")
        log(f"Target rule description: '{args.description}'", level="debug")

    # Fetch zones
    log("Fetching zones from Cloudflare...", level="info")
    if args.api_key and args.email:
        zones = get_zones_with_api_key(args.api_key, args.email, args.zone, args.debug)
    else:
        zones = get_zones(args.token, args.zone, args.debug)
    if not zones:
        log("No zones found. Please check your permissions.", level="error")
        return
    # Show success message with zone count
    log(f"Found {len(zones)} zones", level="success")
    # If we're using Rich, display a pretty header
    if RICH_AVAILABLE:
        console.print(Rule(style="cyan"))
        console.print("[bold cyan]Starting WAF Rule Updates[/bold cyan]")

    # Initialize counters
    total_success = 0
    total_error = 0
    total_skipped = 0

    # Create a progress bar
    if RICH_AVAILABLE:
        with Progress(
            SpinnerColumn(),
            TextColumn("[bold cyan]{task.description}"),
            BarColumn(complete_style="green", finished_style="green", bar_width=40),
            TaskProgressColumn(),
            TextColumn("({task.completed}/{task.total})"),
            TimeElapsedColumn(),
            console=console,
            expand=False,
            transient=True
        ) as progress:
            # Add task for overall processing
            task = progress.add_task(make_fixed_width_text("Initializing..."), total=len(zones))
            # Process each zone
            for zone_index, zone in enumerate(zones):
                zone_id = zone['id']
                zone_name = zone['name']
                # Use the helper function to set the text to a fixed width
                progress.update(task, description=make_fixed_width_text(f"Processing: {zone_name}"))
                # Get WAF rulesets
                if args.api_key and args.email:
                    waf_ruleset = get_waf_rulesets_with_api_key(args.api_key, args.email, zone_id, args.debug)
                else:
                    waf_ruleset = get_waf_rulesets(args.token, zone_id, args.debug)
                if not waf_ruleset:
                    log(f"Zone {zone_name}: No WAF rulesets found, skipping...", level="warning")
                    total_skipped += 1
                    progress.advance(task)
                    continue
                rule = find_rule_by_description(waf_ruleset, args.description)
                if rule:
                    # Debug info if needed
                    if args.debug:
                        log(f"Found '{args.description}' rule in zone {zone_name}", level="debug")
                    # Update progress description
                    progress.update(task, description=make_fixed_width_text(f"Updating rule for: {zone_name}"))
                    # The headers built above already carry the right credentials
                    success = update_rule(
                        headers,
                        zone_id,
                        waf_ruleset['id'],
                        rule,
                        args.description,
                        args.expression,
                        args.force_recreate,
                        args.dry_run,
                        args.debug
                    )
                    # Log the result WITHOUT stopping the progress bar
                    if success:
                        update_type = 'DRY RUN' if args.dry_run else 'Updated'
                        log(f"Zone {zone_name}: [bold green]✓[/bold green] {update_type} '{args.description}' rule", level="success")
                        total_success += 1
                    else:
                        log(f"Zone {zone_name}: [bold red]✗[/bold red] Failed to update '{args.description}' rule", level="error")
                        total_error += 1
                else:
                    # Rule not found
                    if args.create_if_missing:
                        log(f"Zone {zone_name}: Rule '{args.description}' not found, creating new rule...", level="info")
                        # Update progress description
                        progress.update(task, description=make_fixed_width_text(f"Creating rule for: {zone_name}"))
                        success = create_rule(
                            headers,
                            zone_id,
                            waf_ruleset['id'],
                            args.description,
                            args.expression,
                            "block",
                            args.dry_run,
                            args.debug
                        )
                        # Log the result without stopping progress
                        if success:
                            create_type = 'DRY RUN' if args.dry_run else 'Created'
                            log(f"Zone {zone_name}: [bold green]✓[/bold green] {create_type} new '{args.description}' rule", level="success")
                            total_success += 1
                        else:
                            log(f"Zone {zone_name}: [bold red]✗[/bold red] Failed to create '{args.description}' rule", level="error")
                            # Count the failure so the summary and exit code reflect it
                            total_error += 1
                    else:
                        log(f"Zone {zone_name}: [bold yellow]⚠[/bold yellow] Rule '{args.description}' not found (use --create-if-missing to create)", level="warning")
                        total_skipped += 1
                # Advance progress
                progress.advance(task)
                # After the last zone, update the progress bar with "Completed"
                if zone_index == len(zones) - 1:
                    progress.update(task, description=make_fixed_width_text("[bold green]Processing completed[/bold green]"))
    else:
        # Simple, non-rich version
        print(f"Found {len(zones)} zones")
        print("\n=== Starting WAF Rule Updates ===\n")
        # Process each zone
        for zone_index, zone in enumerate(zones):
            zone_id = zone['id']
            zone_name = zone['name']
            print(f"Processing zone {zone_index+1}/{len(zones)}: {zone_name}")
            # Get WAF rulesets
            if args.api_key and args.email:
                waf_ruleset = get_waf_rulesets_with_api_key(args.api_key, args.email, zone_id, args.debug)
            else:
                waf_ruleset = get_waf_rulesets(args.token, zone_id, args.debug)
            if not waf_ruleset:
                print(f" No WAF rulesets found or error occurred, skipping...")
                total_skipped += 1
                continue
            rule = find_rule_by_description(waf_ruleset, args.description)
            if rule:
                print(f" Found '{args.description}' rule")
                success = update_rule(
                    headers,
                    zone_id,
                    waf_ruleset['id'],
                    rule,
                    args.description,
                    args.expression,
                    args.force_recreate,
                    args.dry_run,
                    args.debug
                )
                if success:
                    update_type = 'simulated update of' if args.dry_run else 'updated'
                    print(f" ✓ Successfully {update_type} rule")
                    total_success += 1
                else:
                    print(f" ✗ Failed to update rule")
                    total_error += 1
            else:
                print(f" '{args.description}' rule not found")
                if args.create_if_missing:
                    print(f" Creating rule...")
                    success = create_rule(
                        headers,
                        zone_id,
                        waf_ruleset['id'],
                        args.description,
                        args.expression,
                        "block",
                        args.dry_run,
                        args.debug
                    )
                    if success:
                        create_type = 'simulated creation of' if args.dry_run else 'created'
                        print(f" ✓ Successfully {create_type} rule")
                        total_success += 1
                    else:
                        print(f" ✗ Failed to create rule")
                        total_error += 1
                else:
                    print(f" ⚠ Skipping creation (use --create-if-missing to create)")
                    total_skipped += 1

    # Final summary
    if RICH_AVAILABLE:
        console.print()
        console.print(Rule(style="cyan"))
        console.print(Panel(
            f"[green bold]{total_success}[/green bold] zone(s) successfully updated\n"
            f"[red bold]{total_error}[/red bold] zone(s) failed\n"
            f"[yellow bold]{total_skipped}[/yellow bold] zone(s) skipped",
            title="[bold cyan]Operation Summary[/bold cyan]",
            border_style="cyan",
            expand=False
        ))
    else:
        print("\n=== Operation Summary ===")
        print(f"✓ Success: {total_success} zones")
        print(f"✗ Error: {total_error} zones")
        print(f"⚠ Skipped: {total_skipped} zones")
    # Exit with code based on success/failure
    if total_error > 0:
        sys.exit(1)


if __name__ == "__main__":
    main()