Skip to content

Instantly share code, notes, and snippets.

@tedwardd
Created November 7, 2024 21:51
Show Gist options
  • Save tedwardd/cb1823d57ca20f705c3b93031d2b45ac to your computer and use it in GitHub Desktop.
Save tedwardd/cb1823d57ca20f705c3b93031d2b45ac to your computer and use it in GitHub Desktop.
mesh-activity-logger
package main
import (
"crypto/tls"
"database/sql"
"flag"
"fmt"
// "log"
"os"
"os/signal"
"strconv"
"syscall"
"time"
MQTT "github.com/eclipse/paho.mqtt.golang"
_ "github.com/mattn/go-sqlite3"
)
// Storage settings shared by the setup code and the message handler.
const (
	// file is the data source name handed to sql.Open for the sqlite3 driver.
	file string = "topics.db"

	// create is the schema statement. IF NOT EXISTS makes it safe to run
	// against a database that already contains the topics table.
	create string = `
CREATE TABLE IF NOT EXISTS topics (
topic TEXT NOT NULL PRIMARY KEY,
count INTEGER NOT NULL
);`
)
// insertNewTopic adds a row for topic to the topics table with an initial
// count of 1.
//
// It panics if the INSERT fails. The panic value is an error that names the
// topic and wraps the underlying database error.
func insertNewTopic(db *sql.DB, topic string) {
	// Name the columns explicitly so the statement does not depend on the
	// column order of the table definition.
	const stmt = "INSERT INTO topics (topic, count) VALUES (?, 1);"
	if _, err := db.Exec(stmt, topic); err != nil {
		panic(fmt.Errorf("inserting topic %q: %w", topic, err))
	}
}
// incrementTopicCount adds one to the stored count of topic.
//
// The UPDATE is keyed on the topic column, so a topic with no row is left
// untouched and no error is raised for it.
//
// It panics if the UPDATE fails. The panic value is an error that names the
// topic and wraps the underlying database error.
func incrementTopicCount(db *sql.DB, topic string) {
	const stmt = "UPDATE topics SET count = count + 1 WHERE topic = ?;"
	if _, err := db.Exec(stmt, topic); err != nil {
		panic(fmt.Errorf("incrementing count for topic %q: %w", topic, err))
	}
}
// onMessageReceived is the MQTT message handler. It records one sighting of
// the message's topic in the topics table: a topic with no row yet is
// inserted, and a topic that already has a row gets its count incremented.
// The message payload is not inspected.
//
// It panics if the database cannot be opened or the lookup fails for any
// reason other than the topic being absent.
func onMessageReceived(client MQTT.Client, message MQTT.Message) {
	topic := message.Topic()

	db, err := sql.Open("sqlite3", file)
	if err != nil {
		panic(fmt.Errorf("opening %s: %w", file, err))
	}
	// A handle is opened per message, so it has to be released per message;
	// otherwise every delivery leaks a connection pool.
	defer db.Close()

	// The placeholder must be a bare ?. Inside quotes ('?') it is a string
	// literal, and the lookup would only ever match a topic literally named "?".
	var found string
	err = db.QueryRow("SELECT topic FROM topics WHERE topic = ?", topic).Scan(&found)
	switch {
	case err == sql.ErrNoRows:
		// First sighting: the insert already stores a count of 1, so it must
		// not be followed by an increment.
		insertNewTopic(db, topic)
	case err != nil:
		panic(fmt.Errorf("looking up topic %q: %w", topic, err))
	default:
		incrementTopicCount(db, topic)
	}
}
// main parses the command-line flags, makes sure the topics table exists,
// connects to the MQTT broker, subscribes to the requested topic filter with
// onMessageReceived as the handler, and then blocks until the process
// receives an interrupt or SIGTERM.
//
// An invalid -qos value exits with status 2. Setup failures (database,
// connect, subscribe) panic.
func main() {
	// MQTT.DEBUG = log.New(os.Stdout, "", 0)
	// MQTT.ERROR = log.New(os.Stdout, "", 0)

	// The hostname only seeds the default client ID, so a failed lookup is
	// tolerated and simply yields an empty prefix.
	hostname, _ := os.Hostname()

	server := flag.String("server", "tcp://127.0.0.1:1883", "The full url of the MQTT server to connect to ex: tcp://127.0.0.1:1883")
	topic := flag.String("topic", "#", "Topic to subscribe to")
	qos := flag.Int("qos", 0, "The QoS to subscribe to messages at")
	// NOTE(review): Second() only ranges over 0-59, so two instances started
	// on the same host can end up with the same default client ID.
	clientid := flag.String("clientid", hostname+strconv.Itoa(time.Now().Second()), "A clientid for the connection")
	username := flag.String("username", "", "A username to authenticate to the MQTT server")
	password := flag.String("password", "", "Password to match username")
	flag.Parse()

	// Reject out-of-range values up front: the byte conversion below would
	// otherwise wrap them silently (256 would become 0).
	if *qos < 0 || *qos > 2 {
		fmt.Fprintf(os.Stderr, "invalid -qos %d: must be 0, 1 or 2\n", *qos)
		os.Exit(2)
	}

	// Touch the database only after the flags are known to be valid, so that
	// -h or a bad flag does not create the file as a side effect.
	db, err := sql.Open("sqlite3", file)
	if err != nil {
		panic(err)
	}
	if _, err := db.Exec(create); err != nil {
		db.Close()
		panic(err)
	}
	// This handle exists only to create the schema; the message handler opens
	// its own, so release this one immediately.
	if err := db.Close(); err != nil {
		panic(err)
	}

	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, os.Interrupt, syscall.SIGTERM)

	connOpts := MQTT.NewClientOptions().AddBroker(*server).SetClientID(*clientid).SetCleanSession(true)
	if *username != "" {
		connOpts.SetUsername(*username)
		if *password != "" {
			connOpts.SetPassword(*password)
		}
	}

	// NOTE(security): InsecureSkipVerify disables verification of the broker's
	// certificate, so a TLS connection can be intercepted. It is kept to
	// preserve existing behavior; consider making it opt-in.
	tlsConfig := &tls.Config{InsecureSkipVerify: true, ClientAuth: tls.NoClientCert}
	connOpts.SetTLSConfig(tlsConfig)

	// Subscribing from the connect callback means the subscription is issued
	// each time the callback fires.
	connOpts.OnConnect = func(cl MQTT.Client) {
		if token := cl.Subscribe(*topic, byte(*qos), onMessageReceived); token.Wait() && token.Error() != nil {
			panic(token.Error())
		}
	}

	client := MQTT.NewClient(connOpts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}
	fmt.Printf("Connected to %s\n", *server)

	<-sigs
	// Give in-flight work up to 250ms to finish before closing the connection.
	client.Disconnect(250)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment