Due to the batching, messages accumulate inside the converter while a batch is being filled.
The MQTT protocol does not allow for end-to-end acknowledgments, meaning that once messages arrive at the converter they are considered "delivered". Therefore, if the converter fails, any messages that were not yet uploaded as part of an S3 object are lost.
To guarantee delivery, we need a mechanism that ensures messages are not lost if the converter crashes while waiting for a batch to fill.
One option would be to write every message to persistent media (e.g. disk) as it arrives. If the process restarts, it would read that file and send the data in it.
However, this has two main drawbacks:
there would be a significant performance cost
since messages are automatically acked when received, anything still sitting in the disk write buffer, or not yet written at all, would still be lost on a crash
Instead, we would use a "backup consumer" that listens to the same topic and stores the messages.
There would be an additional control topic between the converter and the backup consumer, on which the converter sends "delete" messages indicating that a message was successfully uploaded to S3 as part of a batch.
Periodically, based on a preconfigured timeout, the backup consumer would batch the messages that were not deleted and send them to S3.
Note that this gives an "at least once" delivery guarantee, which means that the consumer of the data must be able to handle duplicates.
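A minimal sketch of such a backup consumer, using the Eclipse Paho Go client. The broker address, topic filters, control-topic name, and the message-ID scheme are illustrative assumptions rather than part of the design above; in practice the converter and the backup consumer must agree on how messages are identified.
```go
// backup_consumer.go - sketch: keep every message until the converter reports
// (on a control topic) that it was uploaded to S3 as part of a batch.
package main

import (
	"sync"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

func main() {
	var mu sync.Mutex
	pending := map[string][]byte{} // message ID -> payload, kept until "deleted"

	opts := mqtt.NewClientOptions().
		AddBroker("tcp://localhost:1883"). // assumed broker address
		SetClientID("backup-consumer")
	client := mqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}

	// Store every data message under an ID (topic + arrival time is a placeholder
	// scheme; the real one must match the IDs the converter publishes).
	client.Subscribe("ROOT/#", 1, func(_ mqtt.Client, m mqtt.Message) {
		id := m.Topic() + "/" + time.Now().UTC().Format("20060102 150405")
		mu.Lock()
		pending[id] = m.Payload()
		mu.Unlock()
	})

	// Control topic: the converter publishes the ID of every message it has
	// already uploaded, so the backup copy can be dropped.
	client.Subscribe("CONTROL/deleted", 1, func(_ mqtt.Client, m mqtt.Message) {
		mu.Lock()
		delete(pending, string(m.Payload()))
		mu.Unlock()
	})

	// A periodic timer (omitted here) would take whatever is still in `pending`
	// after the configured timeout, batch it, and upload it to S3 itself.
	select {}
}
```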
As demonstrated by AWS IoT and GCP Cloud IoT Core, small field devices deployed at the edge increasingly use message queues for cloud data ingestion and real-time processing.
MQTT is a low-overhead protocol geared towards small messages and low-resource environments, while S3 is most efficient with large objects.
Solution
To efficiently upload MQTT messages to S3, the messages therefore have to be batched together before being uploaded as S3 objects.
Batching should be based on time, number of messages, size, or special application indications sent explicitly by the producer.
The problem with this mechanism is that it creates overhead for the function processing the S3 object, as the original context and separation of the messages are lost once they are batched inside the S3 objects.
The converter would be an MQTT subscriber that batches messages into S3 objects and uploads them to Ceph via its object interface (RGW).
To keep the objects "processing friendly", the converter adds a map of offsets as metadata to the uploaded object, indicating the location of each message inside the object together with its timestamp and topic identifiers.
The converter should be implemented in Go (Java is also an option if we run into issues with the different SDKs).
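To make the batching and the offset map concrete, here is a rough Go sketch of the converter loop, using the Eclipse Paho MQTT client and the AWS Go SDK pointed at RGW. The endpoint, credentials, bucket name, topic filter, batch size, and metadata key are illustrative assumptions, not a final design:
```go
// converter.go - sketch: batch MQTT messages into one S3 object and attach an
// offset map as user metadata so a processing function can locate each message.
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	mqtt "github.com/eclipse/paho.mqtt.golang"
)

const batchSize = 10000 // flush every 10K messages; time/size triggers would be added similarly

type batch struct {
	buf     bytes.Buffer
	offsets map[string]int // message key -> byte offset inside the object
}

func main() {
	// S3 client pointed at the local RGW started by vstart.sh (endpoint/keys are examples).
	sess := session.Must(session.NewSession(&aws.Config{
		Endpoint:         aws.String("http://localhost:8000"),
		Region:           aws.String("default"),
		Credentials:      credentials.NewStaticCredentials("ACCESS_KEY", "SECRET_KEY", ""),
		S3ForcePathStyle: aws.Bool(true),
	}))
	s3c := s3.New(sess)

	msgs := make(chan mqtt.Message, 1024)
	opts := mqtt.NewClientOptions().AddBroker("tcp://localhost:1883").SetClientID("converter")
	client := mqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}
	// Subscribe to everything under one plant; the topic filter is an assumption.
	client.Subscribe("ROOT/FORMAT/COMPRESSED/PLANT-1/#", 1,
		func(_ mqtt.Client, m mqtt.Message) { msgs <- m })

	cur := &batch{offsets: map[string]int{}}
	for m := range msgs {
		// Record where this message starts inside the object. The key here is a
		// simplified topic+timestamp; the actual key convention is defined later.
		key := m.Topic() + "/" + time.Now().UTC().Format("20060102 150405")
		cur.offsets[key] = cur.buf.Len()
		cur.buf.Write(m.Payload())

		if len(cur.offsets) >= batchSize {
			upload(s3c, cur)
			cur = &batch{offsets: map[string]int{}}
		}
	}
}

func upload(s3c *s3.S3, b *batch) {
	// The offset map is serialized into user metadata; for very large batches it
	// may exceed metadata size limits and would have to move to a side object.
	omap, _ := json.Marshal(b.offsets)
	_, err := s3c.PutObject(&s3.PutObjectInput{
		Bucket:   aws.String("plant-1"), // one bucket per PLANT-ID (name is an example)
		Key:      aws.String(fmt.Sprintf("batch-%d", time.Now().Unix())),
		Body:     bytes.NewReader(b.buf.Bytes()),
		Metadata: map[string]*string{"offset-map": aws.String(string(omap))},
	})
	if err != nil {
		panic(err)
	}
}
```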
Project
Phase 0
Set up an environment for development and testing.
Linux
First, you need a Linux-based development environment; as a minimum you would need an 8-CPU machine with 16 GB RAM and a 50 GB disk.
Note that using a machine with a lower spec is also possible, but the Ceph build might take several hours.
Unless you already have a Linux distro you like, I would recommend choosing from:
Fedora - my favorite (34 or higher)
Ubuntu (20.04 and up)
openSUSE (Leap 15.2 or Tumbleweed)
Using WSL on your Windows machine is also possible, but build times would be longer than on native Linux.
Git
Once you have that up and running, you should clone the Ceph repo from GitHub (https://github.com/ceph/ceph).
If you don't know what GitHub and git are, this is the right time to close these gaps :-)
And yes, you should have a GitHub account, so you can later share your work on the project.
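For example (the Ceph repo pulls in several git submodules, so make sure they are checked out as well):
```sh
git clone --recurse-submodules https://github.com/ceph/ceph.git
cd ceph
```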
Build
The repo has a readme file with instructions on how to build Ceph - just follow these instructions and build it (depending on the number of CPUs you have, this may take a while).
Our build system is based on CMake - so it is probably a good idea to know a little bit about that.
Assuming the build was completed successfully, you can run the unit tests (see: https://github.com/ceph/ceph#running-unit-tests).
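Roughly, the flow looks like the sketch below; the authoritative steps are the ones in the repo's readme, and they may change between releases:
```sh
./install-deps.sh      # install the build dependencies for your distro
./do_cmake.sh          # configure a (debug) build tree under ./build
cd build
ninja                  # build everything; this is the long part
ctest                  # run the unit tests from the build directory
```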
Assuming you have everything up and running, you can create a bucket in Ceph and upload an object to it.
The best way to do that is with the s3cmd Python command-line tool:
https://github.com/s3tools/s3cmd
Note that the tool is mainly geared towards AWS S3, so make sure to specify the address of the RGW as the endpoint, together with the RGW credentials (as printed to the screen after running vstart.sh).
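For example, assuming a vstart.sh cluster with an RGW running on the default port 8000 (replace the keys with the ones printed by vstart.sh; bucket and file names are just examples):
```sh
# from the build directory: start a toy cluster with an RGW instance
cd build
RGW=1 ../src/vstart.sh -n -d

# create a bucket and upload an object through the local RGW instead of AWS
s3cmd --host=localhost:8000 --host-bucket="localhost:8000/%(bucket)" --no-ssl \
      --access_key=<access-key> --secret_key=<secret-key> mb s3://mqtt-test
s3cmd --host=localhost:8000 --host-bucket="localhost:8000/%(bucket)" --no-ssl \
      --access_key=<access-key> --secret_key=<secret-key> put hello.txt s3://mqtt-test
```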
In our case we don't need the NoSQL server, and Ceph with an object store frontend (RGW) will be used as the data lake.
We should use the same topic naming convention: ROOT/FORMAT/COMPRESSED/PLANT-ID/DEVICE-ID/MSG-TYPE/MSG-VERSION. Buckets should contain objects holding information per "PLANT-ID".
Objects will contain information from multiple devices, and therefore their names should reflect the time range between the first message and the last.
The offset mapping key should contain the rest of the topic identification together with a timestamp:
DEVICE-ID/MSG-TYPE/MSG-VERSION/Timestamp
For example: WELDING-STATION-12/WELD/1/20181001 171035
Message aggregation should be based on message count (as message sizes are similar), e.g. aggregate 10K messages into one S3 object.
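A small Go helper illustrating how the offset-map key and the object name could be derived from this convention; the exact timestamp format and the object-name layout are assumptions:
```go
package naming

import (
	"strings"
	"time"
)

// Topic convention: ROOT/FORMAT/COMPRESSED/PLANT-ID/DEVICE-ID/MSG-TYPE/MSG-VERSION

// OffsetKey builds the offset-map key DEVICE-ID/MSG-TYPE/MSG-VERSION/Timestamp,
// e.g. "WELDING-STATION-12/WELD/1/20181001 171035".
func OffsetKey(topic string, ts time.Time) string {
	parts := strings.Split(topic, "/")
	return strings.Join(parts[4:7], "/") + "/" + ts.UTC().Format("20060102 150405")
}

// ObjectName names an S3 object after the time range it covers, since one object
// aggregates messages from multiple devices of the same plant.
func ObjectName(first, last time.Time) string {
	const layout = "20060102 150405"
	return first.UTC().Format(layout) + "-" + last.UTC().Format(layout)
}
```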
Data Consumption
Simple Python application: write a Python app that receives a time range and a "Plant ID", fetches all relevant S3 objects that belong to that plant and time range (use boto3), and plots (use pyplot) a time-based "voltage" graph for each device.
(optional) Use the same application, but with Grafana as the backend for plotting the data.