Last active
December 12, 2017 07:13
-
-
Save malcolmsparks/6044878 to your computer and use it in GitHub Desktop.
MQTT example for Clojure core.async
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns mqtt-insertion.core | |
(:require [clojure.core.async :refer :all]) | |
(:import (org.eclipse.paho.client.mqttv3 | |
MqttCallback | |
MqttAsyncClient | |
MqttConnectOptions | |
MqttDeliveryToken | |
MqttException | |
MqttMessage | |
MqttTopic | |
) | |
(org.eclipse.paho.client.mqttv3.persist | |
MqttDefaultFilePersistence))) | |
(def tok->chan (atom {})) | |
(defn- mqtt-callback | |
"Function called after delivery confirmation" | |
[] | |
(reify MqttCallback | |
(connectionLost [_ cause] | |
nil) | |
(messageArrived [_ topic message] | |
nil) | |
(deliveryComplete [_ tok] | |
(when-let [c (@tok->chan tok)] | |
(go (>! c :arrived)) | |
(swap! tok->chan dissoc tok))))) | |
(defn mqtt-connect | |
"Returns a MqttClient." | |
[broker-url client-id persistence] | |
(doto (MqttAsyncClient. broker-url client-id persistence) | |
(.setCallback (mqtt-callback)) | |
(.connect (doto (MqttConnectOptions.) | |
(.setCleanSession true) | |
(.setKeepAliveInterval 30))))) | |
(defn- mqtt-create-message | |
"Creates a MQTT message." | |
([{:keys [qos retained] :or {qos 0 retained false}} ^String message] | |
(doto (MqttMessage. (.getBytes message)) | |
(.setQos qos) | |
(.setRetained retained))) | |
([^String message] | |
(mqtt-create-message {} message))) | |
(defn- mqtt-publish | |
"Publishes MESSAGE to TOPIC" | |
[client topic message] | |
(.publish client topic message)) | |
(defn test-me [] | |
(let [persistence (new MqttDefaultFilePersistence) | |
client (mqtt-connect "tcp://localhost:1883" "emacs" persistence)] | |
(Thread/sleep 400) | |
(try | |
(dotimes [n 10] | |
(go | |
(loop [] | |
(let [c (chan) | |
tok (mqtt-publish client "mousetrap" (mqtt-create-message (str "sprung-" n)))] | |
(swap! tok->chan assoc tok c) | |
(when-not (alts! c (timeout 500)) | |
(swap! tok->chan dissoc tok) | |
(recur)) | |
)))) | |
(finally | |
(Thread/sleep 400) | |
(.disconnect client) | |
(.close client) | |
(.close persistence))))) | |
;; (test-me) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thanks for the example @malcolmsparks