Skip to content

Instantly share code, notes, and snippets.

@reubenmiller
Last active October 9, 2023 01:36
Show Gist options
  • Save reubenmiller/2461cd875765af24333f4165175a6a17 to your computer and use it in GitHub Desktop.
Save reubenmiller/2461cd875765af24333f4165175a6a17 to your computer and use it in GitHub Desktop.
thin-edge.io benchmarking script
"""thin-edge.io data simulator"""
import argparse
import time
import random
import json
from multiprocessing import Pool
import datetime
import logging
import paho.mqtt.client as mqtt
LOG = logging.getLogger()
LOG.setLevel(logging.INFO)
class Pub:
def __init__(
self,
topic_id: str = "/",
topic_root: str = "te",
host: str = "127.0.0.1",
port: int = 1883,
qos: int = 0,
max_count: int = 5,
topic: str = "",
timeout: float = 60,
wait: float = 0,
max_datapoints: int = 3,
) -> None:
self._topic_root = topic_root
self._topic_id = topic_id
self.topic = topic
self.qos = qos
self.max_count = max_count
self.host = host
self.port = port
self.timeout = timeout
self.wait = wait
self.max_datapoints = max_datapoints
self.__c8y_messages = []
self.start_time = None
@property
def topic_prefix(self) -> str:
return "/".join([self._topic_root, self._topic_id])
def get_topic(self, sub_path: str) -> str:
sep = "/"
return sep.join([self.topic_prefix, sub_path.lstrip(sep)])
def __on_connect(self, client: mqtt.Client, userdata, flags, rc):
if rc != 0:
raise Exception("Failed to connect to broker")
# observe which messages are being sent
LOG.info("Subscribing to cloud measurements")
client.subscribe(
[
("c8y/measurement/measurements/create", 0),
]
)
def __on_message(self, client, userdata, msg: mqtt.MQTTMessage):
try:
LOG.debug("Received message: %s", msg.payload)
payload = json.loads(msg.payload.decode("utf-8"))
if msg.topic == "c8y/measurement/measurements/create":
if "msgid" in payload:
self.__c8y_messages.append(payload)
except Exception as ex:
LOG.debug("Could not parse payload. %s", ex)
def run(self, procID, *args, **kwargs):
client = mqtt.Client()
client.on_connect = self.__on_connect
client.on_message = self.__on_message
client.max_inflight_messages_set(self.max_count)
client.connect(self.host, self.port)
client.loop_start()
time.sleep(1)
self.start_time = datetime.datetime.utcnow()
payload_fixed = {
f"data_{i}": round(random.uniform(20, 30), 2)
for i in range(self.max_datapoints)
}
payload_bytes = 0
for i in range(self.max_count):
payload = json.dumps(
{
"msgid": i,
"_generatedAt": datetime.datetime.utcnow().isoformat(),
**payload_fixed,
}
)
payload_bytes += len(payload.encode("utf-8"))
client.publish(self.get_topic(self.topic), qos=self.qos, payload=payload)
if self.start_time:
current_time = datetime.datetime.utcnow()
curr_delta = current_time - self.start_time
if curr_delta.total_seconds() > self.timeout:
raise Exception("We hit the pub timeout!")
if self.wait:
time.sleep(self.wait / 1000)
end_time = datetime.datetime.utcnow()
delta = end_time - self.start_time
idle_sec = self.wait / 1000 * self.max_count
# Wait for messages to finish publishing, give max 5 seconds to complete
time.sleep(5)
LOG.info("Stopping mqtt client")
client.disconnect()
client.loop_stop()
total_cloud_messages = len(
{
msg["msgid"]["msgid"]["value"]: msg
for msg in self.__c8y_messages
if msg.get("msgid", {}).get("msgid", {}).get("value", None) is not None
}
)
dropped_messages = self.max_count - total_cloud_messages
return {
"worker": procID,
"messages": self.max_count,
"idle": idle_sec,
"total_non_idle": delta.total_seconds() - idle_sec,
"total": delta.total_seconds(),
"dropped_messages": dropped_messages,
"qos": self.qos,
"total_payload_bytes": payload_bytes,
"total_payload_bytes_per_message": payload_bytes / self.max_count,
"ms_per_message": round(
(delta.total_seconds() - idle_sec) * 1000 / self.max_count, 3
),
}
def do_work(opts):
return Pub(
host=opts.host,
port=opts.port,
topic_root=opts.topic_root,
topic_id=opts.topic_id,
topic=opts.topic,
max_count=opts.max_count,
qos=opts.qos,
wait=opts.wait,
timeout=opts.pub_timeout,
max_datapoints=opts.max_datapoints,
).run
def main():
parser = argparse.ArgumentParser(
prog="simulate",
description="thin-edge.io benchmark client",
epilog="Benchmark the thin-edge.io application",
)
parser.add_argument("--host", default="127.0.0.1")
parser.add_argument("--port", type=int, default=1883)
parser.add_argument(
"--mqtt-device-topic-id", dest="topic_id", default="device/main//"
)
parser.add_argument("--mqtt-topic-root", dest="topic_root", default="te")
parser.add_argument("--topic", default="m/environment")
parser.add_argument("--pub-timeout", dest="pub_timeout", default=60, type=float)
parser.add_argument("--pub-clients", dest="pub_clients", default=1, type=int)
parser.add_argument("--max-count", dest="max_count", default=50, type=int)
parser.add_argument("--qos", default=0, type=int)
parser.add_argument("--max-datapoints", dest="max_datapoints", default=1, type=int)
parser.add_argument(
"--wait",
default=0,
type=float,
help="Wait in milliseconds before publishing the next message",
)
parser.add_argument("-v", "--verbose", action="store_true")
opts = parser.parse_args()
pool = Pool(opts.pub_clients)
result = pool.map_async(do_work(opts), range(opts.pub_clients))
pool.close()
pool.join()
results = result.get()
for item in results:
print(item)
LOG.info("Finished the benchmarking")
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment