|
# Make this explicit, even if it should have been done from the outside. This |
|
# allows the script to run manually if necessary. |
|
package require http |
|
package require json |
|
|
|
# Default options. Every option whose name starts with a dash can be overridden
# from the environment: the variable name is the upper-cased main name of the
# script (no directory, no extension), an underscore, then the upper-cased
# option name without its leading dash. For a script called influx, setting
# INFLUX_HOST changes -host. The remaining, dash-less, entries are internal:
#   obfuscate  options whose values are masked with asterisks in the log
#   id         counter used to build unique names for temporary globals
#   senMLlog   log level handed to the SenML parser
array set options {
    -host       "localhost"
    -port       "8086"
    -username   ""
    -password   ""
    -db         ""
    -protocol   "http"
    -root       ""
    -field      "value"
    -mapping    {}
    -unit       "unit"
    -senml      ""
    -tags       {}
    -truncate   -1
    -redirect   "@"
    -ignore     {*null*}
    -typeguess  on

    obfuscate   {"-password"}
    id          0
    senMLlog    WARN
}
|
|
|
# Try loading the optional senSML package. options(manual) records whether
# loading failed, in which case the restricted, in-house parser is used.
set options(manual) [catch {package require senSML}]
if { !$options(manual) } {
    debug "Running with RFC8428-compliant senML parser" INFO
} else {
    debug "Running with restricted senML parser" NOTICE
}
|
|
|
# Let the environment override every dash-led option. The variable name is the
# upper-cased script name (no directory, no extension), an underscore and the
# upper-cased option name without its dash: for a script called influx,
# INFLUX_HOST overrides -host. Values of options listed in options(obfuscate)
# are replaced by asterisks whenever they are logged.
foreach k [array names options -*] {
    set envvar [string toupper [file rootname [file tail [info script]]]]
    append envvar _ [string toupper [string trimleft $k -]]

    if { [info exists ::env($envvar)] } {
        if { [lsearch -exact $options(obfuscate) $k] < 0 } {
            debug "Setting option $k to $::env($envvar) (via environment)" NOTICE
        } else {
            debug "Setting option $k to [string repeat "*" [string length $::env($envvar)]] (via environment)" NOTICE
        }
        set options($k) $::env($envvar)
    }

    if { [lsearch -exact $options(obfuscate) $k] < 0 } {
        debug "Option $k is $options($k)" INFO
    } else {
        debug "Option $k is [string repeat "*" [string length $options($k)]]" INFO
    }
}
|
|
|
# Read complex and/or secret options from files whenever appropriate. When the
# value of one of the options below starts with the -redirect prefix, the rest
# of the value (trimmed) is taken as the path of a file whose trimmed content
# becomes the new value of the option. An empty -redirect disables this
# mechanism; without that guard every empty option would match an empty prefix
# and lead to an attempt to open an empty path.
foreach opt [list password tags mapping ignore] {
    if { $::options(-redirect) ne "" \
             && [string equal -length [string length $::options(-redirect)] \
                     $::options(-redirect) $::options(-$opt)] } {
        set fpath [string trim [string range $::options(-$opt) \
                                    [string length $::options(-redirect)] end]]
        debug "Reading content of $opt from $fpath" NOTICE
        set fd [open $fpath]
        # Make sure the file descriptor is released even when reading fails.
        if { [catch {read $fd} content] } {
            close $fd
            error "Cannot read content of $opt from $fpath: $content"
        }
        close $fd
        set ::options(-$opt) [string trim $content]
    }
}
|
|
|
|
|
# escape -- Influx line format escaping
#
# Perform character escaping according to the Influx line protocol on an
# incoming string and depending on its type: measurement, tag, field, value.
# See: https://docs.influxdata.com/influxdb/v1.6/write_protocols/line_protocol_tutorial/#special-characters-and-keywords
#
# Arguments:
#	value	String to escape
#	type	Type of string: measurement, tag, field or value (only the first
#		letter matters). Unknown types lead to no escaping at all.
#
# Results:
#	A string where all special characters for that type have been escaped.
#	String field values have both their double quotes and backslashes
#	escaped, so that a value ending with a backslash cannot escape the
#	closing quote added by the caller.
#
# Side Effects:
#	None.
proc escape { value type } {
    set escapes [list]
    # -- protects against types that would start with a dash.
    switch -glob -- $type {
        "m*" {
            set escapes [list "," " "]
        }
        "t*" -
        "f*" {
            set escapes [list "," "=" " "]
        }
        "v*" {
            set escapes [list "\\" "\""]
        }
    }

    # string map performs a single pass, so the backslash mapping does not
    # double-escape the backslashes introduced for the other characters.
    set map [list]
    foreach esc $escapes {
        lappend map $esc "\\$esc"
    }

    return [string map $map $value]
}
|
|
|
|
|
# ingest -- Data ingestion
#
# Receives JSON objects into our internal format from a job queue and
# converts them into calls to the HTTP Influx API. The implementation uses a
# large number of options out of the ::options global array (-mapping,
# -senml, etc.) to decide the names of the destination tags, etc.
#
# Arguments:
#	queue	Name of the Disque queue where the job was received (unused).
#	jobid	Identifier of the job (unused).
#	json	Data for the job.
#
# Results:
#	1 when data could be handed to the database, when there was nothing to
#	push, or when data was not proper; 0 when pushing could not be started.
#	This arranges for jobs to stay in the queue if connection to the
#	database was lost, but to disappear if data is not valid for any reason
#	(because it will never be valid, whatever happens!).
#
# Side Effects:
#	Pushes the converted lines to InfluxDB (see push) and logs via debug.
proc ingest { queue jobid json } {
    # Parse incoming JSON and fail early on errors.
    if { [catch {::json::json2dict $json} d] } {
        debug "Cannot parse incoming JSON:\n$json" WARN
        return 1; # We ack the job anyway since we will never be able to parse the data!
    }

    # Do some sort of crude validation, verifying at least that we know who this
    # is for and that it looks like our internal JSON format: all keys from
    # -mapping, and the -senml key when set, must be present.
    set fields [list]
    foreach {f t} $::options(-mapping) {
        lappend fields $f
    }
    if { $::options(-senml) ne "" } {
        lappend fields $::options(-senml)
    }
    foreach k $fields {
        # dict exists raises an error when the data is not a dictionary at all
        # (e.g. a JSON array). Such data is as invalid as a missing key.
        if { [catch {dict exists $d $k} found] || !$found } {
            debug "Cannot find key $k in JSON: $json" WARN
            return 1; # We ack the job anyway since we will never be able to parse the data!
        }
    }

    # Extract tags from the job description that should be mapped onto each
    # measurement. Mappings to an empty tag name are only used for validation.
    set tags [dict create]
    foreach {f t} $::options(-mapping) {
        if { $t ne "" } {
            dict set tags $t [dict get $d $f]
        }
    }

    # Convert list of dictionaries in senML format to a list of Influx API
    # compatible updates.
    if { $::options(-senml) eq "" } {
        set senml $d
    } else {
        set senml [dict get $d $::options(-senml)]
    }
    if { $::options(manual) } {
        set influx [senML2influx_manual $senml $tags]
    } else {
        set influx [senML2influx_rfc $senml $tags]
    }

    if { [llength $influx] } {
        # Attempt pushing to the Influx server. On failure, we will return a 0.
        # When 0 is returned, the job will remain and will be retried a number
        # of times, so we have some guarantee of delivery.
        debug "Pushing following data to Influx server:\n[join $influx "\n"]" DEBUG
        if { [push $influx] eq "" } {
            return 0
        }
    }

    return 1
}
|
|
|
|
|
# writeURL -- Build the InfluxDB write URL
#
# Build the URL of the write endpoint from the options, respecting the query
# string format described at
# https://docs.influxdata.com/influxdb/v1.5/tools/api/#write
# Query values are URL-encoded so that special characters in the database
# name, username or password (e.g. & or #) cannot corrupt the query.
#
# Arguments:
#	masked	When true, the password is replaced by asterisks so the URL can
#		safely be logged.
#
# Results:
#	The URL to POST line protocol data to.
#
# Side Effects:
#	None.
proc writeURL { { masked 0 } } {
    set query [::http::formatQuery db $::options(-db) u $::options(-username)]
    if { $masked } {
        # Appended verbatim so the mask is never percent-encoded.
        append query "&p=[string repeat "*" [string length $::options(-password)]]"
    } else {
        append query "&[::http::formatQuery p $::options(-password)]"
    }
    append query "&precision=ms"
    return "$::options(-protocol)://$::options(-host):$::options(-port)$::options(-root)/write?$query"
}

# push -- Push lines to InfluxDB
#
# Push the list of lines passed as a parameter to the remote InfluxDB as a
# single post. Each line must be in the InfluxDB line protocol format. When
# timestamps are present, they should be expressed in milliseconds. Once
# the connection to the remote InfluxDB has been established, content
# pushing happens in the background. Possible problems will be reported to
# the log, where the password is always masked.
#
# Arguments:
#	lines	List of lines in the line protocol format.
#
# Results:
#	Token for the HTTP operation, empty string on errors.
#
# Side Effects:
#	Connect to remote API as specified through the options and push data in
#	the background.
proc push { lines } {
    set url [writeURL]
    # Only this masked version may reach the logs, including the ones
    # produced by the ::Done callback.
    set logurl [writeURL 1]
    debug "Pushing [llength $lines] update(s) to $logurl" INFO

    # Use internal http package, arrange for TLS support whenever possible.
    # This might fail though, since we are in a safe interp.
    if { $::options(-protocol) eq "https" } {
        package require tls
        ::http::register https 443 [list ::tls::socket -tls1 1]
    }

    # Perform operation in asynchronous mode through the use of a
    # command callback for reporting upon success (or failure).
    set cmd [list ::http::geturl $url \
                 -query [join $lines "\n"] \
                 -method POST \
                 -binary on \
                 -command [list ::Done $logurl]]
    if { [catch $cmd tok] } {
        debug "Failed pushing data to $logurl: $tok" WARN
        return ""
    }

    return $tok
}
|
|
|
# senML2influx_manual -- Manual SenML parsing
#
# Convert a list of SenML packs represented as Tcl dictionaries to a list
# of lines ready to be pushed to Influx and compatible with the line format
# protocol. Conversion picks directly from the dictionaries and supposes
# resolved packs, i.e. no support for base fields.
#
# Arguments:
#	senml	List of Tcl dictionaries, one for each SenML pack
#	tags	Dictionary of additional tags to add to every line
#
# Results:
#	Return a list of lines in the Influx line protocol format, conversion is
#	subject to rules from the options. Packs that cannot be converted are
#	logged and skipped, so one malformed pack does not drop the whole batch
#	(same behaviour as senML2influx_rfc).
#
# Side Effects:
#	All lines matching the -ignore option will be ignored.
proc senML2influx_manual { senml tags } {
    set influx [list]; # Will contain each update as a line in Influx compatible format
    foreach upd $senml {
        if { [catch {pack2influx $upd $tags} line] } {
            debug "Could not parse SenML pack $upd: $line" WARN
            continue
        }
        if { $line ne "" } {
            lappend influx $line
        }
    }

    return $influx
}
|
|
|
|
|
# pack2influx -- Convert a resolved SenML Pack
#
# Convert a SenML pack represented as a Tcl dictionary to a line ready to
# be pushed to Influx and compatible with the line format protocol.
# Conversion picks directly from the dictionary and supposes a resolved
# pack, i.e. no support for base fields.
#
# Arguments:
#	upd	Tcl dictionary representing the SenML pack
#	tags	Dictionary of additional tag names and values for the line,
#		usually resulting from the -mapping option.
#
# Results:
#	Return a line in the Influx line protocol format, conversion is subject
#	to rules from the options. The line is empty when the pack has no name
#	or no value, carries an unsupported value type, or when it should be
#	ignored (see below).
#
# Side Effects:
#	All lines matching the -ignore option will be ignored and lead to
#	returning an empty line.
proc pack2influx { upd { tags {}} } {
    # We need at least a name and some value (s, or any key starting with v).
    if { ![dict exists $upd "n"] || \
             ([dict size [dict filter $upd key "v*"]] == 0 \
                  && ![dict exists $upd "s"]) } {
        return ""
    }

    # The line is built piece by piece while respecting the documented escaping
    # rules of the influx line protocol: measurement, tags, field, timestamp.

    ####
    # PART 1: Measurement
    #
    # The name of the measurement comes directly from the name in the SenML
    # pack. No questions asked!
    set line [escape [dict get $upd "n"] measurement]

    ####
    # PART 2: Tags
    #
    # First add tags that are passed to us, these really have gone through
    # the -mapping option in prior calls.
    dict for {f t} $tags {
        append line ",[escape $f tag]=[escape $t tag]"
    }
    # The unit from SenML is pushed as a tag with the name coming from the
    # -unit option. When -unit is empty, the unit will not be pushed to
    # Influx, even though it could have been present in the SenML pack.
    if { [dict exists $upd "u"] && $::options(-unit) ne "" } {
        append line ",[escape $::options(-unit) tag]=[escape [dict get $upd u] tag]"
    }
    # The module can also add any number of tags and their values to every
    # Influx measurement report. This can be used to tag the source, etc.
    foreach {k v} $::options(-tags) {
        append line ",[escape $k tag]=[escape $v tag]"
    }

    ####
    # PART 3: Field
    #
    # Only one field is supported: the value from the SenML pack is set under
    # the name pointed at by the -field option. The value is extracted from s,
    # vb, vs or v (in that order).
    append line " [escape $::options(-field) field]="
    if { [dict exists $upd "s"] } {
        append line [dict get $upd "s"]
    } elseif { [dict exists $upd "vb"] } {
        if { [dict get $upd "vb"] } {
            append line 1
        } else {
            append line 0
        }
    } elseif { [dict exists $upd "vs"] } {
        append line "\"[escape [dict get $upd vs] value]\""
    } elseif { [dict exists $upd "v"] } {
        set val [dict get $upd "v"]
        if { $::options(-typeguess) } {
            # Guess around the value, anything that isn't a boolean or number
            # will be a string. This is really outside the spec...
            if { [string is double -strict $val] } {
                append line $val
            } elseif { [string is boolean -strict $val] } {
                # Tcl booleans include yes/no/on/off, which Influx rejects:
                # normalise to the true/false literals of the line protocol.
                append line [expr {$val ? "true" : "false"}]
            } else {
                append line "\"[escape $val value]\""
            }
        } else {
            # No guessing, v is reserved for doubles and numbers
            append line $val
        }
    } else {
        debug "Incoming SenML pack contains an unsupported value type!" WARN
        return ""
    }

    ####
    # PART 4: Timestamp
    #
    # Add the timestamp if we have one (in milliseconds), otherwise let be,
    # which will let Influx make that decision. Note that this should be in
    # UTC, as per the documentation:
    # https://docs.influxdata.com/influxdb/v1.6/concepts/glossary/#timestamp
    # entier() is used since int() truncates to the platform word size, which
    # would wrap millisecond timestamps where that size is 32 bits.
    if { [dict exists $upd "t"] } {
        append line " [expr {entier([dict get $upd "t"]*1000)}]"
    }

    ####
    # PART 5: Ignore lines
    #
    # Once a line has been constructed, it can be ignored if it matches any
    # of the glob-style patterns passed through the -ignore option. This can
    # be used as crude ignore of all lines having null values inside, or for
    # skipping sources known to have problems, for example.
    foreach ptn $::options(-ignore) {
        if { [string match $ptn $line] } {
            debug "ignore line $line" DEBUG
            return ""
        }
    }

    return $line
}
|
|
|
# logger -- Log bridge for the SenML parser
#
# Forward a log message to the debug procedure, which takes its arguments
# in the opposite order.
#
# Arguments:
#	lvl	Log level (presumably as emitted by the SenML parser's -log hook)
#	msg	Message to log
#
# Results:
#	Whatever debug returns.
#
# Side Effects:
#	Logs through debug.
proc logger { lvl msg } {
    return [debug $msg $lvl]
}
|
|
|
# senML2influx_rfc -- SenML parsing
#
# Convert a list of SenML packs represented as Tcl dictionaries to a list
# of lines ready to be pushed to Influx and compatible with the line format
# protocol. Conversion happens through the SenML package, meaning that base
# fields will be properly resolved as parsing processes.
#
# Arguments:
#	senml	List of Tcl dictionaries, one for each SenML pack
#	tags	Dictionary of additional tags to add to every line
#
# Results:
#	Return a list of lines in the Influx line protocol format, conversion is
#	subject to rules from the options. Packs that cannot be parsed are
#	logged and skipped.
#
# Side Effects:
#	All lines matching the -ignore option will be ignored. A temporary global
#	variable and a parser object are created; both are released before
#	returning, including when the parser raises an error.
proc senML2influx_rfc { senml tags } {
    # Create a unique global variable to collect data while parsing senML. We
    # could use the same global list, but that wouldn't be future-proof in case
    # this code would be interrupted through coroutines or similar.
    set l ::influx\#[incr ::options(id)]
    upvar \#0 $l influx
    set influx [list]

    # Create a SenSML context for parsing, arrange to collect in $l via
    # senMLcollect procedure. Arrange to pass logging through our own debug
    # proc, filtering out to the SenML level from the options (usually around
    # WARN, unless problems arise).
    set s [senSML \
               -callback [list ::senMLcollect $l $tags] \
               -log @logger \
               -level $::options(senMLlog)]
    set failed [catch {
        $s begin
        foreach upd $senml {
            if { [catch {$s dictpack $upd} err] } {
                debug "Could not parse SenML pack $upd: $err" WARN
            }
        }
        $s end
    } res]

    # On parser failure, release the parser and the global collector before
    # propagating the original error, so neither leaks.
    if { $failed } {
        set einfo $::errorInfo
        set ecode $::errorCode
        catch {$s delete}
        unset influx
        return -code error -errorinfo $einfo -errorcode $ecode $res
    }
    $s delete

    # Transit global variable to a local list so we can unset it and cleanup the
    # global state (avoid memory leaks in other words!)
    set ret $influx
    unset influx
    return $ret
}
|
|
|
# senMLcollect -- Collect influx lines during senML parsing
#
# Parser callback: convert each resolved SenML pack to the influx line
# protocol and append the non-empty result to the global list variable
# passed as a parameter.
#
# Arguments:
#	l	Name of global variable where to collect converted packs
#	tags	Dictionary of additional tags to add to every line
#	s	Identifier of the SenSML parser (unused)
#	step	One of OPEN, CLOSE or PACK. Only PACK is processed.
#	pack	Dictionary representing the resolved SenML pack.
#
# Results:
#	No meaningful result.
#
# Side Effects:
#	Appends to the global variable named by l; empty lines are skipped.
proc senMLcollect { l tags s step { pack {} } } {
    if { $step ne "PACK" } {
        return
    }

    upvar \#0 $l collected
    set converted [pack2influx $pack $tags]
    if { $converted ne "" } {
        lappend collected $converted
    }
}
|
|
|
|
|
# Done -- HTTP cleanup
#
# Completion callback for pushes: log the outcome of the HTTP operation and
# release its token. Non-2xx answers are logged as warnings together with
# the error, the status and the body of the answer. The body is cut to
# -truncate characters when that option is positive.
#
# Arguments:
#	url	URL to Influx server (only used in log messages)
#	tok	HTTP token, as of HTTP library
#
# Results:
#	None.
#
# Side Effects:
#	Logs through debug and cleans up the HTTP token.
proc Done { url tok } {
    set code [::http::ncode $tok]
    if { $code >= 200 && $code < 300 } {
        debug "Done with push for $url" DEBUG
    } else {
        set dta [::http::data $tok]
        set maxlen $::options(-truncate)
        if { $maxlen > 0 } {
            set dta [string range $dta 0 [expr {$maxlen - 1}]]
        }

        debug "Could not push to Influx at $url:\
               $code -- err:[::http::error $tok]\
               -- status:[::http::status $tok]\
               -- data:[string trim $dta]" WARN
    }
    ::http::cleanup $tok
}