Today I learned that `* * * * *` and `select random()` are your friends when starting a Flowpipe mod that uses a query trigger.

- `* * * * *` to iterate as fast as possible: every minute
- `select random()` to ensure there is always fresh data, otherwise nothing will happen
If you're trying to debug something downstream, like sending email, this ensures you'll actually test the downstream thing every time.
Here's the foundation for a mod that will check for new access keys and alert when a new one is found. I used the `test` pipeline as an initial check, then switched to the `email` pipeline to debug separate issues with that (it's always about auth, right?) once I knew the pipeline would receive a message every minute.
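The swap between the two pipelines comes down to one line in the trigger's `capture` block. A minimal sketch of the first-pass variant, assuming the `test` pipeline and the `message` column from the harness in this post:

```hcl
# Goes inside the trigger "query" block.
capture "insert" {
  pipeline = pipeline.test   # first pass: just echo the message to the console
  args = {
    message = self.inserted_rows[0].message
  }
}
```

Once the console shows a new message every minute, changing `pipeline.test` to `pipeline.email` exercises the email step with the same input.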
When it's running I can see the message in three places: the console (using `output "test"`), the email, and also in `flowpipe.db`, which lives at the mod's root.
```
trigger_name|primary_key|row_hash|created_at|updated_at
local_trigger_query_aws_iam_access_key|0.9426633829030742|3ea6211b6d8c2ceb097e1ced6f788e8d2e15771dfa7f78a33c48154590ce4c19|2024-02-02T03:12:00.004Z|
```
Now that all the moving parts are aligned, I can switch to the real query, like this one, which will do nothing until there's a newly created key.
```
select
  *
from
  aws_iam_access_key
where
  create_date > now() - interval '1 day'
```
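Dropped into the trigger, the real query also needs a real primary key in place of the random `message`. A hedged sketch of what that might look like: the trigger name, the choice of `access_key_id` as the key, and the selected columns are assumptions, not taken from the original mod, and it relies on the `email` pipeline from the harness in this post.

```hcl
# Sketch only: trigger name, primary_key, and column list are assumptions.
trigger "query" "new_aws_iam_access_key" {
  connection_string = "postgres://steampipe@localhost:9193/steampipe"
  schedule          = "* * * * *"   # every minute while testing
  primary_key       = "access_key_id"

  sql = <<EOQ
    select
      access_key_id,
      user_name,
      create_date
    from
      aws_iam_access_key
    where
      create_date > now() - interval '1 day'
  EOQ

  capture "insert" {
    pipeline = pipeline.email
    args = {
      # Only the first newly seen row is reported in this sketch.
      message = self.inserted_rows[0].access_key_id
    }
  }
}
```

With a stable key like this, the `insert` capture should run only when a key ID appears that the trigger has not recorded before, which is why the harness needs `select random()` to produce something new on every run.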
I always have to remind myself to start with the simplest things and expand incrementally. That's especially important when building, debugging, and testing workflows, which are notoriously fiddly creatures.
Here's today's little test harness, in case it's helpful to anyone.
```
# Test harness: runs every minute and always produces a new row.
trigger "query" "aws_iam_access_key" {
  connection_string = "postgres://steampipe@localhost:9193/steampipe"
  schedule          = "* * * * *"
  primary_key       = "message"

  sql = <<EOQ
    select random()::text as message
  EOQ

  # Runs when the query returns a row whose primary key is new.
  capture "insert" {
    pipeline = pipeline.email
    args = {
      message = self.inserted_rows[0].message
    }
  }
}

# First-pass check: echo the message to the console.
pipeline "test" {
  param "message" {
    type = string
  }

  output "test" {
    value = param.message
  }
}

# Second pass: echo the message and also send it by email.
pipeline "email" {
  param "message" {
    type = string
  }

  output "test" {
    value = param.message
  }

  step "email" "send_it" {
    to            = ["[email protected]"]
    from          = "[email protected]"
    smtp_username = "[email protected]"
    smtp_password = env("FASTMAIL_PW")
    host          = "smtp.fastmail.com"
    port          = 587
    subject       = "new access key"
    content_type  = "text/html"
    body          = <<EOT
      "New access key ${param.message}"
    EOT
  }
}
```
See https://flowpipe.io/docs/flowpipe-hcl/trigger/query for more.