Skip to content

Instantly share code, notes, and snippets.

@leodido
Last active March 14, 2025 15:59
Show Gist options
  • Save leodido/39b117af36945c0a1595d151fb79decf to your computer and use it in GitHub Desktop.
Save leodido/39b117af36945c0a1595d151fb79decf to your computer and use it in GitHub Desktop.
#!/usr/bin/env bash
# Installer for the Hetzner/Cloudflare firewall updater: it writes the Python
# updater script, a config template, a virtualenv and a systemd service + timer.
# Abort on the first failing command and on any reference to an unset variable.
set -o errexit -o nounset

# Every path below is derived from this single base name.
NAME="update_hetzner_cloudflare_ips"

# Location of the generated updater script.
INSTALL_DIR="/usr/local/bin"
SCRIPT_NAME="${NAME}.py"
SCRIPT_PATH="${INSTALL_DIR}/${SCRIPT_NAME}"

# systemd unit files.
SERVICE_FILE="/etc/systemd/system/${NAME}.service"
TIMER_FILE="/etc/systemd/system/${NAME}.timer"

# Runtime state: data directory (config lives inside it) and log file.
DATA_DIR="/var/lib/${NAME}"
LOG_FILE="/var/log/${NAME}.log"
CONFIG_FILE="${DATA_DIR}/config.json"
# Root-only data directory; the config file is created inside it.
echo "Creating data directory at $DATA_DIR..."
sudo mkdir -p -- "$DATA_DIR"
sudo chown root:root -- "$DATA_DIR"
sudo chmod u=rwx,go= -- "$DATA_DIR"

# Make sure the log file exists up front and is readable by root only.
echo "Creating log directory for $LOG_FILE..."
LOG_DIR="$(dirname "$LOG_FILE")"
sudo mkdir -p -- "$LOG_DIR"
sudo touch -- "$LOG_FILE"
sudo chown root:root -- "$LOG_FILE"
sudo chmod u=rw,go= -- "$LOG_FILE"
# Create the config file template, but never overwrite an existing config.
# The existence check must run through sudo: $DATA_DIR is owned by root with
# mode 700, so an unprivileged `[[ -f ... ]]` cannot stat anything inside it,
# always reports the file as missing, and would replace a configured token
# with the placeholder template on every re-run of this installer.
if ! sudo test -f "$CONFIG_FILE"; then
    echo "Creating configuration file template at $CONFIG_FILE..."
    # Quoted delimiter: the template is literal JSON, no shell expansion wanted.
    sudo tee "$CONFIG_FILE" > /dev/null << 'EOF'
{
  "HETZNER_API_TOKEN": "YOUR_HETZNER_API_TOKEN",
  "HETZNER_FIREWALL_ID": "YOUR_HETZNER_FIREWALL_ID",
  "PORTS": ["YOUR_PORT"]
}
EOF
    # The file will hold an API token: restrict it to root.
    sudo chmod 600 "$CONFIG_FILE"
    echo "Please edit $CONFIG_FILE and add your Hetzner API token and firewall ID."
fi
# Check if Python 3 is installed; if not, install it with whichever package
# manager is available.
#
# Only the interpreter package is mandatory. Companion packages are installed
# best-effort: not every distribution ships a separate venv package
# (NOTE(review): on Fedora/RHEL/CentOS the venv module appears to be bundled
# with python3 and no python3-venv package exists -- confirm), and because of
# `set -e` a single unknown package name would otherwise abort the installer.
# If venv really is unavailable, the later `python3 -m venv` step fails loudly.
if ! command -v python3 &> /dev/null; then
    echo "Python 3 is not installed. Installing Python 3..."
    if command -v apt-get &> /dev/null; then
        # Debian/Ubuntu
        echo "Detected Debian/Ubuntu system. Installing required packages..."
        sudo apt-get update
        sudo apt-get install -y python3 python3-pip
    elif command -v dnf &> /dev/null; then
        # Fedora/RHEL/CentOS 8+
        sudo dnf install -y python3
        sudo dnf install -y python3-venv \
            || echo "Optional package python3-venv not available; continuing."
    elif command -v yum &> /dev/null; then
        # CentOS/RHEL 7
        sudo yum install -y python3
        sudo yum install -y python3-venv \
            || echo "Optional package python3-venv not available; continuing."
    elif command -v zypper &> /dev/null; then
        # openSUSE
        sudo zypper install -y python3
        sudo zypper install -y python3-venv \
            || echo "Optional package python3-venv not available; continuing."
    elif command -v pacman &> /dev/null; then
        # Arch Linux
        sudo pacman -S --noconfirm python
        sudo pacman -S --noconfirm python-virtualenv \
            || echo "Optional package python-virtualenv not available; continuing."
    else
        echo "Unable to install Python 3 automatically. Please install Python 3 and venv manually."
        exit 1
    fi
fi
# For Debian/Ubuntu systems, ensure we have the right venv package.
#
# Each install attempt is non-fatal (`|| true`): with `set -e` a failing first
# attempt would otherwise abort the script before any fallback could run.
#
# The probe imports ensurepip instead of running `python3 -m venv --help`.
# NOTE(review): on Debian/Ubuntu the venv module itself ships with the base
# interpreter while ensurepip comes from the pythonX.Y-venv package, so
# `--help` is expected to succeed even when creating a venv would fail --
# confirm on the target releases.
if command -v apt-get &> /dev/null; then
    echo "Installing Python virtual environment package..."
    # First try the generic package
    sudo apt-get install -y python3-venv || true
    # If that doesn't work, try the version-specific package
    if ! python3 -c 'import ensurepip' &> /dev/null; then
        # Major.minor of the interpreter that `python3` resolves to
        PY_VER=$(python3 -c 'import sys; print(f"{sys.version_info.major}.{sys.version_info.minor}")')
        echo "Installing Python $PY_VER virtual environment package..."
        sudo apt-get install -y "python${PY_VER}-venv" || true
        # Still not working? Try some common versions, stopping at the first
        # one that makes the probe pass.
        if ! python3 -c 'import ensurepip' &> /dev/null; then
            echo "Trying common Python venv packages..."
            for ver in 3.8 3.9 3.10 3.11 3.12; do
                echo "Trying python$ver-venv..."
                sudo apt-get install -y "python${ver}-venv" || true
                if python3 -c 'import ensurepip' &> /dev/null; then
                    break
                fi
            done
        fi
    fi
fi
# Dedicated virtualenv inside the data directory; the systemd service runs the
# updater with this interpreter.
VENV_DIR="${DATA_DIR}/venv"
echo "Creating Python virtual environment at $VENV_DIR..."
sudo python3 -m venv "${VENV_DIR}"

# The updater's only third-party dependency is requests.
echo "Installing required Python packages in virtual environment..."
sudo "${VENV_DIR}/bin/pip" install requests
# Create update_hetzner_cloudflare_ips.py script
cat << EOF | sudo tee "$SCRIPT_PATH" > /dev/null
#!/usr/bin/env python3
"""Keep a Hetzner Cloud firewall in sync with Cloudflare's published IP ranges."""
import requests
import json
import logging
import sys
import os
from datetime import datetime

# Both paths are filled in by the installer when it writes this file.
LOG_FILE = "${LOG_FILE}"
DATA_DIR = "${DATA_DIR}"
CONFIG_FILE = os.path.join(DATA_DIR, "config.json")

# Log to the log file and to stdout. The level is INFO, so logger.debug()
# calls elsewhere in this script are not emitted.
LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(
    level=logging.INFO,
    format=LOG_FORMAT,
    handlers=[
        logging.FileHandler(LOG_FILE),
        logging.StreamHandler(sys.stdout),
    ],
)
logger = logging.getLogger('hetzner_cloudflare_updater')
def load_config():
    """Read CONFIG_FILE and return the validated settings as a dict.

    The returned dict always carries a non-empty list under 'PORTS': a missing
    or empty value, or one holding only placeholder entries, becomes ["443"],
    and a single non-list value is wrapped in a list.

    The process exits with status 1 when the file is missing, is not valid
    JSON, cannot be processed for any other reason, or when the API token or
    the firewall ID is absent or still set to its template placeholder.
    """
    try:
        with open(CONFIG_FILE, 'r') as handle:
            settings = json.load(handle)

        # Required credentials: reject absent values and untouched placeholders.
        token = settings.get('HETZNER_API_TOKEN')
        if not token or token == "YOUR_HETZNER_API_TOKEN":
            logger.error(f"Missing or default API token in {CONFIG_FILE}. Please set your Hetzner API token.")
            sys.exit(1)

        firewall_id = settings.get('HETZNER_FIREWALL_ID')
        if not firewall_id or firewall_id == "YOUR_HETZNER_FIREWALL_ID":
            logger.error(f"Missing or default firewall ID in {CONFIG_FILE}. Please set your Hetzner Firewall ID.")
            sys.exit(1)

        # Ports are optional: fall back to 443 when nothing is configured.
        if not settings.get('PORTS'):
            logger.info("No PORTS configuration found, defaulting to port 443")
            settings['PORTS'] = ["443"]

        # A bare value such as 443 or "443" is accepted and wrapped.
        if not isinstance(settings['PORTS'], list):
            logger.info("Converting single port to list format")
            settings['PORTS'] = [settings['PORTS']]

        # Drop template placeholders and empty entries.
        ignored = ("YOUR_PORT", "")
        usable_ports = [port for port in settings['PORTS'] if str(port) not in ignored]
        settings['PORTS'] = usable_ports
        if not usable_ports:
            logger.info("No valid ports specified, defaulting to port 443")
            settings['PORTS'] = ["443"]

        return settings
    except FileNotFoundError:
        logger.error(f"Configuration file not found at {CONFIG_FILE}")
        sys.exit(1)
    except json.JSONDecodeError:
        logger.error(f"Invalid JSON in configuration file {CONFIG_FILE}")
        sys.exit(1)
    except Exception as e:
        # sys.exit() above raises SystemExit, which is not an Exception
        # subclass, so the validation exits are not intercepted here.
        logger.error(f"Error loading configuration: {str(e)}")
        sys.exit(1)
def get_cloudflare_ips(version):
    """Retrieve Cloudflare IP ranges for the specified version (4 or 6).

    Args:
        version: IP version, "4" or "6"; it is appended to the Cloudflare URL.

    Returns:
        A non-empty list with one entry per non-blank line of the response,
        surrounding whitespace removed.

    The process exits with status 1 when the request fails or when the
    response contains no ranges at all. Without that second check an empty
    body would be reported as one (empty) range and passed on to the firewall.
    """
    url = "https://www.cloudflare.com/ips-v" + str(version)
    logger.info(f"Getting IPs from {url}")
    try:
        response = requests.get(url, timeout=30)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to retrieve Cloudflare IP ranges: {str(e)}")
        sys.exit(1)

    # splitlines() copes with any line-ending convention; blank lines are dropped.
    stripped_lines = (line.strip() for line in response.text.splitlines())
    ip_list = [line for line in stripped_lines if line]
    if not ip_list:
        logger.error(f"Cloudflare returned no IPv{version} ranges from {url}")
        sys.exit(1)

    logger.info(f"Retrieved {len(ip_list)} IPv{version} ranges from Cloudflare")
    return ip_list
def whitelist_ips_in_hetzner(ip_ranges, config):
    """Update Hetzner firewall rules with the provided IP ranges.

    Builds one inbound TCP rule per configured port, each allowing every entry
    of ip_ranges, and posts the rule set to the firewall's set_rules action.
    On success the ranges are also stored via save_current_ips().

    Args:
        ip_ranges: list of CIDR strings to allow.
        config: dict with 'HETZNER_API_TOKEN' and 'HETZNER_FIREWALL_ID';
            'PORTS' is optional (a list or a single value, default ["443"]).

    The process exits with status 1 when the API request fails.

    NOTE(review): the payload contains only the rules built here and the
    endpoint is named set_rules, so any other rule on this firewall is
    presumably replaced -- confirm against the Hetzner Cloud API reference.
    """
    headers = {
        'Authorization': f'Bearer {config["HETZNER_API_TOKEN"]}',
        'Content-Type': 'application/json',
    }

    # Get ports from config, defaulting to ["443"] if not specified
    ports = config.get("PORTS", ["443"])
    if not isinstance(ports, list):
        logger.warning("PORTS configuration is not a list. Converting single port to list.")
        ports = [ports]
    if not ports:
        logger.warning("No ports specified in configuration, defaulting to port 443")
        ports = ["443"]

    # One rule per port; ports are sent as strings even if configured as numbers.
    rules = [
        {
            "direction": "in",
            "source_ips": ip_ranges,
            "port": port_str,
            "protocol": "tcp",
            "description": f"Allow Cloudflare IPs to port {port_str}",
        }
        for port_str in map(str, ports)
    ]
    payload = {"rules": rules}

    # Log the normalized ports actually sent, not the raw config value, which
    # may be absent (KeyError) or not a list.
    logger.info(f"Updating Hetzner Firewall {config['HETZNER_FIREWALL_ID']} with {len(ip_ranges)} IP ranges on ports {ports}")
    try:
        url = f"https://api.hetzner.cloud/v1/firewalls/{config['HETZNER_FIREWALL_ID']}/actions/set_rules"
        logger.debug(f"Sending request to: {url}")
        response = requests.post(
            url,
            headers=headers,
            data=json.dumps(payload),
            timeout=60,
        )
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to whitelist IPs in Hetzner Firewall: {str(e)}")
        # Compare against None explicitly: a requests Response is falsy for
        # 4xx/5xx status codes, which is exactly when its body is worth logging.
        error_response = getattr(e, 'response', None)
        if error_response is not None:
            logger.error(f"Response: {error_response.text}")
        sys.exit(1)

    ports_str = ', '.join(str(p) for p in ports)
    logger.info(f"Cloudflare IPs whitelisted successfully in Hetzner Firewall for ports {ports_str}")
    # Save the current IPs for reference
    save_current_ips(ip_ranges)
def save_current_ips(ip_ranges):
    """Save the current IP ranges to a file for reference.

    Writes one range per line to DATA_DIR/cloudflare_ips_<timestamp>.txt and
    points the DATA_DIR/cloudflare_ips_latest.txt symlink at that file.

    Args:
        ip_ranges: iterable of IP range strings.

    Best-effort: any failure is logged and swallowed, so a problem with the
    snapshot never fails the firewall update that has already succeeded.
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"{DATA_DIR}/cloudflare_ips_{timestamp}.txt"
    try:
        with open(filename, 'w') as file:
            for ip in ip_ranges:
                file.write(f"{ip}\n")
        logger.info(f"Saved current IP ranges to {filename}")

        # Re-point the "latest" symlink. lexists() is used instead of exists()
        # because exists() follows the link and reports a dangling one as
        # absent, after which os.symlink() fails with FileExistsError.
        latest_link = f"{DATA_DIR}/cloudflare_ips_latest.txt"
        if os.path.lexists(latest_link):
            os.remove(latest_link)
        os.symlink(filename, latest_link)
    except Exception as e:
        logger.error(f"Failed to save current IP ranges: {str(e)}")
def cleanup_old_files():
    """Clean up old IP files, keeping only the 5 most recent.

    Snapshot names embed a YYYYMMDD_HHMMSS timestamp, so reverse lexicographic
    order is newest-first. The cloudflare_ips_latest.txt symlink matches the
    same prefix and suffix and sorts ahead of every timestamp; it is excluded
    so that it does not take up one of the five slots.

    Best-effort: any failure is logged and swallowed.
    """
    latest_name = 'cloudflare_ips_latest.txt'
    try:
        snapshots = sorted(
            (
                name
                for name in os.listdir(DATA_DIR)
                if name.startswith('cloudflare_ips_')
                and name.endswith('.txt')
                and name != latest_name
            ),
            reverse=True,
        )
        # Keep only the 5 most recent files
        for old_file in snapshots[5:]:
            os.remove(os.path.join(DATA_DIR, old_file))
            logger.info(f"Cleaned up old file: {old_file}")
    except Exception as e:
        logger.error(f"Error during cleanup: {str(e)}")
def main():
    """Run one update: load the config, fetch the ranges, push them, tidy up.

    Any exception that escapes the individual steps is logged and turns into
    exit status 1.
    """
    logger.info("Starting Hetzner Cloudflare IP updater")
    try:
        config = load_config()

        # IPv4 ranges are fetched first, then IPv6; both end up in every rule.
        combined_ips = get_cloudflare_ips("4") + get_cloudflare_ips("6")

        logger.info(f"Whitelisting {len(combined_ips)} Cloudflare IP ranges")
        for ip_range in combined_ips:
            logger.debug(f"IP range: {ip_range}")

        whitelist_ips_in_hetzner(combined_ips, config)
        cleanup_old_files()

        logger.info("Hetzner Cloudflare IP updater completed successfully")
    except Exception as e:
        logger.error(f"Unexpected error: {str(e)}")
        sys.exit(1)


if __name__ == "__main__":
    main()
EOF
# Mark the generated updater as executable.
echo "Setting execute permissions for $SCRIPT_PATH..."
sudo chmod +x "$SCRIPT_PATH"

# Oneshot service that runs the updater with the virtualenv's interpreter.
# The heredoc is unquoted so $VENV_DIR and $SCRIPT_PATH are expanded now.
echo "Writing systemd service file: $SERVICE_FILE..."
sudo tee "$SERVICE_FILE" > /dev/null << EOF
[Unit]
Description=Update Cloudflare IPs in Hetzner Firewall
After=network-online.target
Wants=network-online.target
[Service]
Type=oneshot
ExecStart=$VENV_DIR/bin/python $SCRIPT_PATH
PrivateTmp=true
ProtectSystem=full
ProtectHome=true
NoNewPrivileges=true
[Install]
WantedBy=multi-user.target
EOF
# Timer unit for the service: daily schedule with up to one hour of random
# delay; Persistent=true is set as well.
echo "Writing systemd timer file: $TIMER_FILE..."
sudo tee "$TIMER_FILE" > /dev/null << EOF
[Unit]
Description=Run $NAME.service daily
After=network-online.target
[Timer]
OnCalendar=daily
RandomizedDelaySec=1hour
Persistent=true
[Install]
WantedBy=timers.target
EOF
# Make systemd pick up the freshly written unit files.
TIMER_UNIT="${NAME}.timer"
echo "Reloading systemd daemon..."
sudo systemctl daemon-reload

# Enable the timer at boot.
echo "Enabling systemd timer..."
sudo systemctl enable "$TIMER_UNIT"

# Start it right away unless it is already active.
if systemctl is-active --quiet "$TIMER_UNIT"; then
    :
else
    echo "Starting the timer..."
    sudo systemctl start "$TIMER_UNIT"
fi

# Closing instructions for the operator.
cat << EOF
Installation complete. Please edit $CONFIG_FILE to add your Hetzner API token, firewall ID, and desired ports.
The default port is 443 if none is specified.
You can check the timer status with: systemctl status $NAME.timer
You can start the service manually with: sudo systemctl start $NAME.service
EOF
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment