Last active
April 6, 2022 03:05
-
-
Save danielealbano/32a391b41cbffb447781989b6ea4b32e to your computer and use it in GitHub Desktop.
rpi4-ssd1306-status-display.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# RPI4 SSD1306 Status Display - v0.2 - 2020/12/04 - 11.46 PM GMT | |
# Requires a 128x32 I2C SSD1306 OLED display | |
# | |
# Author Daniele Albano <https://github.com/danielealbano> | |
# | |
# It displays a number of status information for the RPI4 | |
# - Memory usage | |
# - Cpu load (per CPU) | |
# - Temperature | |
# - Network traffic (no loopback) | |
# - Network ip (only ipv4, one screen per interface and ip, no loopback) | |
# - Hostname | |
# - Current date and time | |
# | |
# Some of these status page are graphical and stacked together (the order is hardcoded but is a simple array) some | |
# others instead require more space and are displayed one by one. | |
# It's possible to see an example here | |
# https://pasteboard.co/JDr0K2t.png | |
# | |
# | |
# To run it | |
# sudo python3 rpi4-ssd1306-status-display.py | |
# | |
# Tested on ubuntu 20.10 64bit | |
# | |
# requirements.txt: | |
# adafruit-circuitpython-ssd1306 | |
# | |
# ubuntu packages | |
# python3-pip | |
# python3-pil | |
# python3-smbus | |
# python3-rpi.gpio | |
# python3-psutil | |
# i2c-tools | |
# fonts-freefont-ttf (optional) | |
# | |
# ### Changelog | |
# * 2020/12/04 - 11.46 PM GMT - v0.2 | |
# Refactored AbstractStatusPage to support a generic render method that returns a framebuffer to render instead of using the label/value text combination | |
# Implemented a AbstractSimpleLabelValueStatusPage to render simple status pages with just a label and a value | |
# Implemented a AbstractCachedFrameBufferStatusPage to pre-cache a framebuffer and empty it when rendering instead to re-initialize everything everytime | |
# Implemented a AbstractVerticalBarsWithLabelStatusPage to render set of vertical bars with values between 0 and 1, it also renders a label on the top | |
# Converted a number of status pages to use AbstractVerticalBarsWithLabelStatusPage | |
# Implemented the DisksSpaceUsageStatusPage status page (the label is shorten to simply "DU" to reduce space usage) | |
# Implemented a HorizontalLayoutStatusPage to be able to horizontally stack status pages | |
# Implemented the decorator status_page to mark a class extending AbstractStatusPage as an actual status page that can be rendered (deprecated, will be removed) | |
# Hardcoded in the Display class and in the HorizontalLayoutStatusPage the current layout (will be refactored later to take it from command line of configuration) | |
# Drammatically reduced CPU and Memory usage | |
# | |
# * 2020/12/03 11.14 PM GMT - v0.1 | |
# First release (tracked) | |
# | |
from abc import ABC, abstractmethod | |
from PIL import Image, ImageDraw, ImageFont | |
from typing import List, Optional | |
import math | |
import datetime | |
import sys | |
import inspect | |
import time | |
import psutil | |
import os | |
import netifaces as ni | |
import board | |
import busio | |
import adafruit_ssd1306 | |
def status_page(clsobj): | |
clsobj.__is_status_page__ = True | |
return clsobj | |
class NetworkInfoTrait: | |
@classmethod | |
def _enumerate_interfaces(cls): | |
return list(filter( | |
lambda interface: interface != 'lo' and (ni.AF_INET in ni.ifaddresses(interface)), | |
ni.interfaces() | |
)) | |
@classmethod | |
def _enumerate_interfaces_with_ipv4_addresses(cls): | |
ifs: List[dict] = [] | |
for interface in ni.interfaces(): | |
iface_addresses = ni.ifaddresses(interface) | |
if interface == 'lo' or ni.AF_INET not in iface_addresses: | |
continue | |
for index, ip_config in enumerate(iface_addresses[ni.AF_INET]): | |
try: | |
ifs.append({ | |
"index": index, | |
"name": interface, | |
"ip": ip_config['addr'] | |
}) | |
except: | |
pass | |
return ifs | |
@classmethod | |
def _get_network_stats(cls): | |
stats_time_ms: float = time.time_ns() // 1000000 | |
# https://stackoverflow.com/questions/1052589/how-can-i-parse-the-output-of-proc-net-dev-into-keyvalue-pairs-per-interface-u | |
lines = open("/proc/net/dev", "r").readlines() | |
columns_line = lines[1] | |
_, receive_columns, transmit_columns = columns_line.split("|") | |
receive_columns = list(map(lambda a: "rx_" + a, receive_columns.split())) | |
transmit_columns = list(map(lambda a: "tx_" + a, transmit_columns.split())) | |
cols = receive_columns + transmit_columns | |
stats_ifs = {} | |
for line in lines[2:]: | |
if line.find(":") < 0: | |
continue | |
stats_if, stats_if_data = line.split(":") | |
stats_if_name = stats_if.strip() | |
stats_ifs[stats_if_name] = dict(zip(cols, stats_if_data.split())) | |
# if there is no active link the speed is not provided, otherwise speed in mbit | |
try: | |
speed_lines = open("/sys/class/net/{}/speed".format(stats_if_name)).readlines() | |
speed: float = float(speed_lines[0]) | |
except: | |
speed: float = 0 | |
stats_ifs[stats_if_name]['speed'] = speed | |
return stats_ifs, stats_time_ms | |
def _calculate_rx_tx_kbits(self, stats_if: dict, stats_time_ms) -> dict: | |
value: dict = dict({ | |
"speed": 0, | |
"rx": 0, | |
"tx": 0, | |
}) | |
# max speed in kbit | |
value["speed"] = stats_if["speed"] * 1000 | |
if self._last_stats_if is not None: | |
diff_time_ms_perc = (stats_time_ms - self._last_stats_time_ms) / 1000 | |
# Calculate the Bps and convert them to Kbps by default | |
diff_rx_bytes = ((int(stats_if['rx_bytes']) - int(self._last_stats_if['rx_bytes'])) * 8) / 1000 | |
diff_tx_bytes = ((int(stats_if['tx_bytes']) - int(self._last_stats_if['tx_bytes'])) * 8) / 1000 | |
for diff_info in [('rx', diff_rx_bytes), ('tx', diff_tx_bytes)]: | |
diff_bytes = diff_info[1] | |
diff_bytes /= diff_time_ms_perc | |
value[diff_info[0]] = diff_bytes | |
self._last_stats_if = stats_if | |
self._last_stats_time_ms = stats_time_ms | |
return value | |
class DisplayFonts: | |
def __init__(self, normal: ImageFont, default: ImageFont): | |
self.normal = normal | |
self.default = default | |
class AbstractStatusPage(ABC): | |
_framebuffer_height = 32 | |
@classmethod | |
def new(cls): | |
return cls() | |
@abstractmethod | |
def render(self, fonts: DisplayFonts) -> Image: | |
raise NotImplementedError | |
class AbstractCachedFrameBufferStatusPage(AbstractStatusPage): | |
def __init__(self): | |
super().__init__() | |
self.__simple_label_value_status_page_cache: Optional[dict] = None | |
@abstractmethod | |
def _fb_size(self, fonts: DisplayFonts) -> tuple: | |
raise NotImplementedError | |
@abstractmethod | |
def render_cached_fb(self, | |
fonts: DisplayFonts, | |
framebuffer: Image, | |
framebuffer_size: tuple, | |
framebuffer_draw: ImageDraw) -> Image: | |
raise NotImplementedError | |
def render(self, fonts: DisplayFonts) -> Image: | |
if self.__simple_label_value_status_page_cache is None: | |
self.__simple_label_value_status_page_cache = { | |
'fb_size': self._fb_size(fonts), | |
'framebuffer': Image.new("1", self._fb_size(fonts)), | |
} | |
self.__simple_label_value_status_page_cache['framebuffer_draw'] = \ | |
ImageDraw.Draw(self.__simple_label_value_status_page_cache['framebuffer']) | |
fb_size: tuple = self.__simple_label_value_status_page_cache['fb_size'] | |
framebuffer: Image = self.__simple_label_value_status_page_cache['framebuffer'] | |
framebuffer_draw: ImageDraw = self.__simple_label_value_status_page_cache['framebuffer_draw'] | |
return self.render_cached_fb( | |
fonts=fonts, | |
framebuffer=framebuffer, | |
framebuffer_size=fb_size, | |
framebuffer_draw=framebuffer_draw) | |
class AbstractSimpleLabelValueStatusPage(AbstractCachedFrameBufferStatusPage): | |
@abstractmethod | |
def label(self) -> str: | |
raise NotImplementedError | |
@abstractmethod | |
def value(self) -> str: | |
raise NotImplementedError | |
def _fb_size(self, fonts: DisplayFonts) -> tuple: | |
return 128, self._framebuffer_height | |
def render_cached_fb(self, | |
fonts: DisplayFonts, | |
framebuffer: Image, | |
framebuffer_size: tuple, | |
framebuffer_draw: ImageDraw) -> Image: | |
framebuffer_draw.rectangle( | |
( | |
0, 0, | |
framebuffer.width, | |
framebuffer.height | |
), outline=0, fill=0) | |
h_offset: int = 0 | |
for line in [self.label(), self.value()]: | |
framebuffer_draw.text( | |
(0, h_offset), | |
line, | |
font=fonts.normal, | |
fill=255) | |
h_offset += fonts.normal.getsize(line)[1] + 2 | |
return framebuffer | |
class AbstractVerticalBarsWithLabelStatusPage(AbstractCachedFrameBufferStatusPage): | |
def __init__(self): | |
super().__init__() | |
self._label_text_size: tuple = (0, 0) | |
@property | |
@abstractmethod | |
def _label(self) -> str: | |
raise NotImplementedError | |
@property | |
@abstractmethod | |
def _bar_values(self) -> List[float]: | |
raise NotImplementedError | |
@property | |
@abstractmethod | |
def _bar_count(self) -> int: | |
raise NotImplementedError | |
@property | |
def _label_padding_bottom(self) -> int: | |
return 1 | |
@property | |
def _bar_width(self) -> int: | |
return 4 | |
@property | |
def _bar_padding_right(self) -> int: | |
return 2 | |
@property | |
def _bar_outline_size(self) -> int: | |
return 1 | |
def _fb_size(self, fonts: DisplayFonts) -> tuple: | |
self._label_text_size = fonts.default.getsize(self._label) | |
bar_width_with_padding: int = sum([ | |
self._bar_width, | |
self._bar_outline_size, # left outline width | |
self._bar_outline_size, # right outline width | |
self._bar_padding_right | |
]) | |
bars_total_width: int = (self._bar_count * bar_width_with_padding) - self._bar_padding_right | |
return max(self._label_text_size[0], bars_total_width), self._framebuffer_height | |
def render_cached_fb(self, | |
fonts: DisplayFonts, | |
framebuffer: Image, | |
framebuffer_size: tuple, | |
framebuffer_draw: ImageDraw) -> Image: | |
outline_size = self._bar_outline_size | |
outline_size_2 = 2 * outline_size | |
w_offset: int = outline_size | |
h_offset: int = self._label_text_size[1] + self._label_padding_bottom | |
h_available: int = framebuffer_size[1] - 1 - outline_size_2 - h_offset | |
h_bottom = framebuffer_size[1] - 1 - outline_size | |
for value in self._bar_values: | |
bar_height: int = math.ceil(h_available * value) | |
framebuffer_draw.rectangle( | |
( | |
w_offset, | |
h_offset + outline_size, | |
w_offset + self._bar_width, | |
h_bottom | |
), fill=0, outline=255, width=1) | |
framebuffer_draw.rectangle( | |
( | |
w_offset, | |
# no outline, decrease of the outline width as the position is calculated from the bottom | |
h_bottom - bar_height, | |
w_offset + self._bar_width, | |
# no outline, decrease of the outline width as the position is calculated from the bottom | |
h_bottom | |
), fill=255, outline=None) | |
w_offset += self._bar_width + self._bar_padding_right + outline_size_2 | |
framebuffer_draw.text( | |
(0, 0), | |
self._label, | |
font=fonts.default, | |
fill=255) | |
return framebuffer | |
@status_page | |
class DisksSpaceUsageStatusPage(AbstractVerticalBarsWithLabelStatusPage): | |
_exclude_filesystem_types: List[str] = [ | |
"sysfs", "proc", "devtmpfs", "devpts", "tmpfs", "securityfs", "cgroup", "cgroup2", "autofs", "mqueue", "debugfs", | |
"tracefs", "fusectl", "squashfs", "nsfs", "pstore", "configfs", "bpf" | |
] | |
_exclude_loop_devs = True | |
def __init__(self): | |
super().__init__() | |
self._mountpoints: List[dict] = None | |
def _enumerate_mountpoints(self) -> List[dict]: | |
mountpoints = open("/proc/mounts", "r").readlines() | |
mountpoints = map(lambda x: x.split(), mountpoints) | |
mountpoints = map(lambda x: { | |
"dev": x[0], | |
"path": x[1], | |
"fstype": x[2], | |
"options": x[3], | |
"backup": x[4], | |
"check_order": x[5] | |
}, mountpoints) | |
return list(mountpoints) | |
def _filter_mountpoints(self, mountpoints: List[dict]) -> List[dict]: | |
mountpoints = filter(lambda x: x['fstype'] not in self._exclude_filesystem_types, mountpoints) | |
if self._exclude_loop_devs: | |
mountpoints = filter(lambda x: not x['dev'].startswith('/dev/loop'), mountpoints) | |
return list(mountpoints) | |
def _get_mountpoints_stats(self, mountpoints: List[dict]): | |
for mountpoint in mountpoints: | |
mountpoint_path_statvfs = os.statvfs(mountpoint['path']) | |
mountpoint["free"] = float(mountpoint_path_statvfs.f_frsize * mountpoint_path_statvfs.f_bavail) | |
mountpoint["total"] = float(mountpoint_path_statvfs.f_frsize * mountpoint_path_statvfs.f_blocks) | |
return mountpoints | |
@property | |
def _label(self) -> str: | |
return "DU" | |
@property | |
def _bar_values(self) -> List[float]: | |
mountpoints: List[dict] = self._get_mountpoints_stats(mountpoints=self._mountpoints) | |
return list(map(lambda x: 1 - (x["free"] / x["total"]), mountpoints)) | |
@property | |
def _bar_count(self) -> int: | |
self._mountpoints = self._filter_mountpoints(mountpoints=self._enumerate_mountpoints()) | |
return len(self._mountpoints) | |
@status_page | |
class CpuLoadStatusPage(AbstractVerticalBarsWithLabelStatusPage): | |
def __init__(self): | |
super().__init__() | |
self._cpu_count = psutil.cpu_count() | |
@property | |
def _label(self) -> str: | |
return "CPU" | |
@property | |
def _bar_values(self) -> List[float]: | |
return list(map(lambda x: x / 100, psutil.cpu_percent(interval=0.5, percpu=True))) | |
@property | |
def _bar_count(self) -> int: | |
return self._cpu_count | |
@status_page | |
class MemoryUsageStatusPage(AbstractVerticalBarsWithLabelStatusPage): | |
@property | |
def _label(self) -> str: | |
return "MEM" | |
@property | |
def _bar_values(self) -> List[float]: | |
return [ | |
psutil.virtual_memory().percent / 100, | |
psutil.swap_memory().percent / 100 | |
] | |
@property | |
def _bar_count(self) -> int: | |
return 2 | |
@status_page | |
class TemperatureStatusPage(AbstractVerticalBarsWithLabelStatusPage): | |
@property | |
def _label(self) -> str: | |
return "C°" | |
@property | |
def _bar_values(self) -> List[float]: | |
temperature: float = 0 | |
temperature_str: str = "" | |
with open("/sys/class/thermal/thermal_zone0/temp") as f: | |
temperature_str = f.read() | |
if temperature_str: | |
temperature: float = float(temperature_str) / 1000 | |
# Display max 90 C degrees, because the PI will never be below 40 celsius degree, also offset the temperature | |
# to improve how the range is displayed | |
return [ | |
(temperature - 40) / 50 | |
] | |
@property | |
def _bar_count(self) -> int: | |
return 1 | |
@status_page | |
class NetworkTrafficStatusPage(AbstractVerticalBarsWithLabelStatusPage, NetworkInfoTrait): | |
@classmethod | |
def new(cls): | |
return [ | |
cls(interface) | |
for interface in cls._enumerate_interfaces() | |
] | |
def __init__(self, interface: str): | |
super().__init__() | |
self._interface = interface | |
self._last_stats_if: Optional[dict] = None | |
self._last_stats_time_ms: int = 0 | |
@property | |
def _label(self) -> str: | |
return self._interface | |
@property | |
def _bar_values(self) -> List[float]: | |
stats_ifs, stats_time_ms = self._get_network_stats() | |
stats_if = stats_ifs[self._interface] | |
rx_tx_kbits = self._calculate_rx_tx_kbits(stats_if, stats_time_ms) | |
# No reason to report anything if speed is 0 (aka no link) | |
if rx_tx_kbits['speed'] == 0: | |
return [0, 0] | |
return [ | |
rx_tx_kbits['rx'] / rx_tx_kbits['speed'], | |
rx_tx_kbits['tx'] / rx_tx_kbits['speed'] | |
] | |
@property | |
def _bar_count(self) -> int: | |
return 2 | |
@status_page | |
class NetworkIpStatusPage(AbstractSimpleLabelValueStatusPage, NetworkInfoTrait): | |
@classmethod | |
def new(cls): | |
return [ | |
cls(interface["name"], interface["index"], interface["ip"]) | |
for interface in cls._enumerate_interfaces_with_ipv4_addresses() | |
] | |
def __init__(self, interface: str, index: int, ip_address: str): | |
super().__init__() | |
self._if_name = interface | |
self._if_index = index | |
self._if_ipv4_addr = ip_address | |
def label(self) -> str: | |
if self._if_index == 0: | |
return "IP IPV4 ({})".format(self._if_name) | |
else: | |
return "IP IPV4 ({}:{})".format(self._if_name, self._if_index) | |
def value(self) -> str: | |
return self._if_ipv4_addr | |
@status_page | |
class HostnameIpStatusPage(AbstractSimpleLabelValueStatusPage): | |
def label(self) -> str: | |
return "HOSTNAME" | |
def value(self) -> str: | |
hostname: str = "" | |
with open("/etc/hostname") as f: | |
hostname = f.read() | |
if hostname: | |
return hostname | |
return "UNKNOWN" | |
@status_page | |
class DateTimeStatusPage(AbstractSimpleLabelValueStatusPage): | |
def label(self) -> str: | |
return "Date/Time" | |
def value(self): | |
return datetime.datetime.now().strftime("%x %X") | |
class HorizontalLayoutStatusPage(AbstractStatusPage): | |
padding_right: int = 6 | |
def __init__(self, status_pages_cls: list = []): | |
super().__init__() | |
self._fb_cache = None | |
self._status_pages_instances: List[AbstractStatusPage] = [] | |
status_page_classes: list = [ | |
CpuLoadStatusPage, | |
TemperatureStatusPage, | |
MemoryUsageStatusPage, | |
DisksSpaceUsageStatusPage, | |
NetworkTrafficStatusPage | |
] | |
status_page_instances: List[AbstractStatusPage] = [] | |
for status_page_class in status_page_classes: | |
new_instance = status_page_class.new() | |
if isinstance(new_instance, list): | |
status_page_instances.extend(new_instance) | |
else: | |
status_page_instances.append(new_instance) | |
self._status_pages_instances = status_page_instances | |
def render(self, fonts: DisplayFonts) -> Image: | |
status_pages_instances_fbs: List[Image] = [ | |
x.render(fonts) | |
for x in self._status_pages_instances | |
] | |
framebuffer_size: tuple = ( | |
sum([x.width + self.padding_right for x in status_pages_instances_fbs]) - self.padding_right, | |
max([x.height for x in status_pages_instances_fbs]) | |
) | |
framebuffer = Image.new("1", framebuffer_size) | |
w_offset = 0 | |
for status_pages_instance_fb in status_pages_instances_fbs: | |
framebuffer.paste(status_pages_instance_fb, (w_offset, 0)) | |
w_offset += status_pages_instance_fb.width + self.padding_right | |
return framebuffer | |
class Display: | |
def __init__(self, wait_time: int = 5): | |
self._wait_time: int = wait_time | |
self._oled_display: adafruit_ssd1306.SSD1306_I2C = None | |
self._framebuffer: Image = None | |
self._framebuffer_draw: ImageDraw = None | |
self._status_page_instantiate_first_time: bool = True | |
self._status_page_instances: List[AbstractStatusPage] = None | |
self._status_page_instantiate_time: float = 0 | |
self._init_fonts() | |
self._init_display() | |
self._init_framebuffer(width=self._oled_display.width, height=self._oled_display.height) | |
def _init_fonts(self): | |
font_normal: ImageFont = None | |
font_default: ImageFont = ImageFont.load_default() | |
try: | |
font_normal = ImageFont.truetype("FreeMonoBold", 12) | |
except Exception: | |
font_normal = ImageFont.load_default() | |
self._display_fonts = DisplayFonts( | |
normal=font_normal, | |
default=font_default | |
) | |
def _init_display(self): | |
i2c: busio.I2C = busio.I2C(board.SCL, board.SDA) | |
oled: adafruit_ssd1306.SSD1306_I2C = adafruit_ssd1306.SSD1306_I2C(128, 32, i2c, addr=0x3c) | |
oled.fill(0) | |
oled.show() | |
self._oled_display = oled | |
def _init_framebuffer(self, width: int, height: int): | |
self._framebuffer = Image.new("1", (width, height)) | |
self._framebuffer_draw = ImageDraw.Draw(self._framebuffer) | |
def instantiate_status_pages(self) -> List[AbstractStatusPage]: | |
status_page_classes: list = [ | |
HorizontalLayoutStatusPage, | |
NetworkIpStatusPage, | |
HostnameIpStatusPage, | |
DateTimeStatusPage | |
] | |
status_page_instances: List[AbstractStatusPage] = [] | |
for status_page_class in status_page_classes: | |
new_instance = status_page_class.new() | |
if isinstance(new_instance, list): | |
status_page_instances.extend(new_instance) | |
else: | |
status_page_instances.append(new_instance) | |
self._status_page_instances: List[AbstractStatusPage] = status_page_instances | |
self._status_page_instantiate_time: float = time.time() | |
return self._status_page_instances | |
def get_current_status_page_index(self) -> int: | |
return int((time.time() - self._status_page_instantiate_time) / self._wait_time) % (len(self._status_page_instances)) | |
def _get_remaining_wait_time_current_status_page(self) -> float: | |
wait_time_ms: float = float(self._wait_time * 1000) | |
start_time_ms: float = self._status_page_instantiate_time * 1000 | |
current_time_ms: float = time.time() * 1000 | |
return ((current_time_ms - start_time_ms) % wait_time_ms) / wait_time_ms | |
def update_framebuffer(self, status_page_instance: AbstractStatusPage): | |
status_page_image = status_page_instance.render(self._display_fonts) | |
self._framebuffer_draw.rectangle((0, 0, self._oled_display.width, self._oled_display.height), outline=0, fill=0) | |
self._framebuffer.paste(status_page_image, (0, 0)) | |
def update_display(self): | |
rotated_fb = self._framebuffer.rotate(180) | |
self._oled_display.image(rotated_fb) | |
self._oled_display.show() | |
def main(): | |
disp: Display = Display(wait_time=10) | |
while True: | |
status_pages = disp.instantiate_status_pages() | |
last_index = -1 | |
while True: | |
current_index = disp.get_current_status_page_index() | |
if current_index < last_index: | |
break | |
status_page = status_pages[current_index] | |
disp.update_framebuffer(status_page_instance=status_page) | |
disp.update_display() | |
time.sleep(0.5) | |
last_index = current_index | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment