@efrecon
Last active February 1, 2023 22:44
Automatically chops SenML data and sends it in chunks to remote MQTT topics

Chopper

The purpose of this module is to chop SenML data into smaller chunks and to send each chunk away via MQTT automatically as soon as a given number of entries (the chopping size) has been collected. Data is collected per destination MQTT topic. The module is tuned for use from within the logic behind projects such as http2mqtt or mqtt2any.

Configuration

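Configuration happens by calling chopper configure with one or more of the following options, each followed by its value. Called without arguments, the command returns all options and their current values; called with a single option name, it returns the value of that option.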

  • -header should contain a header that is automatically added at the beginning of each chunk. When empty, no header will be added. The default is the beginning of a JSON array, i.e. [.
  • -footer should contain a footer that is automatically added at the end of each chunk before it is sent away. When empty, no footer will be added. The default is the end of a JSON array, i.e. ].
  • -trimright should contain one or several characters that will be trimmed away from the right of the chunk before the footer is added. The default is a comma (,), which lets the programmer sloppily terminate every JSON line with a comma: the trailing one is removed automatically.
  • -chunk is the number of items to have in each chunk. A negative number keeps all items in a single chunk; the implementation below treats zero the same way.
  • -pulse is the time (in milliseconds) to wait between sending (series of) chunks. When negative, each chunk is sent as soon as it is complete. In the implementation below, zero still queues chunks, but the queue is flushed without any delay between series.
  • -throttle contains the number of chunks to send in one go before waiting for -pulse milliseconds again.
  • -defaults should contain a list of triplets: a glob-style pattern to match against MQTT topics, followed by the QoS and the value of the retain flag to use for matching topics. Several triplets can be given one after the other, and the first matching pattern wins. These defaults are used when chunks are constructed without specific QoS and/or retain flag information.
  • -debug is the command to call to write out debug information. It will be called with two additional arguments: the message to print out and the level, one of WARN, NOTICE, INFO, DEBUG, TRACE. When empty (the default), no debug output will occur.
  • -send is the command to call to send chopped content to the remote. The command will be called with exactly four more arguments, in order: the MQTT topic to send to, the data to be sent, the QoS and the value of the retain flag. When empty (the default), nothing will be sent.
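
The interplay of -header, -footer, -trimright and -chunk can be sketched in a few lines of standalone Tcl. The chop procedure below is a hypothetical helper written for illustration only: it is not part of the module, and it ignores topics, queuing and sending.

```tcl
# Hypothetical helper (not part of the chopper module): splits a list of
# already-serialised items into chunks of at most $size items each.
proc chop { items size {header "\["} {footer "\]"} {trimright ","} } {
    set chunks [list]
    set chunk $header
    set count 0
    foreach item $items {
        # Every item is sloppily terminated with a comma...
        append chunk $item ","
        incr count
        if { $size > 0 && ($count % $size) == 0 } {
            # ...and the trailing comma is trimmed before the footer is added.
            lappend chunks "[string trimright $chunk $trimright]$footer"
            set chunk $header
        }
    }
    # Emit whatever remains, unless the last chunk is still empty.
    if { $chunk ne $header } {
        lappend chunks "[string trimright $chunk $trimright]$footer"
    }
    return $chunks
}

# Prints [1,2] then [3,4] then [5], one chunk per line.
foreach c [chop {1 2 3 4 5} 2] {
    puts $c
}
```

A zero or negative size never triggers the intermediate cut, so all items end up in a single chunk.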

API

You can at any time start the construction (and chopping) of a message by calling chopper append with the topic, followed by the data to add. You can issue as many calls as necessary to create a JSON object. Once the JSON object is complete, call chopper close (as in closing the object) for the same topic, possibly with QoS and retain information. When none is provided, matching values are looked up in -defaults. The module automatically sends data (via the command specified at -send) whenever the chunk size is reached. Once you are sure that you have no more JSON objects to add to the array, call chopper send, once again possibly with QoS and retain information (see above), to send whatever remains.
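
The lookup in -defaults can be pictured as follows. This is a standalone sketch rather than the module's own code: lookupDefaults is a hypothetical name, and the fallback to QoS 1 and retain 0 mirrors what the implementation does when no pattern matches.

```tcl
# Hypothetical helper (not part of the chopper module): resolves the QoS and
# retain flag for a topic from a flat list of {pattern qos retain} triplets.
proc lookupDefaults { defaults topic } {
    foreach {ptn qos retain} $defaults {
        # Glob-style matching, the first matching pattern wins.
        if { [string match $ptn $topic] } {
            return [list $qos $retain]
        }
    }
    # Nothing matched: fall back to QoS 1, retain off.
    return [list 1 0]
}

set defaults {*5 2 1 * 1 0}
puts [lookupDefaults $defaults sensors/topic5]; # prints: 2 1
puts [lookupDefaults $defaults sensors/topic3]; # prints: 1 0
```

Because the first match wins, more specific patterns should come before a catch-all such as *.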

An example of this flow can be found at the end of the implementation as a crude form of testing and demo.

# Accumulates and chops data, for MQTT sending through mqtt command. The module
# is generic, but tuned for SenML serialisation.
namespace eval ::chopper {
    namespace eval vars {
        variable -header "\["
        variable -footer "\]"
        variable -trimright ","
        variable -chunk 100
        variable -pulse 100
        variable -throttle 5
        variable -defaults {* 1 0}
        variable -send {}
        variable -debug {}
        variable -charset "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789/-_.+\#\$"
        variable -asciify {ä a å a ö o â a ä a à a é e è e ê e ë e î i ï i ô o ö o û u ü u}
    }
    namespace eval topics {}
    namespace export {[a-z]*}
    namespace ensemble create
}
# Query or set options: no argument returns all options as a dictionary, one
# argument returns the value of that option, more arguments set options.
proc ::chopper::configure { args } {
    if { [llength $args] == 0 } {
        set cfg [dict create]
        foreach v [info vars vars::-*] {
            dict set cfg [lindex [split $v ":"] end] [set $v]
        }
        return $cfg
    } elseif { [llength $args] == 1 } {
        set v -[string trimleft [lindex $args 0] -]
        if { [info exists vars::$v] } {
            return [set vars::$v]
        } else {
            return ""
        }
    } else {
        foreach {k v} $args {
            set k -[string trimleft $k -]
            if { [info exists vars::$k] } {
                set vars::$k $v
            } else {
                return -code error "$k is an unknown configuration option, should be one of [join [dict keys [configure]] {, }]"
            }
        }
    }
}
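# Clean a topic name: map accented characters to ASCII and replace every
# character outside of the allowed set with the replacement string.
proc ::chopper::topic { topic { replace "_" } } {
    set topic [string map -nocase ${vars::-asciify} $topic]
    set cleanstr ""
    foreach c [split $topic ""] {
        if { [string first $c ${vars::-charset}] < 0 } {
            ::append cleanstr $replace
        } else {
            ::append cleanstr $c
        }
    }
    return $cleanstr
}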
# Append data to the current chunk for the topic, creating the per-topic state
# on first use.
proc ::chopper::append { topic args } {
    set vname [namespace current]::topics::$topic
    if { ![info exists $vname] } {
        upvar \#0 $vname STATE
        set STATE(chunk) ${vars::-header}
        set STATE(timer) ""
        set STATE(queue) [list]
        set STATE(dumped) 0
    }
    upvar \#0 $vname STATE
    ::append STATE(chunk) {*}$args
}
# Mark the end of an item for the topic, and send the chunk away once it holds
# -chunk items.
proc ::chopper::close { topic { qos -1 } { retain "" } } {
    # Guess/take QoS and retain flag
    set qos [QoS $topic $qos]
    set retain [Retain $topic $retain]
    set vname [namespace current]::topics::$topic
    if { [info exists $vname] } {
        upvar \#0 $vname STATE
        incr STATE(dumped)
        if { ${vars::-chunk} > 0 && ($STATE(dumped) % ${vars::-chunk}) == 0 } {
            send $topic $qos $retain; # Will close chunk
        }
    }
}
# Finalise the current chunk for the topic and send it, either at once or
# through the throttled queue, then start a new chunk.
proc ::chopper::send { topic { qos -1 } { retain "" } } {
    # Guess/take QoS and retain flag
    set qos [QoS $topic $qos]
    set retain [Retain $topic $retain]
    set vname [namespace current]::topics::$topic
    if { [info exists $vname] } {
        upvar \#0 $vname STATE
        # Finalise chunk content
        set STATE(chunk) [string trimright $STATE(chunk) ${vars::-trimright}]
        ::append STATE(chunk) ${vars::-footer}
        # Only send when there is something between the header and the footer
        if { [string trim $STATE(chunk) "${vars::-header}${vars::-footer}"] ne "" } {
            if { ${vars::-pulse} < 0 } {
                Out $topic $STATE(chunk) $qos $retain
            } else {
                Debug "Enqueuing data to $topic" DEBUG
                lappend STATE(queue) $STATE(chunk) $qos $retain
                if { $STATE(timer) eq "" } {
                    Debug "Starting flushing timer ASAP" DEBUG
                    set STATE(timer) [after idle [list [namespace current]::Flush $topic]]
                }
            }
        }
        # Prepare new chunk
        set STATE(chunk) ${vars::-header}
    }
}
# Pass a finalised chunk to the -send command, if any.
proc ::chopper::Out { topic chunk qos retain } {
    # mqtt $topic $chunk -qos $qos -retain $retain
    if { [llength ${vars::-send}] } {
        eval [linsert ${vars::-send} end $topic $chunk $qos $retain]
    }
    Debug "Sent $chunk to $topic" TRACE
}
# Pass a message and its level to the -debug command, if any.
proc ::chopper::Debug { msg lvl } {
    if { [llength ${vars::-debug}] } {
        eval [linsert ${vars::-debug} end $msg $lvl]
    }
}
# Send at most -throttle queued chunks for the topic, then reschedule in
# -pulse milliseconds if some chunks remain in the queue.
proc ::chopper::Flush {topic} {
    set vname [namespace current]::topics::$topic
    if { [info exists $vname] } {
        upvar \#0 $vname STATE
        set STATE(timer) ""
        set chunks 0
        for {set i 0} {$i<${vars::-throttle}} {incr i} {
            if { [llength $STATE(queue)] > 0 } {
                # The queue holds three entries per chunk: data, QoS, retain
                Debug "[expr {[llength $STATE(queue)]/3}] chunk(s) waiting in queue" DEBUG
                lassign $STATE(queue) chunk qos retain
                Out $topic $chunk $qos $retain
                incr chunks
                set STATE(queue) [lrange $STATE(queue) 3 end]
            }
        }
        Debug "Sent $chunks chunk(s) to MQTT server" INFO
        if { [llength $STATE(queue)] > 0 } {
            Debug "Flushing more in ${vars::-pulse} ms" DEBUG
            set STATE(timer) [after ${vars::-pulse} [list [namespace current]::Flush $topic]]
        }
    }
}
# Return the QoS to use for the topic: the incoming one when valid, otherwise
# the one from the first matching triplet in -defaults.
proc ::chopper::QoS { topic { qos -1 } } {
    # Find matching in defaults
    if { $qos eq "" || $qos < 0 } {
        foreach {ptn q r } ${vars::-defaults} {
            if { [string match $ptn $topic] } {
                Debug "Setting QoS to $q from defaults for $topic" DEBUG
                return $q
            }
        }
        # Default to something with a warning if nothing found
        Debug "QoS cannot be negative or empty, defaulting to 1!" WARN
        return 1
    }
    # Return incoming
    return $qos
}
# Return the retain flag to use for the topic: the incoming one when it is a
# boolean, otherwise the one from the first matching triplet in -defaults.
proc ::chopper::Retain { topic { retain "" } } {
    # Find matching in defaults
    if { $retain eq "" || ![string is boolean -strict $retain] } {
        foreach {ptn q r } ${vars::-defaults} {
            if { [string match $ptn $topic] } {
                Debug "Setting retain flag to $r from defaults for $topic" DEBUG
                return $r
            }
        }
        # Default to something with a warning if nothing found
        Debug "Retain should be a flag, defaulting to 0!" WARN
        return 0
    }
    # Return incoming
    return $retain
}
if { [info exists argv0] && [file rootname $argv0] eq [file rootname [info script]] } {
    # Print out what would have been sent
    proc ::Printer { topic chunk qos retain } {
        puts "Sending to $topic (QoS: $qos, Retain: $retain):\n$chunk"
    }
    # Configure, note that we set some specific defaults for the topic ending
    # with 5 to demonstrate QoS and retain picking.
    chopper configure \
        -send ::Printer \
        -throttle 3 \
        -pulse 100 \
        -chunk 5 \
        -defaults {*5 2 1 * 1 0}
    for {set i 0} {$i<10} {incr i} {
        set topic [chopper topic åöä|/topic$i]
        for {set j 0} {$j<300} {incr j} {
            chopper append $topic "{"
            chopper append $topic "\"n\":\"" var$j "\"" ","
            chopper append $topic "\"v\": " $j
            chopper append $topic "},"
            chopper close $topic
        }
        chopper send $topic
    }
    vwait forever
}
package provide chopper 1.0
BSD 3-Clause License
Copyright (c) 2019, Emmanuel Frecon <[email protected]>
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* Neither the name of the copyright holder nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.