Last active
January 13, 2024 22:05
-
-
Save thewh1teagle/f408d2f44cafa7dac438779b012087d5 to your computer and use it in GitHub Desktop.
Detect when internet back to be stable
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import subprocess | |
| import re | |
| import time | |
| import requests | |
| import logging | |
| import os | |
| logging.basicConfig(level=os.getenv('LOG_LEVEL', 'INFO').upper()) | |
| logger = logging.getLogger() | |
| MIN_PING_TIME=24 # 13ms | |
| SUCCESS_TIMES=20 # 30 times | |
| WAIT_BETWEEN=1 # 1s | |
| # windows in ms, posix in seconds | |
| PING_TIMEOUT=1000 # 1000ms | |
| HOST='1.1.1.1' | |
| NTFY_URL='https://ntfy.sh/07sQofpjduKx3kyL' | |
| def after_success(): | |
| exit(0) | |
| def on_success(): | |
| logger.info(f'Sending success message to {NTFY_URL}') | |
| res = requests.post(NTFY_URL, f'Finally we have internet! we got {SUCCESS_TIMES} ping lower than {MIN_PING_TIME}') | |
| logger.info(f'Sending response is {res.status_code}') | |
| after_success() | |
| def ping(host, timeout_ms, count = 1): | |
| cmd = ['ping'] | |
| # Set timeout | |
| if timeout_ms: | |
| if os.name == 'nt': | |
| cmd.extend(['-w', str(timeout_ms)]) | |
| else: | |
| # in posix timeout in second | |
| logger.debug('Posix env detected. using seconds timeout') | |
| timeout_seconds = int(timeout_ms / 1000) | |
| cmd.extend(['-w', str(timeout_seconds)]) | |
| # Set count | |
| if os.name == 'nt': | |
| cmd.extend(['-n', str(count)]) | |
| else: | |
| cmd.extend(['-c', str(count)]) | |
| cmd.append(host) | |
| # Spawn | |
| # logger.debug(f'Spawn {cmd}') | |
| proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | |
| proc.wait() | |
| ret = proc.returncode | |
| # logger.debug(f'Spawn ret: {ret}') | |
| # Handle failed | |
| if ret != 0: | |
| stderr = proc.stderr.read().decode('utf-8') | |
| logger.debug(f'Spawn ret and stderr: {ret} {stderr}') | |
| return | |
| stdout = proc.stdout.read().decode('utf-8') | |
| # logger.debug(f'STDOUT:\n{stdout}') | |
| # Parse result | |
| pattern = re.compile( | |
| 'Reply from ([.|0-9]+): bytes=([0-9]+) time=([0-9]+)ms TTL=([0-9]+)' | |
| if os.name == 'nt' else | |
| '([0-9]+) bytes from ([.|0-9]+): icmp_seq=[0-9]+ ttl=([0-9]+) time=([0-9]+).+ ms' | |
| ) | |
| match = pattern.search(stdout) | |
| # logger.debug(f'Match is {match}') | |
| # Handle failed | |
| if not match or len(match.groups()) != 4: | |
| logger.error(f'ping of {host} failed! stdout: {stdout}') | |
| return | |
| # Get regex result | |
| if os.name == 'nt': | |
| host, duration, size, ttl = match.groups() | |
| else: | |
| size, host, ttl, duration = match.groups() | |
| logger.debug(f'ping of {host} returned in {duration}ms!') | |
| return host, float(duration), float(size), float(ttl) | |
| def main(): | |
| success = 0 | |
| while True: | |
| logger.debug(f'Success {success} times!') | |
| try: | |
| result = ping(HOST, PING_TIMEOUT) | |
| except Exception as e: | |
| logger.error(e) | |
| continue | |
| if result: | |
| _, duration, _, _ = result | |
| if duration <= MIN_PING_TIME: | |
| success += 1 | |
| else: | |
| success = 0 | |
| else: | |
| success = 0 | |
| if success >= SUCCESS_TIMES: | |
| on_success() | |
| logger.debug(f'SLeeping {WAIT_BETWEEN} seconds') | |
| time.sleep(WAIT_BETWEEN) | |
| if __name__ == '__main__': | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment