Last active
March 14, 2025 15:59
-
-
Save leodido/39b117af36945c0a1595d151fb79decf to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env bash
#
# Installer for the Hetzner/Cloudflare firewall updater.
#
# Writes a Python script, a configuration template, a dedicated virtual
# environment, and a systemd service + timer that runs the script daily.
# Privileged steps are executed through sudo, so the script itself can be
# started by an unprivileged user with sudo rights.

# Abort on errors, on unset variables, and on failures inside pipelines.
set -euo pipefail

# Define variables
readonly INSTALL_DIR="/usr/local/bin"
readonly NAME="update_hetzner_cloudflare_ips"
readonly SCRIPT_NAME="$NAME.py"
readonly SCRIPT_PATH="$INSTALL_DIR/$SCRIPT_NAME"
readonly SERVICE_FILE="/etc/systemd/system/$NAME.service"
readonly TIMER_FILE="/etc/systemd/system/$NAME.timer"
readonly DATA_DIR="/var/lib/$NAME"
readonly LOG_FILE="/var/log/$NAME.log"
readonly CONFIG_FILE="$DATA_DIR/config.json"
# Create the data directory. It will hold the configuration file (which
# contains the API token), so it is owned by root and closed to everyone else.
echo "Creating data directory at $DATA_DIR..."
sudo mkdir -p "$DATA_DIR"
sudo chown root:root "$DATA_DIR"
sudo chmod 700 "$DATA_DIR"
# Create the log file (and its parent directory if it doesn't exist yet),
# readable and writable by root only.
echo "Creating log directory for $LOG_FILE..."
sudo mkdir -p "$(dirname "$LOG_FILE")"
sudo touch "$LOG_FILE"
sudo chown root:root "$LOG_FILE"
sudo chmod 600 "$LOG_FILE"
# Create config file template if it doesn't exist.
#
# The existence check must run as root: the data directory is mode 700 and
# owned by root, so a plain [[ -f ... ]] executed by the invoking user cannot
# see inside it, always reports "missing", and would overwrite an already
# configured file (including the API token) with the template on every re-run.
if ! sudo test -f "$CONFIG_FILE"; then
  echo "Creating configuration file template at $CONFIG_FILE..."
  # Quoted delimiter: the template is written literally, nothing is expanded.
  sudo tee "$CONFIG_FILE" > /dev/null << 'EOF'
{
  "HETZNER_API_TOKEN": "YOUR_HETZNER_API_TOKEN",
  "HETZNER_FIREWALL_ID": "YOUR_HETZNER_FIREWALL_ID",
  "PORTS": ["YOUR_PORT"]
}
EOF
  sudo chmod 600 "$CONFIG_FILE"
  echo "Please edit $CONFIG_FILE and add your Hetzner API token and firewall ID."
fi
# Check if Python 3 is installed; if not, install it with the first supported
# package manager found on PATH.
#
# NOTE(review): on the dnf/yum/zypper families the venv module is expected to
# ship with the interpreter package and no "python3-venv" package is believed
# to exist there, so requesting it would make the install fail and abort this
# script under set -e. Confirm for openSUSE in particular.
if ! command -v python3 &> /dev/null; then
  echo "Python 3 is not installed. Installing Python 3..."
  if command -v apt-get &> /dev/null; then
    # Debian/Ubuntu (the venv package is handled in a later step)
    echo "Detected Debian/Ubuntu system. Installing required packages..."
    sudo apt-get update
    sudo apt-get install -y python3 python3-pip
  elif command -v dnf &> /dev/null; then
    # Fedora/RHEL/CentOS 8+
    sudo dnf install -y python3
  elif command -v yum &> /dev/null; then
    # CentOS/RHEL 7
    sudo yum install -y python3
  elif command -v zypper &> /dev/null; then
    # openSUSE
    sudo zypper install -y python3
  elif command -v pacman &> /dev/null; then
    # Arch Linux
    sudo pacman -S --noconfirm python python-virtualenv
  else
    echo "Unable to install Python 3 automatically. Please install Python 3 and venv manually." >&2
    exit 1
  fi
fi
# For Debian/Ubuntu systems, ensure we have the right venv package
if command -v apt-get &> /dev/null; then
  # Succeeds once python3 can import both venv and ensurepip.
  # "python3 -m venv --help" is not a sufficient probe: it can succeed while
  # ensurepip is still missing, in which case creating the environment (and
  # its pip) fails later.
  venv_usable() {
    python3 -c 'import venv, ensurepip' &> /dev/null
  }

  echo "Installing Python virtual environment package..."
  # First try the generic package. A failure here is tolerated on purpose:
  # without "|| true", set -e would abort before any fallback below runs.
  sudo apt-get install -y python3-venv || true

  # If that doesn't work, try the version-specific package
  if ! venv_usable; then
    # Get the major.minor version of the python3 on PATH
    PY_VER=$(python3 -c 'import sys; print(f"{sys.version_info.major}.{sys.version_info.minor}")')
    echo "Installing Python $PY_VER virtual environment package..."
    sudo apt-get install -y "python${PY_VER}-venv" || true

    # Still not working? Try some common versions, stopping at the first hit
    if ! venv_usable; then
      echo "Trying common Python venv packages..."
      for ver in 3.8 3.9 3.10 3.11 3.12; do
        echo "Trying python$ver-venv..."
        sudo apt-get install -y "python${ver}-venv" || true
        if venv_usable; then
          break
        fi
      done
    fi
  fi
fi
# Create a virtual environment for our application inside the data directory,
# so the updater's dependencies stay separate from the system Python packages.
VENV_DIR="$DATA_DIR/venv"
echo "Creating Python virtual environment at $VENV_DIR..."
sudo python3 -m venv "$VENV_DIR"

# Install required Python packages in the virtual environment
echo "Installing required Python packages in virtual environment..."
sudo "$VENV_DIR/bin/pip" install requests
# Create update_hetzner_cloudflare_ips.py script.
#
# The heredoc delimiter is deliberately unquoted: the shell substitutes
# LOG_FILE and DATA_DIR into the generated Python source. Keep the Python text
# below free of dollar signs and backticks, which the shell would otherwise
# try to expand.
sudo tee "$SCRIPT_PATH" > /dev/null << EOF
#!/usr/bin/env python3
"""Allow Cloudflare's published IP ranges through a Hetzner Cloud firewall."""
import requests
import json
import logging
import sys
import os
from datetime import datetime

# Paths filled in by the installer
LOG_FILE = "${LOG_FILE}"
DATA_DIR = "${DATA_DIR}"
CONFIG_FILE = f"{DATA_DIR}/config.json"

# Configure logging: every record goes to the log file and to stdout
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(LOG_FILE),
        logging.StreamHandler(sys.stdout)
    ]
)
logger = logging.getLogger('hetzner_cloudflare_updater')


def load_config():
    """Load and validate the configuration file.

    Returns the parsed configuration dict. PORTS is always a non-empty list
    on return; it defaults to ["443"] when missing, empty, or left at the
    template placeholder. Exits with status 1 when the file is missing, is
    not valid JSON, or still holds the placeholder token or firewall ID.
    """
    try:
        with open(CONFIG_FILE, 'r') as file:
            config = json.load(file)

        # Validate required fields
        if not config.get('HETZNER_API_TOKEN') or config.get('HETZNER_API_TOKEN') == "YOUR_HETZNER_API_TOKEN":
            logger.error(f"Missing or default API token in {CONFIG_FILE}. Please set your Hetzner API token.")
            sys.exit(1)
        if not config.get('HETZNER_FIREWALL_ID') or config.get('HETZNER_FIREWALL_ID') == "YOUR_HETZNER_FIREWALL_ID":
            logger.error(f"Missing or default firewall ID in {CONFIG_FILE}. Please set your Hetzner Firewall ID.")
            sys.exit(1)

        # Check if PORTS is missing or empty
        if not config.get('PORTS'):
            logger.info("No PORTS configuration found, defaulting to port 443")
            config['PORTS'] = ["443"]

        # Convert ports to list if it's not already
        if not isinstance(config['PORTS'], list):
            logger.info("Converting single port to list format")
            config['PORTS'] = [config['PORTS']]

        # Drop placeholder port values
        placeholder_ports = ["YOUR_PORT", ""]
        config['PORTS'] = [p for p in config['PORTS'] if str(p) not in placeholder_ports]
        if not config['PORTS']:
            logger.info("No valid ports specified, defaulting to port 443")
            config['PORTS'] = ["443"]

        return config
    except FileNotFoundError:
        logger.error(f"Configuration file not found at {CONFIG_FILE}")
        sys.exit(1)
    except json.JSONDecodeError:
        logger.error(f"Invalid JSON in configuration file {CONFIG_FILE}")
        sys.exit(1)
    except Exception as e:
        logger.error(f"Error loading configuration: {str(e)}")
        sys.exit(1)


def get_cloudflare_ips(version):
    """Retrieve Cloudflare IP ranges for the given version ("4" or "6").

    Returns a list of non-empty range strings, one per line of the response.
    Exits with status 1 when the request fails or yields no ranges, so that
    an empty or blank source list is never pushed to the firewall.
    """
    url = "https://www.cloudflare.com/ips-v" + version
    logger.info(f"Getting IPs from {url}")
    try:
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        # splitlines + strip tolerates CRLF line endings and blank lines
        ip_list = [line.strip() for line in response.text.splitlines() if line.strip()]
        if not ip_list:
            logger.error(f"Cloudflare returned no IPv{version} ranges from {url}")
            sys.exit(1)
        logger.info(f"Retrieved {len(ip_list)} IPv{version} ranges from Cloudflare")
        return ip_list
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to retrieve Cloudflare IP ranges: {str(e)}")
        sys.exit(1)


def whitelist_ips_in_hetzner(ip_ranges, config):
    """Send one inbound TCP rule per configured port to the Hetzner firewall.

    ip_ranges is the list of source ranges to allow; config is the dict
    returned by load_config(). On success the ranges are also saved to the
    data directory. Exits with status 1 when the API request fails.

    NOTE(review): the set_rules action appears to replace the firewall's
    whole rule set rather than append to it - confirm against the Hetzner
    Cloud API documentation before using this on a firewall that carries
    other rules.
    """
    headers = {
        'Authorization': f'Bearer {config["HETZNER_API_TOKEN"]}',
        'Content-Type': 'application/json',
    }

    # Get ports from config, defaulting to ["443"] if not specified
    ports = config.get("PORTS", ["443"])

    # Validate ports configuration
    if not isinstance(ports, list):
        logger.warning("PORTS configuration is not a list. Converting single port to list.")
        ports = [ports]
    if not ports:
        logger.warning("No ports specified in configuration, defaulting to port 443")
        ports = ["443"]

    # Create rules for each port
    rules = []
    for port in ports:
        # Convert port to string if it's a number
        port_str = str(port)
        rules.append({
            "direction": "in",
            "source_ips": ip_ranges,
            "port": port_str,
            "protocol": "tcp",
            "description": f"Allow Cloudflare IPs to port {port_str}"
        })

    payload = {
        "rules": rules
    }

    logger.info(f"Updating Hetzner Firewall {config['HETZNER_FIREWALL_ID']} with {len(ip_ranges)} IP ranges on ports {ports}")
    try:
        url = f"https://api.hetzner.cloud/v1/firewalls/{config['HETZNER_FIREWALL_ID']}/actions/set_rules"
        logger.debug(f"Sending request to: {url}")
        response = requests.post(
            url,
            headers=headers,
            data=json.dumps(payload),
            timeout=60
        )
        response.raise_for_status()
        ports_str = ', '.join(str(p) for p in ports)
        logger.info(f"Cloudflare IPs whitelisted successfully in Hetzner Firewall for ports {ports_str}")
        # Save the current IPs for reference
        save_current_ips(ip_ranges)
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to whitelist IPs in Hetzner Firewall: {str(e)}")
        # Compare against None explicitly: a Response object is falsy for
        # 4xx/5xx statuses, which is exactly when its body is worth logging.
        if getattr(e, 'response', None) is not None:
            logger.error(f"Response: {e.response.text}")
        sys.exit(1)


def save_current_ips(ip_ranges):
    """Write the ranges to a timestamped file and repoint the latest symlink.

    Failures are logged and otherwise ignored; the firewall update has
    already succeeded by the time this runs.
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"{DATA_DIR}/cloudflare_ips_{timestamp}.txt"
    try:
        with open(filename, 'w') as file:
            for ip in ip_ranges:
                file.write(f"{ip}\n")
        logger.info(f"Saved current IP ranges to {filename}")

        # Create a symlink to the latest file. lexists (unlike exists) is
        # also true for a dangling link whose target was already cleaned up;
        # leaving such a link in place would make os.symlink fail.
        latest_link = f"{DATA_DIR}/cloudflare_ips_latest.txt"
        if os.path.lexists(latest_link):
            os.remove(latest_link)
        os.symlink(filename, latest_link)
    except Exception as e:
        logger.error(f"Failed to save current IP ranges: {str(e)}")


def cleanup_old_files():
    """Clean up old IP files, keeping only the 5 most recent snapshots."""
    try:
        # The "latest" symlink is not a snapshot: it must neither occupy one
        # of the five slots nor be considered for deletion.
        files = [
            f for f in os.listdir(DATA_DIR)
            if f.startswith('cloudflare_ips_')
            and f.endswith('.txt')
            and f != 'cloudflare_ips_latest.txt'
        ]
        # Timestamped names sort chronologically, so newest comes first
        files.sort(reverse=True)

        # Keep only the 5 most recent files
        for old_file in files[5:]:
            os.remove(os.path.join(DATA_DIR, old_file))
            logger.info(f"Cleaned up old file: {old_file}")
    except Exception as e:
        logger.error(f"Error during cleanup: {str(e)}")


def main():
    """Main function to update Cloudflare IPs in Hetzner Firewall."""
    logger.info("Starting Hetzner Cloudflare IP updater")
    try:
        # Load configuration
        config = load_config()

        # Get Cloudflare IPs
        cloudflare_ips_v4 = get_cloudflare_ips("4")
        cloudflare_ips_v6 = get_cloudflare_ips("6")
        combined_ips = cloudflare_ips_v4 + cloudflare_ips_v6

        # Log the IPs we're whitelisting
        logger.info(f"Whitelisting {len(combined_ips)} Cloudflare IP ranges")
        for ip in combined_ips:
            logger.debug(f"IP range: {ip}")

        # Update Hetzner Firewall
        whitelist_ips_in_hetzner(combined_ips, config)

        # Clean up old files
        cleanup_old_files()

        logger.info("Hetzner Cloudflare IP updater completed successfully")
    except Exception as e:
        logger.error(f"Unexpected error: {str(e)}")
        sys.exit(1)


if __name__ == "__main__":
    main()
EOF
# Make the script executable
echo "Setting execute permissions for $SCRIPT_PATH..."
sudo chmod +x "$SCRIPT_PATH"
# Create systemd service file. The unquoted heredoc delimiter lets the shell
# substitute the virtual-environment interpreter and script path into
# ExecStart, so the service runs with the interpreter that has the
# dependencies installed.
echo "Writing systemd service file: $SERVICE_FILE..."
sudo tee "$SERVICE_FILE" > /dev/null << EOF
[Unit]
Description=Update Cloudflare IPs in Hetzner Firewall
After=network-online.target
Wants=network-online.target

[Service]
Type=oneshot
ExecStart=$VENV_DIR/bin/python $SCRIPT_PATH
PrivateTmp=true
ProtectSystem=full
ProtectHome=true
NoNewPrivileges=true

[Install]
WantedBy=multi-user.target
EOF
# Create systemd timer file: a daily trigger with a randomized delay.
# The shell expands NAME into the unit description.
echo "Writing systemd timer file: $TIMER_FILE..."
sudo tee "$TIMER_FILE" > /dev/null << EOF
[Unit]
Description=Run $NAME.service daily
After=network-online.target

[Timer]
OnCalendar=daily
RandomizedDelaySec=1hour
Persistent=true

[Install]
WantedBy=timers.target
EOF
# Reload systemd daemon to recognize our new service and timer
echo "Reloading systemd daemon..."
sudo systemctl daemon-reload

# Enable the systemd timer
echo "Enabling systemd timer..."
sudo systemctl enable "$NAME.timer"

# Start the timer if it's not already running
if ! systemctl is-active --quiet "$NAME.timer"; then
  echo "Starting the timer..."
  sudo systemctl start "$NAME.timer"
fi

# Closing instructions for the operator
echo "Installation complete. Please edit $CONFIG_FILE to add your Hetzner API token, firewall ID, and desired ports."
echo "The default port is 443 if none is specified."
echo "You can check the timer status with: systemctl status $NAME.timer"
echo "You can start the service manually with: sudo systemctl start $NAME.service"
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment