-
-
Save DanEdens/08dbb8c7411d3feb4b3b7e600e79f345 to your computer and use it in GitHub Desktop.
Python MQTT Logging Handler
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import errno | |
import logging | |
import os | |
import re | |
import subprocess | |
import time | |
from datetime import datetime | |
from pathlib import Path | |
from typing import List | |
import paho.mqtt.client as mqtt | |
import paho.mqtt.publish as publish | |
# Create utils specific fallback logger for Debugging debug mode | |
logger = logging.getLogger(__name__) | |
fileDate = datetime.now().strftime("%Y-%m-%d") | |
os.environ['ROOT_DIR'] = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..') | |
def establishBroker(): | |
""" | |
Connect to the MQTT broker for logger mqttHandler stream | |
:return: | |
""" | |
_client = mqtt.Client() | |
_client.connect(host=os.environ.get('AWSIP', 'localhost'), | |
port=int(os.environ.get('AWSPORT', 1884)) | |
) | |
return _client | |
def makeLogger(name: str = __name__, log_to_file: bool = False, | |
log_level: str = os.getenv("project_debug", 'DEBUG')) -> logging.Logger: | |
""" | |
Create the project wide logger. | |
:param name: The name of the logger. | |
:param log_to_file: Whether to log to a file. | |
:param log_level: The log level to use (e.g. 'DEBUG', 'INFO'). | |
:return: A logger object. | |
""" | |
# name = name.replace(".", "/") | |
_format = '%(asctime)s - %(module)s - %(message)s' if log_level == 'DEBUG' else '%(asctime)s - %(message)s' | |
log = logging.getLogger(name) | |
log.setLevel(log_level) | |
if log_to_file: | |
filename = f"{datetime.now().strftime('%Y%m%d-%H%M%S')}.log" | |
_log = ensureExists(Path(os.environ['ROOT_DIR']).joinpath(f"data//{filename}")) | |
file_handler = logging.FileHandler(_log, mode='a') | |
file_handler.setFormatter(logging.Formatter(_format)) | |
log.addHandler(file_handler) | |
stream_handler = logging.StreamHandler() | |
stream_handler.setFormatter(logging.Formatter(_format)) | |
log.addHandler(stream_handler) | |
my_handler = mqttHandler(topic=f'log/{name}') | |
log.addHandler(my_handler) | |
return log | |
def remove_file(files: List[str]) -> bool: | |
""" | |
Removes old copies of files if they exist. | |
:param files: The names of the files to remove. | |
:return: `True` if all files were removed successfully, `False` otherwise. | |
""" | |
success = True | |
for f in files: | |
try: | |
os.remove(f) | |
logger.debug(f'Removing previous copy of {f}..') | |
except OSError: | |
success = False | |
return success | |
def linePrepender(file, line, depth: int = 0, mode: int = 0): | |
""" | |
Prepends given line to given file at depth. | |
:param file: Filepath to write too | |
:param line: str to write | |
:param depth: # of Lines to move away from mode | |
:param mode: 0=Top,1=current,2=Bottom | |
:return: | |
""" | |
with open(file, 'r+') as _file: | |
_file.seek(depth, mode) | |
_file.write(line.rstrip('\r\n') + '\n' + _file.read()) | |
def ensureExists(path): | |
""" | |
Accepts path to file, then creates the directory path if it does not exist | |
:param path: | |
:return: | |
""" | |
if not os.path.exists(os.path.dirname(path)): | |
try: | |
os.makedirs(os.path.dirname(path)) | |
except OSError as exc: # Guard against race condition | |
if exc.errno != errno.EEXIST: | |
raise | |
return path | |
class mqttHandler(logging.Handler): | |
""" | |
A handler class which writes logging records, appropriately formatted, | |
to a MQTT server to a topic. | |
""" | |
def __init__( | |
self, | |
_hostName=os.environ.get('AWSIP', 'localhost'), | |
topic=f'log', | |
qos=1, retain=True, | |
_port=int(os.environ.get('AWSPORT', 1884)), | |
client_id='', | |
keepalive=60, | |
will=None, | |
auth=None, | |
tls=None, | |
protocol=3, | |
transport='tcp', | |
): | |
logging.Handler.__init__(self) | |
self.topic = topic | |
self.qos = qos | |
self.retain = retain | |
self.hostname = _hostName | |
self.port = _port | |
self.client_id = client_id | |
self.keepalive = keepalive | |
self.will = will | |
self.auth = auth | |
self.tls = tls | |
self.protocol = protocol | |
self.transport = transport | |
def emit(self, record: str): | |
""" | |
Publish a single formatted logging record to a broker, | |
then disconnect cleanly | |
:type record: str | |
""" | |
msg = self.format(record) | |
print(f"{self.topic}=:={msg}") | |
publish.single( | |
self.topic, | |
msg, | |
self.qos, | |
self.retain, | |
hostname=self.hostname, | |
port=self.port, | |
client_id=self.client_id, | |
keepalive=self.keepalive, | |
will=self.will, | |
auth=self.auth, | |
tls=self.tls, | |
protocol=self.protocol, | |
transport=self.transport | |
) | |
def post(topic: str, payload: str, retain: bool = False, | |
_client=establishBroker()): | |
""" | |
Post msg to MQTT broker | |
:type _client: object | |
:type topic: str | |
:type payload: str | |
:type retain: bool | |
:param _client: Logging handler. By default, it is created by this module | |
:param retain: Retain topic on broker | |
:param topic: Project name | |
:param payload: Sensor Data | |
""" | |
topic = str(f'project_name/{topic}') | |
payload = str(payload) | |
try: | |
_client.publish(topic=topic, payload=payload, qos=0, retain=retain) | |
except ValueError: | |
logger.debug( | |
f"pub Failed because of wildcard: {str(topic)}=:={str(payload)}") | |
logger.debug(f"Attempting fix...") | |
try: | |
tame_t = topic.replace("+", "_") | |
tame_topic = tame_t.replace("#", "_") | |
tame_p = payload.replace("+", "_") | |
tame_payload = tame_p.replace("#", "_") | |
_client.publish(topic=str(tame_topic), payload=str(tame_payload), | |
qos=1, retain=retain) | |
logger.debug("Fix successful, Sending data...") | |
except Exception as error: | |
logger.info(f"Fix Failed. Bug report sent.") | |
_client.publish("project_name/error", str(error), qos=1, retain=True) | |
def run_command(command: str) -> str: | |
""" | |
Run a command in the shell. | |
:param command: The command to run. | |
:type command: str | |
:return: The output of the command. | |
:rtype: str | |
""" | |
process = subprocess.run(command, shell=True, capture_output=True, | |
text=True) | |
return process.stdout | |
def ping_ip(ip_address="192.168.0.1") -> bool: | |
"""Pings the given IP address until it is successful. | |
:param ip_address: The IP address to ping. | |
:type ip_address: str | |
:returns: bool | |
.. note:: This function uses the `ping` command to ping the given IP address. It will continue to run | |
the `ping` command until it is successful (i.e. the `ping` command returns a return code of 0) | |
""" | |
while True: | |
result = subprocess.run(["ping", "-c", "1", ip_address], | |
stdout=subprocess.PIPE) | |
if result.returncode == 0: | |
# Ping was successful | |
return True | |
else: | |
logger.debug(f"Waiting for DUT to power on: {result.returncode}") | |
time.sleep(10) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment