Skip to content

Instantly share code, notes, and snippets.

@thewh1teagle
Last active January 13, 2024 22:05
Show Gist options
  • Select an option

  • Save thewh1teagle/f408d2f44cafa7dac438779b012087d5 to your computer and use it in GitHub Desktop.

Select an option

Save thewh1teagle/f408d2f44cafa7dac438779b012087d5 to your computer and use it in GitHub Desktop.
Detect when internet back to be stable
import subprocess
import re
import time
import requests
import logging
import os
logging.basicConfig(level=os.getenv('LOG_LEVEL', 'INFO').upper())
logger = logging.getLogger()
MIN_PING_TIME=24 # 13ms
SUCCESS_TIMES=20 # 30 times
WAIT_BETWEEN=1 # 1s
# windows in ms, posix in seconds
PING_TIMEOUT=1000 # 1000ms
HOST='1.1.1.1'
NTFY_URL='https://ntfy.sh/07sQofpjduKx3kyL'
def after_success():
exit(0)
def on_success():
logger.info(f'Sending success message to {NTFY_URL}')
res = requests.post(NTFY_URL, f'Finally we have internet! we got {SUCCESS_TIMES} ping lower than {MIN_PING_TIME}')
logger.info(f'Sending response is {res.status_code}')
after_success()
def ping(host, timeout_ms, count = 1):
cmd = ['ping']
# Set timeout
if timeout_ms:
if os.name == 'nt':
cmd.extend(['-w', str(timeout_ms)])
else:
# in posix timeout in second
logger.debug('Posix env detected. using seconds timeout')
timeout_seconds = int(timeout_ms / 1000)
cmd.extend(['-w', str(timeout_seconds)])
# Set count
if os.name == 'nt':
cmd.extend(['-n', str(count)])
else:
cmd.extend(['-c', str(count)])
cmd.append(host)
# Spawn
# logger.debug(f'Spawn {cmd}')
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
proc.wait()
ret = proc.returncode
# logger.debug(f'Spawn ret: {ret}')
# Handle failed
if ret != 0:
stderr = proc.stderr.read().decode('utf-8')
logger.debug(f'Spawn ret and stderr: {ret} {stderr}')
return
stdout = proc.stdout.read().decode('utf-8')
# logger.debug(f'STDOUT:\n{stdout}')
# Parse result
pattern = re.compile(
'Reply from ([.|0-9]+): bytes=([0-9]+) time=([0-9]+)ms TTL=([0-9]+)'
if os.name == 'nt' else
'([0-9]+) bytes from ([.|0-9]+): icmp_seq=[0-9]+ ttl=([0-9]+) time=([0-9]+).+ ms'
)
match = pattern.search(stdout)
# logger.debug(f'Match is {match}')
# Handle failed
if not match or len(match.groups()) != 4:
logger.error(f'ping of {host} failed! stdout: {stdout}')
return
# Get regex result
if os.name == 'nt':
host, duration, size, ttl = match.groups()
else:
size, host, ttl, duration = match.groups()
logger.debug(f'ping of {host} returned in {duration}ms!')
return host, float(duration), float(size), float(ttl)
def main():
success = 0
while True:
logger.debug(f'Success {success} times!')
try:
result = ping(HOST, PING_TIMEOUT)
except Exception as e:
logger.error(e)
continue
if result:
_, duration, _, _ = result
if duration <= MIN_PING_TIME:
success += 1
else:
success = 0
else:
success = 0
if success >= SUCCESS_TIMES:
on_success()
logger.debug(f'SLeeping {WAIT_BETWEEN} seconds')
time.sleep(WAIT_BETWEEN)
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment