Skip to content

Instantly share code, notes, and snippets.

@domeniko-gentner
Last active April 25, 2025 19:03
Show Gist options
  • Save domeniko-gentner/2d0f6ed224ad088d8361a032524a6259 to your computer and use it in GitHub Desktop.
Save domeniko-gentner/2d0f6ed224ad088d8361a032524a6259 to your computer and use it in GitHub Desktop.
HCloud: Refresh Firewall with Home IP
#!/usr/bin/env python3
from hcloud import Client
from hcloud.firewalls.domain import FirewallRule
from requests import get as r_get, HTTPError, Timeout
from ipaddress import ip_network
from systemd.journal import send
from datetime import datetime
from hcloud.hcloud import APIException
from argparse import ArgumentParser
def logit(message: str, now: str) -> None:
    """Emit one log line to stdout and to the systemd journal.

    Args:
        message: Text to log.
        now: Timestamp string placed between the program tag and the message.
    """
    # Build the line once so both sinks receive exactly the same text.
    line = f"refresh_firewall_cloud {now} {message}"
    print(line)
    send(line)
def read_line_from_file(filename: str, now: str) -> str:
    """Return the first line of *filename* with surrounding whitespace removed.

    Args:
        filename: Path of the file to read.
        now: Timestamp string forwarded to ``logit`` if reading fails.

    Returns:
        The stripped first line; an empty string if the file is empty.

    Raises:
        SystemExit: With exit code 1, after logging, when the file cannot be
            opened or read.
    """
    try:
        with open(filename, mode='r', encoding="utf-8") as f:
            return f.readline().strip()
    except OSError as error:
        # IOError is an alias of OSError in Python 3 and has no ``.message``
        # attribute; formatting the exception itself yields the errno text.
        logit(f"IOError while reading {filename}: {error}", now)
        # ``exit()`` is injected by the ``site`` module for interactive use;
        # raising SystemExit works in every interpreter configuration.
        raise SystemExit(1)
def write_line_to_file(filename: str, data: str, now: str) -> None:
    """Overwrite *filename* so that it contains exactly *data*.

    Args:
        filename: Path of the file to (re)create.
        data: Text to write; no newline is appended.
        now: Timestamp string forwarded to ``logit`` if writing fails.

    Raises:
        SystemExit: With exit code 1, after logging, when the file cannot be
            opened or written.
    """
    try:
        with open(filename, mode="w", encoding="utf-8") as f:
            f.write(data)
    except OSError as error:
        # IOError is an alias of OSError in Python 3 and has no ``.message``
        # attribute; formatting the exception itself yields the errno text.
        logit(f"IOError while writing {filename}: {error}", now)
        # ``exit()`` is injected by the ``site`` module for interactive use;
        # raising SystemExit works in every interpreter configuration.
        raise SystemExit(1)
def get_own_ipv4(now: str, timeout: float = 10.0) -> str:
    """Look up this host's public IPv4 address and return it as a /32 CIDR.

    The address is fetched from ``https://api.ipify.org``.

    Args:
        now: Timestamp string forwarded to ``logit`` for every log line.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The address in CIDR form, e.g. ``"203.0.113.7/32"``.

    Raises:
        SystemExit: With exit code 1, after logging, when the request fails,
            the status code is not 200, or the body is not an IPv4 address.
    """
    # Function-local imports: the bare name ``ConnectionError`` is the Python
    # builtin, which is NOT what requests raises on connection failures.
    from ipaddress import IPv4Network
    from requests.exceptions import ConnectionError as RequestsConnectionError
    from requests.exceptions import RequestException

    try:
        # Without an explicit timeout requests may block indefinitely and the
        # Timeout handler below could never fire.
        ipv4_result = r_get('https://api.ipify.org', timeout=timeout)
    except HTTPError as error:
        logit(f"Received a HTTP Error while getting your IP: {error}", now)
        raise SystemExit(1)
    except Timeout as error:
        # Checked before the connection handler: requests' ConnectTimeout
        # derives from both Timeout and ConnectionError.
        logit(f"Received a Timeout Error while getting your IP: {error}", now)
        raise SystemExit(1)
    except (RequestsConnectionError, ConnectionError) as error:
        logit(f"Received a Connection Error while getting your IP: {error}", now)
        raise SystemExit(1)
    except RequestException as error:
        # Any other requests failure (SSL, too many redirects, ...).
        logit(f"Received an Error while getting your IP: {error}", now)
        raise SystemExit(1)

    if ipv4_result.status_code != 200:
        logit(f"HTTP Code was not 200, it was {ipv4_result.status_code}", now)
        raise SystemExit(1)

    address = ipv4_result.text.strip()
    try:
        # The body ends up as a firewall source; refuse anything that is not
        # a single well-formed IPv4 host address.
        network = IPv4Network(f"{address}/32")
    except ValueError as error:
        logit(f"Received an invalid IPv4 address {address!r}: {error}", now)
        raise SystemExit(1)
    return str(network)
def get_own_ipv6(now: str, timeout: float = 10.0) -> str:
    """Look up this host's public IPv6 address and return its /64 network.

    The address is fetched from ``https://api6.ipify.org`` and then reduced
    to the network address of the enclosing /64.

    Args:
        now: Timestamp string forwarded to ``logit`` for every log line.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The /64 network in CIDR form, e.g. ``"2001:db8:1:2::/64"``.

    Raises:
        SystemExit: With exit code 1, after logging, when the request fails,
            the status code is not 200, or the body cannot be parsed as an
            address with a /64 prefix.
    """
    # Function-local imports: the bare name ``ConnectionError`` is the Python
    # builtin, which is NOT what requests raises on connection failures.
    from requests.exceptions import ConnectionError as RequestsConnectionError
    from requests.exceptions import RequestException

    try:
        # Without an explicit timeout requests may block indefinitely and the
        # Timeout handler below could never fire.
        ipv6_result = r_get('https://api6.ipify.org', timeout=timeout)
    except HTTPError as error:
        logit(f"Received a HTTP Error while getting your IP: {error}", now)
        raise SystemExit(1)
    except Timeout as error:
        # Checked before the connection handler: requests' ConnectTimeout
        # derives from both Timeout and ConnectionError.
        logit(f"Received a Timeout Error while getting your IP: {error}", now)
        raise SystemExit(1)
    except (RequestsConnectionError, ConnectionError) as error:
        logit(f"Received a Connection Error while getting your IP: {error}", now)
        raise SystemExit(1)
    except RequestException as error:
        # Any other requests failure (SSL, too many redirects, ...).
        logit(f"Received an Error while getting your IP: {error}", now)
        raise SystemExit(1)

    if ipv6_result.status_code != 200:
        logit(f"HTTP Code was not 200, it was {ipv6_result.status_code}", now)
        raise SystemExit(1)

    address = ipv6_result.text.strip()
    try:
        # strict=False masks the host bits so any address inside the /64
        # maps onto the same network.
        network = ip_network(f"{address}/64", strict=False)
    except ValueError as error:
        # Malformed body (or an IPv4 address, for which /64 is invalid).
        logit(f"Received an invalid IPv6 address {address!r}: {error}", now)
        raise SystemExit(1)
    return str(f"{network.network_address}/64")
def main(now: str, force_update: bool) -> None:
    """Replace the Hetzner Cloud firewall rules with ones for the current home IPs.

    Determines the current public IPv4/IPv6, compares them with the values
    stored in ``/root/.ipv4_check`` and ``/root/.ipv6_check``, and - if they
    differ or *force_update* is set - replaces the firewall's rule set with
    DNS (tcp/udp 53), SSH (tcp 1235) and ICMP rules restricted to those IPs.

    Args:
        now: Timestamp string forwarded to ``logit`` for every log line.
        force_update: Update the firewall even when both IPs are unchanged.

    Raises:
        SystemExit: Code 0 when nothing had to be done; code 1 (after
            logging) when the API call or one of the helper steps fails.
    """
    # Get own IP from Internet
    ipv4 = get_own_ipv4(now)
    ipv6 = get_own_ipv6(now)

    # Get API Key locally
    api_key = read_line_from_file('/root/.hcloud_dns_internal', now)

    # Get Firewall ID
    firewall_id = read_line_from_file('/root/.firewall_id', now)

    # Read previous IP
    old_ipv4 = read_line_from_file("/root/.ipv4_check", now)
    old_ipv6 = read_line_from_file("/root/.ipv6_check", now)

    # Check if old and new are same
    if old_ipv4 == ipv4 and old_ipv6 == ipv6:
        if not force_update:
            logit("Both IP are still the same, not updating.", now)
            raise SystemExit(0)
        logit("Both IP are still the same, but update was forced", now)
    else:
        logit("One or more IP are not the same, updating them", now)

    try:
        api_client = Client(api_key)
        firewall = api_client.firewalls.get_by_id(firewall_id)
        my_ip = [ipv4, ipv6]
        rules = [
            # DNS from Home
            FirewallRule(direction="in", port='53', protocol="tcp", source_ips=my_ip, description="DNS TCP"),
            FirewallRule(direction="in", port='53', protocol="udp", source_ips=my_ip, description="DNS UDP"),
            # SSH from Home
            FirewallRule(direction="in", port='1235', protocol="tcp", source_ips=my_ip, description="Home SSH"),
            # ICMP from Home
            FirewallRule(direction="in", protocol="icmp", source_ips=my_ip, description="Home ICMP"),
        ]
        api_client.firewalls.set_rules(firewall=firewall, rules=rules)
        logit(f"Done. Set {ipv4} and {ipv6} in firewall!", now)
    except APIException as e:
        logit(f"Api Exception: {e.message}", now)
        raise SystemExit(1)

    # Record the new IPs only after the firewall accepted them. Writing them
    # earlier would make a failed update look "unchanged" on the next run, so
    # the firewall would never be retried.
    write_line_to_file("/root/.ipv4_check", ipv4, now)
    write_line_to_file("/root/.ipv6_check", ipv6, now)
if __name__ == "__main__":
    # Script entry point: read the command-line flag, then run one refresh.
    cli = ArgumentParser("refresh_firewall_cloud")
    cli.add_argument('--force-update', action='store_true', dest='is_forced')
    options = cli.parse_args()
    # The timestamp is taken once and passed down so every log line of this
    # run carries the same value.
    main(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), options.is_forced)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment