Skip to content

Instantly share code, notes, and snippets.

@fundon
Created May 18, 2012 07:51
Show Gist options
  • Save fundon/2723828 to your computer and use it in GitHub Desktop.
Save fundon/2723828 to your computer and use it in GitHub Desktop.
Queue based consumer/producer.
;; This example shows how to create a single producer and multiple consumers.
;; Each consumer will receive the message. I'm not sure if internally it's handled in a round-robin manner,
;; but when queue does not listen for messages (e.q. it's processing them), it won't receive one.
;;
;; In order to run it, use:
;;
;; lein2 run --mode consumer
;;
;; And in other window:
;;
;; lein2 run --mode producer
;;
;; Here are the results that you're likely to get:
;;
;; [Queue Example] Running consumer 0. Ready to receive messages...
;; [Queue Example] Running consumer 1. Ready to receive messages...
;; [Queue Example] Running consumer 2. Ready to receive messages...
;; [Queue Example] Running consumer 3. Ready to receive messages...
;; [Queue Example] Running consumer 4. Ready to receive messages...
;; [Queue Example] Consumer 0. Payload: {:message "this is a message number 0"}
;; [Queue Example] Consumer 1. Payload: {:message "this is a message number 1"}
;; [Queue Example] Consumer 2. Payload: {:message "this is a message number 2"}
;; [Queue Example] Consumer 3. Payload: {:message "this is a message number 3"}
;; [Queue Example] Consumer 4. Payload: {:message "this is a message number 4"}
;; [Queue Example] Consumer 0. Payload: {:message "this is a message number 5"}
;; [Queue Example] Consumer 1. Payload: {:message "this is a message number 6"}
;; [Queue Example] Consumer 2. Payload: {:message "this is a message number 7"}
;; [Queue Example] Consumer 3. Payload: {:message "this is a message number 8"}
;; [Queue Example] Consumer 4. Payload: {:message "this is a message number 9"}
(ns langohr-examples.core
(:gen-class)
(:import [com.rabbitmq.client Connection Channel AMQP AMQP$Queue$DeclareOk AMQP$Queue$BindOk AMQP$Queue$UnbindOk]
java.io.IOException)
(:use clojure.tools.cli
[langohr.core :as lhc]
[langohr.queue :as lhq]
[langohr.basic :as lhb]
[langohr.consumers :as lhcons]
[clojure.data.json :as json]))
(defonce ^Connection conn (lhc/connect))
(defn- deserialize-payload
[^bytes payload]
(json/read-json (String. ^bytes payload) true))
(defn- serialize-payload
[payload-map]
(json/json-str payload-map))
(defn run-producer
"Producer will publish a message to the queue"
[]
(let [channel (lhc/create-channel conn)
queue (.getQueue (lhq/declare channel "queue-based-example" :exclusive false))]
(println "[Queue Example] Running producer..")
(dotimes [i 10]
(lhb/publish channel "" queue (serialize-payload { :message (format "this is a message number %d" i) }))
(Thread/sleep 200))
(println "[Queue Example] Message published. Closing connection")
(.close conn)))
(defn run-consumer
[]
(let [channel (lhc/create-channel conn)
queue-name "queue-based-example"
;; Declare the queue on a consumer, as if producer is ran after consumer, consumer will throw
;; channel error: reply-code=404, reply-text=NOT_FOUND - no queue 'queue-based-example' in vhost '/'
queue (.getQueue (lhq/declare channel "queue-based-example" :exclusive false))]
(dotimes [i 5]
(.start (Thread. (fn []
(println (format "[Queue Example] Running consumer %d. Ready to receive messages..." i))
(lhcons/subscribe channel queue-name
(fn [delivery message-properties payload]
;; (println (format "[Queue Example] Consumer %d. Delivery: %s" i delivery))
;; (println (format "[Queue Example] Consumer %d. Message Properties:" i message-properties))
(println (format "[Queue Example] Consumer %d. Payload: %s" i (deserialize-payload payload)))) :auto-ack true))))
(Thread/sleep 200))))
(defn -main
[& args]
(let [ [options] (cli args ["--mode" "Example mode, consumer or producer mode" :default "producer"])]
(case (:mode options)
"producer" (run-producer)
"consumer" (run-consumer))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment