Created
March 27, 2020 16:03
-
-
Save rosterloh/c73374ae144ee7402d5b9c3f7215acc0 to your computer and use it in GitHub Desktop.
COVID-19 tracker on a PiminiTFT
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# systemd unit that runs the COVID-19 tracker script as user "pi".
[Unit]
Description=COVID-19 Tracker Service
# The tracker issues HTTP requests as soon as it starts, so pull in and wait
# for network-online.target in addition to the multi-user target.
Wants=network-online.target
After=multi-user.target network-online.target

[Service]
User=pi
Type=simple
WorkingDirectory=/home/pi/CovidTracker
# --update is the refresh interval in seconds (3600 = once per hour).
ExecStart=/usr/bin/python3 /home/pi/CovidTracker/main.py --update 3600
Restart=on-failure

[Install]
WantedBy=multi-user.target
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse
import asyncio
import datetime as dt
import json
import logging
import time
from threading import Thread

import adafruit_rgb_display.st7789 as st7789
import board
import digitalio
import requests
from adafruit_rgb_display.rgb import color565
from PIL import Image, ImageDraw, ImageFont
from systemd import journal

from stats import CovidStats
# Earlier endpoint, kept for reference:
#COVID_19_API = "https://covid19api.herokuapp.com/"
# API project: https://github.com/ExpDev07/coronavirus-tracker-api
# Base URL of the v2 API; request paths such as 'latest' are appended to it.
COVID_19_API = "https://coronavirus-tracker-api.herokuapp.com/v2/"
# Title text drawn at the top of the display.
HEADER = "COVID-19"
class CovidTracker:
    """Fetch COVID-19 figures periodically and draw them on an ST7789 display.

    Constructing an instance configures the display, the backlight and the
    two button inputs, clears the screen and schedules :meth:`update_loop` on
    the loop returned by ``asyncio.get_event_loop()``. The caller is
    responsible for running that loop.

    Args:
        refresh_rate: Seconds to sleep between refreshes of the statistics.
        width: Display width in pixels.
        height: Display height in pixels.
    """

    # Seconds to wait for the API before giving up on a request, so that a
    # stalled connection cannot hang a refresh forever.
    REQUEST_TIMEOUT_SEC = 10

    def __init__(self, refresh_rate=600, width=240, height=240):
        self.get_stats_interval_sec = refresh_rate
        self.width = width
        self.height = height

        # Log to the systemd journal only; do not propagate to the root logger.
        self.logger = logging.getLogger(__name__)
        self.logger.propagate = False
        self.logger.addHandler(journal.JournaldLogHandler())
        self.logger.setLevel(logging.INFO)

        cs_pin = digitalio.DigitalInOut(board.CE0)
        dc_pin = digitalio.DigitalInOut(board.D25)
        reset_pin = None
        BAUDRATE = 64000000
        self.display = st7789.ST7789(
            board.SPI(),
            cs=cs_pin,
            dc=dc_pin,
            rst=reset_pin,
            baudrate=BAUDRATE,
            width=width,
            height=height,
            y_offset=90,
            rotation=180
        )

        # Switch the backlight on.
        backlight = digitalio.DigitalInOut(board.D22)
        backlight.switch_to_output()
        backlight.value = True

        self.buttonA = digitalio.DigitalInOut(board.D23)
        self.buttonB = digitalio.DigitalInOut(board.D24)
        self.buttonA.switch_to_input()
        self.buttonB.switch_to_input()
        # Seconds button A must stay down before the hold handler fires.
        self._button_hold_time = 2.0

        # Off-screen canvas; start with an all-black screen.
        self.image = Image.new("RGB", (width, height))
        self.draw = ImageDraw.Draw(self.image)
        self.draw.rectangle((0, 0, width, height), outline=0, fill=(0, 0, 0))
        self.display.image(self.image)

        self.font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 24)

        self.stats = CovidStats()

        loop = asyncio.get_event_loop()
        loop.create_task(self.update_loop())
        # Button polling is intentionally not scheduled:
        # loop.create_task(self.button_loop())

    def update_display(self):
        """Redraw the screen if the statistics changed since the last redraw."""
        # new_data is a method; it must be called, otherwise the bound method
        # is always truthy and the "updated" flag is never consumed.
        if not self.stats.new_data():
            return

        self.logger.info('New data at ' + self.stats.last_updated.strftime('%d-%b-%Y (%H:%M)'))
        self.draw.rectangle((0, 0, self.width, self.height), outline=0, fill=0)

        # NOTE(review): FreeTypeFont.getsize was removed in Pillow 10; move to
        # getbbox when the installed Pillow is upgraded.
        header_width, header_height = self.font.getsize(HEADER)
        # Centre the header on the actual display width.
        header_x = (self.width - header_width) // 2

        cases = self.stats.confirmed
        deaths = self.stats.deaths

        padding = 10
        x = 0
        y = padding
        self.draw.text((header_x, y), HEADER, font=self.font, fill="#FFFFFF")
        y += header_height + padding
        self.draw.text((x, y), cases, font=self.font, fill="#FF0000")
        # The "confirmed" text spans three lines, hence the factor of 3.
        y += self.font.getsize(cases)[1]*3 + padding
        self.draw.text((x, y), deaths, font=self.font, fill="#FF0000")
        self.display.image(self.image)

    def _fetch_json(self, path):
        """Blocking GET of ``COVID_19_API + path``.

        Returns:
            The decoded JSON document, or ``None`` if the request failed,
            returned a non-200 status, or the body was not valid JSON.
        """
        try:
            response = requests.get(COVID_19_API + path, timeout=self.REQUEST_TIMEOUT_SEC)
        except requests.RequestException as err:
            self.logger.error("Request for %s failed: %s", path, err)
            return None
        if response.status_code != 200:
            self.logger.error("Request for %s returned HTTP %s", path, response.status_code)
            return None
        try:
            return json.loads(response.content)
        except ValueError as err:
            self.logger.error("Response for %s is not valid JSON: %s", path, err)
            return None

    async def get_stats_from_server(self) -> None:
        """Fetch world, UK and SA figures, update the stats and redraw.

        Nothing is updated unless all three requests succeed and contain the
        expected fields; failures are logged and the method returns.
        """
        self.logger.debug("Fetching update")
        loop = asyncio.get_event_loop()

        # Location ids 223 and 200 are the ones this tracker reports as UK
        # and SA respectively.
        payloads = []
        for path in ('latest', 'locations/223', 'locations/200'):
            # requests is blocking; run it in the default executor so the
            # event loop stays responsive while waiting on the network.
            data = await loop.run_in_executor(None, self._fetch_json, path)
            if data is None:
                self.logger.error("Error refreshing data")
                return
            payloads.append(data)
        world, uk, sa = payloads

        try:
            confirmed_world = int(world["latest"]["confirmed"])
            deaths_world = int(world["latest"]["deaths"])

            last_update = dt.datetime.strptime(uk["location"]["last_updated"], '%Y-%m-%dT%H:%M:%S.%fZ')
            confirmed_uk = int(uk["location"]["latest"]["confirmed"])
            deaths_uk = int(uk["location"]["latest"]["deaths"])

            confirmed_sa = int(sa["location"]["latest"]["confirmed"])
            deaths_sa = int(sa["location"]["latest"]["deaths"])
        except (KeyError, TypeError, ValueError) as err:
            self.logger.error("Error refreshing data: unexpected response (%r)", err)
            return

        self.stats.update(confirmed_world, confirmed_uk, confirmed_sa, deaths_world, deaths_uk, deaths_sa, last_update)
        self.update_display()
        self.logger.debug("Confirmed: {0} UK: {1} SA {2}".format(confirmed_world, confirmed_uk, confirmed_sa))
        self.logger.debug("Deaths: {0} UK: {1} SA {2}".format(deaths_world, deaths_uk, deaths_sa))

    async def update_loop(self) -> None:
        """Refresh the statistics forever, sleeping between refreshes."""
        while True:
            try:
                await self.get_stats_from_server()
            except asyncio.CancelledError:
                # Cancellation must propagate; before Python 3.8 it is a
                # subclass of Exception and would otherwise be swallowed below.
                raise
            except Exception:
                # An unexpected error must not end the task, or the display
                # would silently stop updating.
                self.logger.exception("Unexpected error while refreshing data")
            await asyncio.sleep(self.get_stats_interval_sec)

    def _button_press_handler(self):
        """Placeholder hook run in a worker thread when button A goes down."""
        self.logger.debug("Button A pressed")

    def _button_release_handler(self, hold_fired):
        """Placeholder hook run in a worker thread when button A is released.

        Args:
            hold_fired: True if the hold handler already ran for this press.
        """
        self.logger.debug("Button A released (hold fired: %s)", hold_fired)

    def _button_hold_handler(self):
        """Placeholder hook run in a worker thread once button A is held."""
        self.logger.debug("Button A held")

    async def button_loop(self) -> None:
        """Poll button A every 50 ms and dispatch press/release/hold handlers.

        The code treats a False pin value as "pressed". Handlers are run in
        separate threads so they cannot stall the event loop.
        """
        self._t_a_pressed = time.monotonic()
        self._hold_a_fired = False
        last_a = True
        while True:
            # Sample the pin once so every check in this pass sees one state.
            value_a = self.buttonA.value
            now = time.monotonic()

            if last_a and not value_a:
                self._t_a_pressed = now
                self._hold_a_fired = False
                self._t_a_repeat = now
                Thread(target=self._button_press_handler).start()

            if not last_a and value_a:
                Thread(target=self._button_release_handler, args=(self._hold_a_fired,)).start()

            if not value_a:
                if not self._hold_a_fired and (now - self._t_a_pressed) > self._button_hold_time:
                    Thread(target=self._button_hold_handler).start()
                    self._hold_a_fired = True

            last_a = value_a

            await asyncio.sleep(0.05)
if __name__ == "__main__":
    # Command line: --update <seconds> sets how often the data is refreshed.
    cli = argparse.ArgumentParser()
    cli.add_argument('--update', type=float, default=600, help='Rate at which to update the data')
    options = cli.parse_args()

    try:
        # The tracker schedules its own refresh task on the event loop when
        # constructed; keep a reference and run that loop until interrupted.
        tracker = CovidTracker(refresh_rate=options.update)
        asyncio.get_event_loop().run_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop an interactive run.
        pass
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime as dt | |
class CovidStats:
    """Latest totals and most recent changes for world, UK and SA figures.

    ``last_*`` attributes hold the most recently reported totals. ``new_*``
    attributes hold the change observed the last time the matching total
    moved; they are ``None`` until the first call to :meth:`update`.
    """

    def __init__(self):
        self.last_updated = dt.datetime.now()
        self.last_cases = 0
        self.last_uk_cases = 0
        self.last_sa_cases = 0
        self.last_deaths = 0
        self.last_uk_deaths = 0
        self.last_sa_deaths = 0
        self.new_cases = None
        self.new_uk_cases = None
        self.new_sa_cases = None
        self.new_deaths = None
        self.new_uk_deaths = None
        self.new_sa_deaths = None
        # True while there is data that new_data() has not yet reported.
        self.updated = True

    @staticmethod
    def _next_delta(total, previous_total, previous_delta):
        """Return the delta to store after observing ``total``.

        The first sample only establishes a baseline, so its delta is 0 even
        when the reported total is itself 0. An unchanged total keeps the
        previous delta; otherwise the delta is the difference in totals.
        """
        if previous_delta is None:
            return 0
        if total == previous_total:
            return previous_delta
        return total - previous_total

    def update(self, cw, cuk, csa, dw, duk, dsa, update):
        """Record a new sample.

        Args:
            cw: Confirmed cases, world.
            cuk: Confirmed cases, UK.
            csa: Confirmed cases, SA.
            dw: Deaths, world.
            duk: Deaths, UK.
            dsa: Deaths, SA.
            update: Timestamp of the sample; a value different from the
                stored one marks the stats as updated.
        """
        if self.last_updated != update:
            self.last_updated = update
            self.updated = True

        self.new_cases = self._next_delta(cw, self.last_cases, self.new_cases)
        self.last_cases = cw
        self.new_uk_cases = self._next_delta(cuk, self.last_uk_cases, self.new_uk_cases)
        self.last_uk_cases = cuk
        self.new_sa_cases = self._next_delta(csa, self.last_sa_cases, self.new_sa_cases)
        self.last_sa_cases = csa

        self.new_deaths = self._next_delta(dw, self.last_deaths, self.new_deaths)
        self.last_deaths = dw
        self.new_uk_deaths = self._next_delta(duk, self.last_uk_deaths, self.new_uk_deaths)
        self.last_uk_deaths = duk
        self.new_sa_deaths = self._next_delta(dsa, self.last_sa_deaths, self.new_sa_deaths)
        self.last_sa_deaths = dsa

    def new_data(self):
        """Return True once per update, clearing the flag as a side effect.

        This is a method, not a property: it must be called.
        """
        if self.updated:
            self.updated = False
            return True
        return False

    @property
    def confirmed(self):
        """Three-line display text for confirmed cases (world, UK, SA)."""
        return "Confirmed: {0}\n UK: {1} ({2})\n SA: {3} ({4})".format(
            self.last_cases, self.last_uk_cases, self.new_uk_cases,
            self.last_sa_cases, self.new_sa_cases
        )

    @property
    def deaths(self):
        """Three-line display text for deaths (world, UK, SA)."""
        return "Deaths: {0}\n UK: {1} ({2})\n SA: {3} ({4})".format(
            self.last_deaths, self.last_uk_deaths, self.new_uk_deaths,
            self.last_sa_deaths, self.new_sa_deaths
        )

    def __repr__(self):
        """Multi-line summary of all six totals."""
        fmt = (
            "\n"
            "Confirmed Cases:\n"
            "Total: {cw:d}\n"
            "UK: {cuk:d}\n"
            "SA: {csa:d}\n"
            "Deaths:\n"
            "Total: {dw:d}\n"
            "UK: {duk:d}\n"
            # SA deaths must use dsa; the world total (dw) was printed here.
            "SA: {dsa:d}\n"
        )
        return fmt.format(
            cw=self.last_cases,
            cuk=self.last_uk_cases,
            csa=self.last_sa_cases,
            dw=self.last_deaths,
            duk=self.last_uk_deaths,
            dsa=self.last_sa_deaths
        )

    __str__ = __repr__
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment