Last active
October 9, 2023 01:36
-
-
Save reubenmiller/2461cd875765af24333f4165175a6a17 to your computer and use it in GitHub Desktop.
thin-edge.io benchmarking script
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""thin-edge.io data simulator""" | |
import argparse | |
import time | |
import random | |
import json | |
from multiprocessing import Pool | |
import datetime | |
import logging | |
import paho.mqtt.client as mqtt | |
# Script-wide handle on the root logger (no name is passed to getLogger),
# with its threshold lowered to INFO so progress messages pass the level check.
LOG = logging.getLogger()
LOG.setLevel(logging.INFO)
class Pub:
    """Benchmark publisher that sends numbered JSON messages over MQTT.

    One instance publishes ``max_count`` messages to
    ``<topic_root>/<topic_id>/<topic>`` and, while doing so, listens on the
    ``c8y/measurement/measurements/create`` topic to count how many of its
    message ids were observed there.

    Instances only hold plain, picklable attributes so that a bound ``run``
    method can be handed to a ``multiprocessing`` pool.
    """

    # Topic observed to find out which published messages were forwarded.
    _C8Y_MEASUREMENT_TOPIC = "c8y/measurement/measurements/create"

    def __init__(
        self,
        topic_id: str = "/",
        topic_root: str = "te",
        host: str = "127.0.0.1",
        port: int = 1883,
        qos: int = 0,
        max_count: int = 5,
        topic: str = "",
        timeout: float = 60,
        wait: float = 0,
        max_datapoints: int = 3,
    ) -> None:
        """Store the benchmark settings.

        Args:
            topic_id: Second segment of the topic prefix.
            topic_root: First segment of the topic prefix.
            host: MQTT broker host name or address.
            port: MQTT broker port.
            qos: MQTT quality of service used when publishing.
            max_count: Number of messages to publish.
            topic: Sub path appended to the topic prefix.
            timeout: Maximum publishing duration in seconds.
            wait: Pause between two messages, in milliseconds.
            max_datapoints: Number of ``data_<i>`` values per message.
        """
        self._topic_root = topic_root
        self._topic_id = topic_id
        self.topic = topic
        self.qos = qos
        self.max_count = max_count
        self.host = host
        self.port = port
        self.timeout = timeout
        self.wait = wait
        self.max_datapoints = max_datapoints
        self.__c8y_messages = []
        self.start_time = None

    @property
    def topic_prefix(self) -> str:
        """Return ``<topic_root>/<topic_id>``."""
        return "/".join([self._topic_root, self._topic_id])

    def get_topic(self, sub_path: str) -> str:
        """Return the full topic for *sub_path* below the topic prefix.

        Leading slashes of *sub_path* are dropped before joining, so
        ``"m/x"`` and ``"/m/x"`` produce the same topic.
        """
        sep = "/"
        return sep.join([self.topic_prefix, sub_path.lstrip(sep)])

    @staticmethod
    def _utc_now() -> datetime.datetime:
        """Return the current UTC time as a naive ``datetime``.

        Same value and ``isoformat()`` output as the deprecated
        ``datetime.utcnow()``, so the published payload is unchanged.
        """
        return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

    def __on_connect(self, client: "mqtt.Client", userdata, flags, rc):
        """Subscribe to the cloud measurement topic once connected."""
        if rc != 0:
            # NOTE(review): this callback is invoked by the paho network loop,
            # so this exception presumably never reaches run() - confirm.
            raise Exception("Failed to connect to broker")

        # observe which messages are being sent
        LOG.info("Subscribing to cloud measurements")
        client.subscribe([(self._C8Y_MEASUREMENT_TOPIC, 0)])

    def __on_message(self, client, userdata, msg: "mqtt.MQTTMessage"):
        """Remember cloud measurement payloads that carry a ``msgid`` key.

        Payloads that cannot be decoded, or that are not JSON objects, are
        logged at debug level and ignored so they cannot break the summary
        computed at the end of ``run``.
        """
        LOG.debug("Received message: %s", msg.payload)
        try:
            payload = json.loads(msg.payload.decode("utf-8"))
        except (UnicodeDecodeError, json.JSONDecodeError) as ex:
            LOG.debug("Could not parse payload. %s", ex)
            return

        if msg.topic != self._C8Y_MEASUREMENT_TOPIC:
            return
        if isinstance(payload, dict) and "msgid" in payload:
            self.__c8y_messages.append(payload)

    def _count_unique_cloud_messages(self) -> int:
        """Count distinct ``msgid.msgid.value`` entries among stored payloads.

        Payloads with a missing, ``None`` or unhashable value, or whose
        nesting is not made of JSON objects, are skipped.
        """
        unique_ids = set()
        for message in self.__c8y_messages:
            value = message
            for key in ("msgid", "msgid", "value"):
                value = value.get(key) if isinstance(value, dict) else None
            if value is None:
                continue
            try:
                unique_ids.add(value)
            except TypeError:
                # e.g. a list or object used as the value
                continue
        return len(unique_ids)

    def _build_summary(self, worker, elapsed_sec, payload_bytes, cloud_messages):
        """Assemble the result dictionary returned by ``run``.

        Per-message figures are reported as ``0.0`` when ``max_count`` is not
        positive instead of raising ``ZeroDivisionError``.
        """
        count = self.max_count
        idle_sec = self.wait / 1000 * count
        busy_sec = elapsed_sec - idle_sec
        has_messages = count > 0
        return {
            "worker": worker,
            "messages": count,
            "idle": idle_sec,
            "total_non_idle": busy_sec,
            "total": elapsed_sec,
            "dropped_messages": count - cloud_messages,
            "qos": self.qos,
            "total_payload_bytes": payload_bytes,
            "total_payload_bytes_per_message": (
                payload_bytes / count if has_messages else 0.0
            ),
            "ms_per_message": (
                round(busy_sec * 1000 / count, 3) if has_messages else 0.0
            ),
        }

    def run(self, procID, *args, **kwargs):
        """Publish the configured messages and return timing statistics.

        Args:
            procID: Identifier of the worker, echoed back as ``"worker"``.
            *args: Ignored.
            **kwargs: Ignored.

        Returns:
            A dict with the keys ``worker``, ``messages``, ``idle``,
            ``total_non_idle``, ``total``, ``dropped_messages``, ``qos``,
            ``total_payload_bytes``, ``total_payload_bytes_per_message`` and
            ``ms_per_message``. Durations are in seconds unless the key says
            otherwise.

        Raises:
            Exception: If publishing takes longer than ``timeout`` seconds.
        """
        client = mqtt.Client()
        client.on_connect = self.__on_connect
        client.on_message = self.__on_message
        client.max_inflight_messages_set(self.max_count)
        client.connect(self.host, self.port)
        client.loop_start()

        payload_bytes = 0
        try:
            # Presumably gives the background loop time to connect and
            # subscribe before the measurement starts.
            time.sleep(1)

            self.start_time = self._utc_now()
            # Durations use the monotonic clock so wall-clock adjustments
            # cannot distort the benchmark.
            started = time.monotonic()

            payload_fixed = {
                f"data_{i}": round(random.uniform(20, 30), 2)
                for i in range(self.max_datapoints)
            }
            topic = self.get_topic(self.topic)

            for i in range(self.max_count):
                payload = json.dumps(
                    {
                        "msgid": i,
                        "_generatedAt": self._utc_now().isoformat(),
                        **payload_fixed,
                    }
                )
                payload_bytes += len(payload.encode("utf-8"))
                client.publish(topic, qos=self.qos, payload=payload)

                if time.monotonic() - started > self.timeout:
                    raise Exception("We hit the pub timeout!")

                if self.wait:
                    time.sleep(self.wait / 1000)

            elapsed_sec = time.monotonic() - started

            # Wait for messages to finish publishing, give max 5 seconds to complete
            time.sleep(5)
        finally:
            # Always release the connection and the network thread, also when
            # the timeout (or any other error) aborts publishing.
            LOG.info("Stopping mqtt client")
            client.disconnect()
            client.loop_stop()

        return self._build_summary(
            procID,
            elapsed_sec,
            payload_bytes,
            self._count_unique_cloud_messages(),
        )
def do_work(opts):
    """Create a publisher from parsed CLI options and return its ``run`` method.

    Args:
        opts: Parsed arguments providing ``host``, ``port``, ``topic_root``,
            ``topic_id``, ``topic``, ``max_count``, ``qos``, ``wait``,
            ``pub_timeout`` and ``max_datapoints``.

    Returns:
        The bound ``Pub.run`` method of the newly created publisher, meant to
        be called once per worker with the worker id.
    """
    publisher_settings = {
        "host": opts.host,
        "port": opts.port,
        "topic_root": opts.topic_root,
        "topic_id": opts.topic_id,
        "topic": opts.topic,
        "max_count": opts.max_count,
        "qos": opts.qos,
        "wait": opts.wait,
        # the CLI calls it --pub-timeout, the publisher just "timeout"
        "timeout": opts.pub_timeout,
        "max_datapoints": opts.max_datapoints,
    }
    publisher = Pub(**publisher_settings)
    return publisher.run
def main():
    """Parse the command line, run the publishers in parallel and print results.

    One worker process is started per ``--pub-clients``. Each worker runs one
    publisher and its result dictionary is printed on its own line.

    ``-v/--verbose`` lowers the log threshold to DEBUG. A logging handler is
    installed here because, without one, the root logger only has Python's
    last-resort handler, which emits WARNING and above, so the script's
    INFO/DEBUG messages would never be shown.
    """
    parser = argparse.ArgumentParser(
        prog="simulate",
        description="thin-edge.io benchmark client",
        epilog="Benchmark the thin-edge.io application",
    )
    parser.add_argument("--host", default="127.0.0.1")
    parser.add_argument("--port", type=int, default=1883)
    parser.add_argument(
        "--mqtt-device-topic-id", dest="topic_id", default="device/main//"
    )
    parser.add_argument("--mqtt-topic-root", dest="topic_root", default="te")
    parser.add_argument("--topic", default="m/environment")
    parser.add_argument("--pub-timeout", dest="pub_timeout", default=60, type=float)
    parser.add_argument("--pub-clients", dest="pub_clients", default=1, type=int)
    parser.add_argument("--max-count", dest="max_count", default=50, type=int)
    parser.add_argument("--qos", default=0, type=int)
    parser.add_argument("--max-datapoints", dest="max_datapoints", default=1, type=int)
    parser.add_argument(
        "--wait",
        default=0,
        type=float,
        help="Wait in milliseconds before publishing the next message",
    )
    parser.add_argument(
        "-v", "--verbose", action="store_true", help="Enable debug logging"
    )
    opts = parser.parse_args()

    # Attach a stderr handler to the root logger (no-op if one already exists)
    # and honour --verbose, which was previously parsed but ignored.
    logging.basicConfig()
    if opts.verbose:
        LOG.setLevel(logging.DEBUG)

    pool = Pool(opts.pub_clients)
    try:
        results = pool.map(do_work(opts), range(opts.pub_clients))
    finally:
        # Reap the worker processes even when a worker raised.
        pool.close()
        pool.join()

    for item in results:
        print(item)

    LOG.info("Finished the benchmarking")
# Only run the benchmark when executed as a script. The guard also keeps
# main() from running when the module is imported (for example by
# multiprocessing worker processes on platforms that re-import it).
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment