Created
November 7, 2024 21:51
-
-
Save tedwardd/cb1823d57ca20f705c3b93031d2b45ac to your computer and use it in GitHub Desktop.
mesh-activity-logger
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"crypto/tls" | |
"database/sql" | |
"flag" | |
"fmt" | |
// "log" | |
"os" | |
"os/signal" | |
"strconv" | |
"syscall" | |
"time" | |
MQTT "github.com/eclipse/paho.mqtt.golang" | |
_ "github.com/mattn/go-sqlite3" | |
) | |
// Storage settings for the per-topic message counters.
const (
	// file is the data source name handed to sql.Open for the sqlite3 driver.
	file string = "topics.db"

	// create is the schema statement run at startup. It defines one row per
	// topic, keyed by the topic string, with an integer message count.
	create string = `
CREATE TABLE IF NOT EXISTS topics (
topic TEXT NOT NULL PRIMARY KEY,
count INTEGER NOT NULL
);`
)
// insertNewTopic adds a row for topic to the topics table with a count of 1.
//
// If the statement fails, the marker "1" is printed to standard output and
// the function panics with the database error.
func insertNewTopic(db *sql.DB, topic string) {
	const stmt = "INSERT INTO topics VALUES(?, 1);"

	if _, execErr := db.Exec(stmt, topic); execErr != nil {
		fmt.Println("1")
		panic(execErr)
	}
}
// incrementTopicCount adds one to the stored count of the row whose topic
// column equals topic.
//
// If the statement fails, the marker "2" is printed to standard output and
// the function panics with the database error.
func incrementTopicCount(db *sql.DB, topic string) {
	const stmt = "UPDATE topics SET count = count + 1 WHERE topic = ?;"

	if _, execErr := db.Exec(stmt, topic); execErr != nil {
		fmt.Println("2")
		panic(execErr)
	}
}
func onMessageReceived(client MQTT.Client, message MQTT.Message) { | |
//fmt.Printf("Received message on topic: %s\nMessage: %s\n\n\n", message.Topic(), message.Payload()) | |
//fmt.Printf("%s\n\n", message.Topic()) | |
db, err := sql.Open("sqlite3", file) | |
if err != nil { | |
panic(err) | |
} | |
query := `SELECT topic FROM topics WHERE topic = '?'` | |
topic := message.Topic() | |
err = db.QueryRow(query, message.Topic()).Scan(&topic) | |
if err != nil { | |
if err != sql.ErrNoRows { | |
fmt.Println("3") | |
panic(err) | |
} | |
insertNewTopic(db, message.Topic()) | |
} | |
incrementTopicCount(db, message.Topic()) | |
} | |
func main() { | |
// MQTT.DEBUG = log.New(os.Stdout, "", 0) | |
// MQTT.ERROR = log.New(os.Stdout, "", 0) | |
db, err := sql.Open("sqlite3", file) | |
if err != nil { | |
panic(err) | |
} | |
if _, err := db.Exec(create); err != nil { | |
panic(err) | |
} | |
c := make(chan os.Signal, 1) | |
signal.Notify(c, os.Interrupt, syscall.SIGTERM) | |
hostname, _ := os.Hostname() | |
server := flag.String("server", "tcp://127.0.0.1:1883", "The full url of the MQTT server to connect to ex: tcp://127.0.0.1:1883") | |
topic := flag.String("topic", "#", "Topic to subscribe to") | |
qos := flag.Int("qos", 0, "The QoS to subscribe to messages at") | |
clientid := flag.String("clientid", hostname+strconv.Itoa(time.Now().Second()), "A clientid for the connection") | |
username := flag.String("username", "", "A username to authenticate to the MQTT server") | |
password := flag.String("password", "", "Password to match username") | |
flag.Parse() | |
connOpts := MQTT.NewClientOptions().AddBroker(*server).SetClientID(*clientid).SetCleanSession(true) | |
if *username != "" { | |
connOpts.SetUsername(*username) | |
if *password != "" { | |
connOpts.SetPassword(*password) | |
} } | |
tlsConfig := &tls.Config{InsecureSkipVerify: true, ClientAuth: tls.NoClientCert} | |
connOpts.SetTLSConfig(tlsConfig) | |
connOpts.OnConnect = func(c MQTT.Client) { | |
if token := c.Subscribe(*topic, byte(*qos), onMessageReceived); token.Wait() && token.Error() != nil { | |
panic(token.Error()) | |
} | |
} | |
client := MQTT.NewClient(connOpts) | |
if token := client.Connect(); token.Wait() && token.Error() != nil { | |
panic(token.Error()) | |
} else { | |
fmt.Printf("Connected to %s\n", *server) | |
} | |
<-c | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment