Last active
April 25, 2025 19:03
-
-
Save domeniko-gentner/2d0f6ed224ad088d8361a032524a6259 to your computer and use it in GitHub Desktop.
HCloud: Refresh Firewall with Home IP
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| from hcloud import Client | |
| from hcloud.firewalls.domain import FirewallRule | |
| from requests import get as r_get, HTTPError, Timeout | |
| from ipaddress import ip_network | |
| from systemd.journal import send | |
| from datetime import datetime | |
| from hcloud.hcloud import APIException | |
| from argparse import ArgumentParser | |
| def logit(message: str, now: str) -> None: | |
| print(f"refresh_firewall_cloud {now} {message}") | |
| send(f"refresh_firewall_cloud {now} {message}") | |
| def read_line_from_file(filename: str, now: str) -> str: | |
| try: | |
| with open(filename, mode='r') as f: | |
| data = f.readline().strip() | |
| return data | |
| except IOError as error: | |
| logit(f"IOError while reading {filename}: {error.message}", now) | |
| exit(1) | |
| def write_line_to_file(filename: str, data: str, now:str) -> None: | |
| try: | |
| with open(filename, mode="w") as f: | |
| f.write(data) | |
| except IOError as error: | |
| logit(f"IOError while writing {filename}: {error.message}", now) | |
| exit(1) | |
| def get_own_ipv4(now: str) -> str: | |
| try: | |
| ipv4_result = r_get('https://api.ipify.org') | |
| except HTTPError as error: | |
| logit(f"Received a HTTP Error while getting your IP: {error}", now) | |
| exit(1) | |
| except ConnectionError as error: | |
| logit(f"Received a Connection Error while getting your IP: {error}", now) | |
| exit(1) | |
| except Timeout as error: | |
| logit(f"Received a Timeout Error while getting your IP: {error}", now) | |
| exit(1) | |
| if ipv4_result.status_code == 200: | |
| return str(f"{ipv4_result.text.strip()}/32") | |
| else: | |
| logit(f"HTTP Code was not 200, it was {ipv4_result.status_code}", now) | |
| exit(1) | |
| def get_own_ipv6(now: str): | |
| try: | |
| ipv6_result = r_get('https://api6.ipify.org') | |
| except HTTPError as error: | |
| logit(f"Received a HTTP Error while getting your IP: {error}", now) | |
| exit(1) | |
| except ConnectionError as error: | |
| logit(f"Received a Connection Error while getting your IP: {error}", now) | |
| exit(1) | |
| except Timeout as error: | |
| logit(f"Received a Timeout Error while getting your IP: {error}", now) | |
| exit(1) | |
| if ipv6_result.status_code == 200: | |
| network = ip_network(f"{ipv6_result.text.strip()}/64", strict=False) | |
| return str(f"{network.network_address}/64") | |
| else: | |
| logit(f"HTTP Code was not 200, it was {ipv6_result.status_code}", now) | |
| exit(1) | |
| def main(now: str, force_update: bool): | |
| # Get own IP from Internet | |
| ipv4 = get_own_ipv4(now) | |
| ipv6 = get_own_ipv6(now) | |
| # Get API Key locally | |
| api_key = read_line_from_file('/root/.hcloud_dns_internal', now) | |
| # Get Firewall ID | |
| firewall_id = read_line_from_file('/root/.firewall_id', now) | |
| # Read previous IP | |
| old_ipv4 = read_line_from_file("/root/.ipv4_check", now) | |
| old_ipv6 = read_line_from_file("/root/.ipv6_check", now) | |
| # Write back current IP | |
| write_line_to_file("/root/.ipv4_check", ipv4, now) | |
| write_line_to_file("/root/.ipv6_check", ipv6, now) | |
| # Check if old and new are same | |
| if old_ipv4 == ipv4 and old_ipv6 == ipv6: | |
| if force_update: | |
| logit("Both IP are still the same, but update was forced", now) | |
| else: | |
| logit("Both IP are still the same, not updating.", now) | |
| exit(0) | |
| else: | |
| logit("One or more IP are not the same, updating them", now) | |
| try: | |
| api_client = Client(api_key) | |
| firewall = api_client.firewalls.get_by_id(firewall_id) | |
| my_ip = [ipv4, ipv6] | |
| rules = [ | |
| # DNS from Home | |
| FirewallRule(direction="in", port='53', protocol="tcp", source_ips=my_ip, description="DNS TCP"), | |
| FirewallRule(direction="in", port='53', protocol="udp", source_ips=my_ip, description="DNS UDP"), | |
| # SSH from Home | |
| FirewallRule(direction="in", port='1235', protocol="tcp", source_ips=my_ip, description="Home SSH"), | |
| # ICMP from Home | |
| FirewallRule(direction="in", protocol="icmp", source_ips=my_ip, description="Home ICMP") | |
| ] | |
| api_client.firewalls.set_rules(firewall=firewall, rules=rules) | |
| logit(f"Done. Set {ipv4} and {ipv6} in firewall!", now) | |
| except APIException as e: | |
| logit(f"Api Exception: {e.message}", now) | |
| exit(1) | |
| if __name__ == "__main__": | |
| parser = ArgumentParser("refresh_firewall_cloud") | |
| parser.add_argument('--force-update', dest='is_forced', action='store_true') | |
| args = parser.parse_args() | |
| time_now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
| main(time_now, args.is_forced) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment