Skip to content

Instantly share code, notes, and snippets.

@danielealbano
Last active April 6, 2022 03:05
Show Gist options
  • Save danielealbano/32a391b41cbffb447781989b6ea4b32e to your computer and use it in GitHub Desktop.
Save danielealbano/32a391b41cbffb447781989b6ea4b32e to your computer and use it in GitHub Desktop.
rpi4-ssd1306-status-display.py
# RPI4 SSD1306 Status Display - v0.2 - 2020/12/04 - 11.46 PM GMT
# Requires a 128x32 I2C SSD1306 OLED display
#
# Author Daniele Albano <https://github.com/danielealbano>
#
# It displays a number of status information for the RPI4
# - Memory usage
# - Cpu load (per CPU)
# - Temperature
# - Network traffic (no loopback)
# - Network ip (only ipv4, one screen per interface and ip, no loopback)
# - Hostname
# - Current date and time
#
# Some of these status page are graphical and stacked together (the order is hardcoded but is a simple array) some
# others instead require more space and are displayed one by one.
# It's possible to see an example here
# https://pasteboard.co/JDr0K2t.png
#
#
# To run it
# sudo python3 rpi4-ssd1306-status-display.py
#
# Tested on ubuntu 20.10 64bit
#
# requirements.txt:
# adafruit-circuitpython-ssd1306
#
# ubuntu packages
# python3-pip
# python3-pil
# python3-smbus
# python3-rpi.gpio
# python3-psutil
# i2c-tools
# fonts-freefont-ttf (optional)
#
# ### Changelog
# * 2020/12/04 - 11.46 PM GMT - v0.2
# Refactored AbstractStatusPage to support a generic render method that returns a framebuffer to render instead of using the label/value text combination
# Implemented a AbstractSimpleLabelValueStatusPage to render simple status pages with just a label and a value
# Implemented a AbstractCachedFrameBufferStatusPage to pre-cache a framebuffer and empty it when rendering instead to re-initialize everything everytime
# Implemented a AbstractVerticalBarsWithLabelStatusPage to render set of vertical bars with values between 0 and 1, it also renders a label on the top
# Converted a number of status pages to use AbstractVerticalBarsWithLabelStatusPage
# Implemented the DisksSpaceUsageStatusPage status page (the label is shorten to simply "DU" to reduce space usage)
# Implemented a HorizontalLayoutStatusPage to be able to horizontally stack status pages
# Implemented the decorator status_page to mark a class extending AbstractStatusPage as an actual status page that can be rendered (deprecated, will be removed)
# Hardcoded in the Display class and in the HorizontalLayoutStatusPage the current layout (will be refactored later to take it from command line of configuration)
# Drammatically reduced CPU and Memory usage
#
# * 2020/12/03 11.14 PM GMT - v0.1
# First release (tracked)
#
from abc import ABC, abstractmethod
from PIL import Image, ImageDraw, ImageFont
from typing import List, Optional
import math
import datetime
import sys
import inspect
import time
import psutil
import os
import netifaces as ni
import board
import busio
import adafruit_ssd1306
def status_page(clsobj):
clsobj.__is_status_page__ = True
return clsobj
class NetworkInfoTrait:
@classmethod
def _enumerate_interfaces(cls):
return list(filter(
lambda interface: interface != 'lo' and (ni.AF_INET in ni.ifaddresses(interface)),
ni.interfaces()
))
@classmethod
def _enumerate_interfaces_with_ipv4_addresses(cls):
ifs: List[dict] = []
for interface in ni.interfaces():
iface_addresses = ni.ifaddresses(interface)
if interface == 'lo' or ni.AF_INET not in iface_addresses:
continue
for index, ip_config in enumerate(iface_addresses[ni.AF_INET]):
try:
ifs.append({
"index": index,
"name": interface,
"ip": ip_config['addr']
})
except:
pass
return ifs
@classmethod
def _get_network_stats(cls):
stats_time_ms: float = time.time_ns() // 1000000
# https://stackoverflow.com/questions/1052589/how-can-i-parse-the-output-of-proc-net-dev-into-keyvalue-pairs-per-interface-u
lines = open("/proc/net/dev", "r").readlines()
columns_line = lines[1]
_, receive_columns, transmit_columns = columns_line.split("|")
receive_columns = list(map(lambda a: "rx_" + a, receive_columns.split()))
transmit_columns = list(map(lambda a: "tx_" + a, transmit_columns.split()))
cols = receive_columns + transmit_columns
stats_ifs = {}
for line in lines[2:]:
if line.find(":") < 0:
continue
stats_if, stats_if_data = line.split(":")
stats_if_name = stats_if.strip()
stats_ifs[stats_if_name] = dict(zip(cols, stats_if_data.split()))
# if there is no active link the speed is not provided, otherwise speed in mbit
try:
speed_lines = open("/sys/class/net/{}/speed".format(stats_if_name)).readlines()
speed: float = float(speed_lines[0])
except:
speed: float = 0
stats_ifs[stats_if_name]['speed'] = speed
return stats_ifs, stats_time_ms
def _calculate_rx_tx_kbits(self, stats_if: dict, stats_time_ms) -> dict:
value: dict = dict({
"speed": 0,
"rx": 0,
"tx": 0,
})
# max speed in kbit
value["speed"] = stats_if["speed"] * 1000
if self._last_stats_if is not None:
diff_time_ms_perc = (stats_time_ms - self._last_stats_time_ms) / 1000
# Calculate the Bps and convert them to Kbps by default
diff_rx_bytes = ((int(stats_if['rx_bytes']) - int(self._last_stats_if['rx_bytes'])) * 8) / 1000
diff_tx_bytes = ((int(stats_if['tx_bytes']) - int(self._last_stats_if['tx_bytes'])) * 8) / 1000
for diff_info in [('rx', diff_rx_bytes), ('tx', diff_tx_bytes)]:
diff_bytes = diff_info[1]
diff_bytes /= diff_time_ms_perc
value[diff_info[0]] = diff_bytes
self._last_stats_if = stats_if
self._last_stats_time_ms = stats_time_ms
return value
class DisplayFonts:
def __init__(self, normal: ImageFont, default: ImageFont):
self.normal = normal
self.default = default
class AbstractStatusPage(ABC):
_framebuffer_height = 32
@classmethod
def new(cls):
return cls()
@abstractmethod
def render(self, fonts: DisplayFonts) -> Image:
raise NotImplementedError
class AbstractCachedFrameBufferStatusPage(AbstractStatusPage):
def __init__(self):
super().__init__()
self.__simple_label_value_status_page_cache: Optional[dict] = None
@abstractmethod
def _fb_size(self, fonts: DisplayFonts) -> tuple:
raise NotImplementedError
@abstractmethod
def render_cached_fb(self,
fonts: DisplayFonts,
framebuffer: Image,
framebuffer_size: tuple,
framebuffer_draw: ImageDraw) -> Image:
raise NotImplementedError
def render(self, fonts: DisplayFonts) -> Image:
if self.__simple_label_value_status_page_cache is None:
self.__simple_label_value_status_page_cache = {
'fb_size': self._fb_size(fonts),
'framebuffer': Image.new("1", self._fb_size(fonts)),
}
self.__simple_label_value_status_page_cache['framebuffer_draw'] = \
ImageDraw.Draw(self.__simple_label_value_status_page_cache['framebuffer'])
fb_size: tuple = self.__simple_label_value_status_page_cache['fb_size']
framebuffer: Image = self.__simple_label_value_status_page_cache['framebuffer']
framebuffer_draw: ImageDraw = self.__simple_label_value_status_page_cache['framebuffer_draw']
return self.render_cached_fb(
fonts=fonts,
framebuffer=framebuffer,
framebuffer_size=fb_size,
framebuffer_draw=framebuffer_draw)
class AbstractSimpleLabelValueStatusPage(AbstractCachedFrameBufferStatusPage):
@abstractmethod
def label(self) -> str:
raise NotImplementedError
@abstractmethod
def value(self) -> str:
raise NotImplementedError
def _fb_size(self, fonts: DisplayFonts) -> tuple:
return 128, self._framebuffer_height
def render_cached_fb(self,
fonts: DisplayFonts,
framebuffer: Image,
framebuffer_size: tuple,
framebuffer_draw: ImageDraw) -> Image:
framebuffer_draw.rectangle(
(
0, 0,
framebuffer.width,
framebuffer.height
), outline=0, fill=0)
h_offset: int = 0
for line in [self.label(), self.value()]:
framebuffer_draw.text(
(0, h_offset),
line,
font=fonts.normal,
fill=255)
h_offset += fonts.normal.getsize(line)[1] + 2
return framebuffer
class AbstractVerticalBarsWithLabelStatusPage(AbstractCachedFrameBufferStatusPage):
def __init__(self):
super().__init__()
self._label_text_size: tuple = (0, 0)
@property
@abstractmethod
def _label(self) -> str:
raise NotImplementedError
@property
@abstractmethod
def _bar_values(self) -> List[float]:
raise NotImplementedError
@property
@abstractmethod
def _bar_count(self) -> int:
raise NotImplementedError
@property
def _label_padding_bottom(self) -> int:
return 1
@property
def _bar_width(self) -> int:
return 4
@property
def _bar_padding_right(self) -> int:
return 2
@property
def _bar_outline_size(self) -> int:
return 1
def _fb_size(self, fonts: DisplayFonts) -> tuple:
self._label_text_size = fonts.default.getsize(self._label)
bar_width_with_padding: int = sum([
self._bar_width,
self._bar_outline_size, # left outline width
self._bar_outline_size, # right outline width
self._bar_padding_right
])
bars_total_width: int = (self._bar_count * bar_width_with_padding) - self._bar_padding_right
return max(self._label_text_size[0], bars_total_width), self._framebuffer_height
def render_cached_fb(self,
fonts: DisplayFonts,
framebuffer: Image,
framebuffer_size: tuple,
framebuffer_draw: ImageDraw) -> Image:
outline_size = self._bar_outline_size
outline_size_2 = 2 * outline_size
w_offset: int = outline_size
h_offset: int = self._label_text_size[1] + self._label_padding_bottom
h_available: int = framebuffer_size[1] - 1 - outline_size_2 - h_offset
h_bottom = framebuffer_size[1] - 1 - outline_size
for value in self._bar_values:
bar_height: int = math.ceil(h_available * value)
framebuffer_draw.rectangle(
(
w_offset,
h_offset + outline_size,
w_offset + self._bar_width,
h_bottom
), fill=0, outline=255, width=1)
framebuffer_draw.rectangle(
(
w_offset,
# no outline, decrease of the outline width as the position is calculated from the bottom
h_bottom - bar_height,
w_offset + self._bar_width,
# no outline, decrease of the outline width as the position is calculated from the bottom
h_bottom
), fill=255, outline=None)
w_offset += self._bar_width + self._bar_padding_right + outline_size_2
framebuffer_draw.text(
(0, 0),
self._label,
font=fonts.default,
fill=255)
return framebuffer
@status_page
class DisksSpaceUsageStatusPage(AbstractVerticalBarsWithLabelStatusPage):
_exclude_filesystem_types: List[str] = [
"sysfs", "proc", "devtmpfs", "devpts", "tmpfs", "securityfs", "cgroup", "cgroup2", "autofs", "mqueue", "debugfs",
"tracefs", "fusectl", "squashfs", "nsfs", "pstore", "configfs", "bpf"
]
_exclude_loop_devs = True
def __init__(self):
super().__init__()
self._mountpoints: List[dict] = None
def _enumerate_mountpoints(self) -> List[dict]:
mountpoints = open("/proc/mounts", "r").readlines()
mountpoints = map(lambda x: x.split(), mountpoints)
mountpoints = map(lambda x: {
"dev": x[0],
"path": x[1],
"fstype": x[2],
"options": x[3],
"backup": x[4],
"check_order": x[5]
}, mountpoints)
return list(mountpoints)
def _filter_mountpoints(self, mountpoints: List[dict]) -> List[dict]:
mountpoints = filter(lambda x: x['fstype'] not in self._exclude_filesystem_types, mountpoints)
if self._exclude_loop_devs:
mountpoints = filter(lambda x: not x['dev'].startswith('/dev/loop'), mountpoints)
return list(mountpoints)
def _get_mountpoints_stats(self, mountpoints: List[dict]):
for mountpoint in mountpoints:
mountpoint_path_statvfs = os.statvfs(mountpoint['path'])
mountpoint["free"] = float(mountpoint_path_statvfs.f_frsize * mountpoint_path_statvfs.f_bavail)
mountpoint["total"] = float(mountpoint_path_statvfs.f_frsize * mountpoint_path_statvfs.f_blocks)
return mountpoints
@property
def _label(self) -> str:
return "DU"
@property
def _bar_values(self) -> List[float]:
mountpoints: List[dict] = self._get_mountpoints_stats(mountpoints=self._mountpoints)
return list(map(lambda x: 1 - (x["free"] / x["total"]), mountpoints))
@property
def _bar_count(self) -> int:
self._mountpoints = self._filter_mountpoints(mountpoints=self._enumerate_mountpoints())
return len(self._mountpoints)
@status_page
class CpuLoadStatusPage(AbstractVerticalBarsWithLabelStatusPage):
def __init__(self):
super().__init__()
self._cpu_count = psutil.cpu_count()
@property
def _label(self) -> str:
return "CPU"
@property
def _bar_values(self) -> List[float]:
return list(map(lambda x: x / 100, psutil.cpu_percent(interval=0.5, percpu=True)))
@property
def _bar_count(self) -> int:
return self._cpu_count
@status_page
class MemoryUsageStatusPage(AbstractVerticalBarsWithLabelStatusPage):
@property
def _label(self) -> str:
return "MEM"
@property
def _bar_values(self) -> List[float]:
return [
psutil.virtual_memory().percent / 100,
psutil.swap_memory().percent / 100
]
@property
def _bar_count(self) -> int:
return 2
@status_page
class TemperatureStatusPage(AbstractVerticalBarsWithLabelStatusPage):
@property
def _label(self) -> str:
return "C°"
@property
def _bar_values(self) -> List[float]:
temperature: float = 0
temperature_str: str = ""
with open("/sys/class/thermal/thermal_zone0/temp") as f:
temperature_str = f.read()
if temperature_str:
temperature: float = float(temperature_str) / 1000
# Display max 90 C degrees, because the PI will never be below 40 celsius degree, also offset the temperature
# to improve how the range is displayed
return [
(temperature - 40) / 50
]
@property
def _bar_count(self) -> int:
return 1
@status_page
class NetworkTrafficStatusPage(AbstractVerticalBarsWithLabelStatusPage, NetworkInfoTrait):
@classmethod
def new(cls):
return [
cls(interface)
for interface in cls._enumerate_interfaces()
]
def __init__(self, interface: str):
super().__init__()
self._interface = interface
self._last_stats_if: Optional[dict] = None
self._last_stats_time_ms: int = 0
@property
def _label(self) -> str:
return self._interface
@property
def _bar_values(self) -> List[float]:
stats_ifs, stats_time_ms = self._get_network_stats()
stats_if = stats_ifs[self._interface]
rx_tx_kbits = self._calculate_rx_tx_kbits(stats_if, stats_time_ms)
# No reason to report anything if speed is 0 (aka no link)
if rx_tx_kbits['speed'] == 0:
return [0, 0]
return [
rx_tx_kbits['rx'] / rx_tx_kbits['speed'],
rx_tx_kbits['tx'] / rx_tx_kbits['speed']
]
@property
def _bar_count(self) -> int:
return 2
@status_page
class NetworkIpStatusPage(AbstractSimpleLabelValueStatusPage, NetworkInfoTrait):
@classmethod
def new(cls):
return [
cls(interface["name"], interface["index"], interface["ip"])
for interface in cls._enumerate_interfaces_with_ipv4_addresses()
]
def __init__(self, interface: str, index: int, ip_address: str):
super().__init__()
self._if_name = interface
self._if_index = index
self._if_ipv4_addr = ip_address
def label(self) -> str:
if self._if_index == 0:
return "IP IPV4 ({})".format(self._if_name)
else:
return "IP IPV4 ({}:{})".format(self._if_name, self._if_index)
def value(self) -> str:
return self._if_ipv4_addr
@status_page
class HostnameIpStatusPage(AbstractSimpleLabelValueStatusPage):
def label(self) -> str:
return "HOSTNAME"
def value(self) -> str:
hostname: str = ""
with open("/etc/hostname") as f:
hostname = f.read()
if hostname:
return hostname
return "UNKNOWN"
@status_page
class DateTimeStatusPage(AbstractSimpleLabelValueStatusPage):
def label(self) -> str:
return "Date/Time"
def value(self):
return datetime.datetime.now().strftime("%x %X")
class HorizontalLayoutStatusPage(AbstractStatusPage):
padding_right: int = 6
def __init__(self, status_pages_cls: list = []):
super().__init__()
self._fb_cache = None
self._status_pages_instances: List[AbstractStatusPage] = []
status_page_classes: list = [
CpuLoadStatusPage,
TemperatureStatusPage,
MemoryUsageStatusPage,
DisksSpaceUsageStatusPage,
NetworkTrafficStatusPage
]
status_page_instances: List[AbstractStatusPage] = []
for status_page_class in status_page_classes:
new_instance = status_page_class.new()
if isinstance(new_instance, list):
status_page_instances.extend(new_instance)
else:
status_page_instances.append(new_instance)
self._status_pages_instances = status_page_instances
def render(self, fonts: DisplayFonts) -> Image:
status_pages_instances_fbs: List[Image] = [
x.render(fonts)
for x in self._status_pages_instances
]
framebuffer_size: tuple = (
sum([x.width + self.padding_right for x in status_pages_instances_fbs]) - self.padding_right,
max([x.height for x in status_pages_instances_fbs])
)
framebuffer = Image.new("1", framebuffer_size)
w_offset = 0
for status_pages_instance_fb in status_pages_instances_fbs:
framebuffer.paste(status_pages_instance_fb, (w_offset, 0))
w_offset += status_pages_instance_fb.width + self.padding_right
return framebuffer
class Display:
def __init__(self, wait_time: int = 5):
self._wait_time: int = wait_time
self._oled_display: adafruit_ssd1306.SSD1306_I2C = None
self._framebuffer: Image = None
self._framebuffer_draw: ImageDraw = None
self._status_page_instantiate_first_time: bool = True
self._status_page_instances: List[AbstractStatusPage] = None
self._status_page_instantiate_time: float = 0
self._init_fonts()
self._init_display()
self._init_framebuffer(width=self._oled_display.width, height=self._oled_display.height)
def _init_fonts(self):
font_normal: ImageFont = None
font_default: ImageFont = ImageFont.load_default()
try:
font_normal = ImageFont.truetype("FreeMonoBold", 12)
except Exception:
font_normal = ImageFont.load_default()
self._display_fonts = DisplayFonts(
normal=font_normal,
default=font_default
)
def _init_display(self):
i2c: busio.I2C = busio.I2C(board.SCL, board.SDA)
oled: adafruit_ssd1306.SSD1306_I2C = adafruit_ssd1306.SSD1306_I2C(128, 32, i2c, addr=0x3c)
oled.fill(0)
oled.show()
self._oled_display = oled
def _init_framebuffer(self, width: int, height: int):
self._framebuffer = Image.new("1", (width, height))
self._framebuffer_draw = ImageDraw.Draw(self._framebuffer)
def instantiate_status_pages(self) -> List[AbstractStatusPage]:
status_page_classes: list = [
HorizontalLayoutStatusPage,
NetworkIpStatusPage,
HostnameIpStatusPage,
DateTimeStatusPage
]
status_page_instances: List[AbstractStatusPage] = []
for status_page_class in status_page_classes:
new_instance = status_page_class.new()
if isinstance(new_instance, list):
status_page_instances.extend(new_instance)
else:
status_page_instances.append(new_instance)
self._status_page_instances: List[AbstractStatusPage] = status_page_instances
self._status_page_instantiate_time: float = time.time()
return self._status_page_instances
def get_current_status_page_index(self) -> int:
return int((time.time() - self._status_page_instantiate_time) / self._wait_time) % (len(self._status_page_instances))
def _get_remaining_wait_time_current_status_page(self) -> float:
wait_time_ms: float = float(self._wait_time * 1000)
start_time_ms: float = self._status_page_instantiate_time * 1000
current_time_ms: float = time.time() * 1000
return ((current_time_ms - start_time_ms) % wait_time_ms) / wait_time_ms
def update_framebuffer(self, status_page_instance: AbstractStatusPage):
status_page_image = status_page_instance.render(self._display_fonts)
self._framebuffer_draw.rectangle((0, 0, self._oled_display.width, self._oled_display.height), outline=0, fill=0)
self._framebuffer.paste(status_page_image, (0, 0))
def update_display(self):
rotated_fb = self._framebuffer.rotate(180)
self._oled_display.image(rotated_fb)
self._oled_display.show()
def main():
disp: Display = Display(wait_time=10)
while True:
status_pages = disp.instantiate_status_pages()
last_index = -1
while True:
current_index = disp.get_current_status_page_index()
if current_index < last_index:
break
status_page = status_pages[current_index]
disp.update_framebuffer(status_page_instance=status_page)
disp.update_display()
time.sleep(0.5)
last_index = current_index
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment