Created
September 26, 2020 13:01
-
-
Save Jim-Holmstroem/e1304ba51da90082716d4bf640eb3acf to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import threading | |
import time | |
import schedule | |
import json | |
import requests | |
def start_updater(interval=0.1, daemon=None):
    """Start a background thread that keeps running pending ``schedule`` jobs.

    The thread loops forever, calling ``schedule.run_pending()`` and then
    sleeping for ``interval`` seconds.

    Args:
        interval: Seconds to sleep between two ``schedule.run_pending()``
            calls. Defaults to the previously hard-coded 0.1 seconds.
        daemon: Passed straight to ``threading.Thread``. ``None`` (the
            default) inherits the daemon flag of the calling thread, which
            is what the thread did before this parameter existed. Pass
            ``True`` if the loop must not keep the interpreter alive.

    Returns:
        The started ``threading.Thread`` running the loop.
    """

    def _run_pending_forever():
        while True:
            schedule.run_pending()
            time.sleep(interval)

    worker = threading.Thread(target=_run_pending_forever, daemon=daemon)
    worker.start()
    return worker
# Launch the background loop that executes jobs registered with `schedule`.
start_updater()
class Value:
    """JSON value refreshed from an HTTP endpoint and cached on disk.

    The most recently fetched payload is held in ``self.data`` and mirrored
    to ``<name>.json``; that file is read back when a new instance is
    created.
    """

    def __init__(self, name, endpoint):
        """Create the value and seed ``data`` from the cache file if possible.

        Args:
            name: Label used in printed messages and as the base name of the
                cache file (``f"{name}.json"``).
            endpoint: URL requested by :meth:`update`.
        """
        self.name = name
        self.endpoint = endpoint
        self.filename = f"{name}.json"
        # Created before the cache load so the instance is complete
        # regardless of how the load below turns out.
        self.rlock = threading.RLock()
        try:
            with open(self.filename) as f:
                self.data = json.load(f)
            print("initial load from file")
        except Exception:
            # Best effort: any failure to read or parse the cache file
            # falls back to an empty value.
            print("initial file load failed")
            self.data = {}

    def start(self):
        """Register :meth:`update` with ``schedule`` to run every second."""
        schedule.every(1).seconds.do(self.update)

    def update(self):
        """Fetch the endpoint on a new thread and store the result.

        Returns immediately after starting the thread. On success the
        payload is written to the cache file and assigned to ``self.data``;
        on any failure the error is printed and the previous ``data`` and
        cache file are left as they were.
        """

        def _update():
            # The lock makes fetch + file write + assignment one unit per
            # instance, so overlapping update threads run one at a time.
            with self.rlock:
                try:
                    response = requests.get(self.endpoint, timeout=1)
                    # Error responses can carry a JSON body too; reject them
                    # here so they never replace the last good payload.
                    response.raise_for_status()
                    data = response.json()
                    # Same path that __init__ loads from.
                    with open(self.filename, "w") as f:
                        json.dump(data, f)
                    self.data = data
                    print(f"{self.name}: update [{time.ctime()}]")
                except Exception as e:
                    print(e)
                    print(f"{self.name}: failed read from server")

        threading.Thread(target=_update, args=()).start()

    def read(self):
        """Print the length of the currently held ``data``."""
        print(f"{self.name}: length={len(self.data)}")

    def __str__(self):
        """Return ``Value[<name>](<endpoint>)``."""
        return f"Value[{self.name}]({self.endpoint})"
# Two demo values; each one is registered with the scheduler right after it
# is constructed.
health = Value("Health", "https://api.github.com/orgs/aiwizo/repos")
health.start()

status = Value("Status", "https://api.github.com/orgs/aiwizo/repos")
status.start()

# One line per value, in creation order.
print(health, status, sep="\n")

# Report the size of each value every three seconds, forever.
while True:
    time.sleep(3)
    for watched in (health, status):
        watched.read()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment