Skip to content

Instantly share code, notes, and snippets.

@Jim-Holmstroem
Created September 26, 2020 13:01
Show Gist options
  • Save Jim-Holmstroem/e1304ba51da90082716d4bf640eb3acf to your computer and use it in GitHub Desktop.
Save Jim-Holmstroem/e1304ba51da90082716d4bf640eb3acf to your computer and use it in GitHub Desktop.
import threading
import time
import schedule
import json
import requests
def start_updater():
def _f():
while True:
schedule.run_pending()
time.sleep(0.1)
threading.Thread(target=_f, args=()).start()
start_updater()
class Value:
def __init__(self, name, endpoint):
self.name = name
self.endpoint = endpoint
self.filename = f"{name}.json"
try:
with open(self.filename) as f:
self.data = json.load(f)
print(f"initial load from file")
except Exception as e:
print(f"initial file load failed")
self.data = {}
self.rlock = threading.RLock()
def start(self):
schedule.every(1).seconds.do(self.update)
def update(self):
def _update():
with self.rlock:
try:
data = requests.get(
self.endpoint,
timeout=1
).json()
with open(f"{self.name}.json", "w") as f:
json.dump(data, f)
self.data = data
print(f"{self.name}: update [{time.ctime()}]")
except Exception as e:
print(e)
print(f"{self.name}: failed read from server")
threading.Thread(target=_update, args=()).start()
def read(self):
print(f"{self.name}: length={len(self.data)}")
def __str__(self):
return f"Value[{self.name}]({self.endpoint})"
health = Value("Health", "https://api.github.com/orgs/aiwizo/repos")
health.start()
status = Value("Status", "https://api.github.com/orgs/aiwizo/repos")
status.start()
print(health)
print(status)
while True:
time.sleep(3)
health.read()
status.read()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment