Created
May 18, 2012 07:51
-
-
Save fundon/2723828 to your computer and use it in GitHub Desktop.
Queue based consumer/producer.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;; This example shows how to create a single producer and multiple consumers. | |
;; Each message is delivered to one of the consumers (see the sample output below). I'm not sure whether
;; delivery is handled internally in a round-robin manner, but while a consumer is not listening for
;; messages (e.g. it is still processing one), it won't receive another.
;; | |
;; In order to run it, use: | |
;; | |
;; lein2 run --mode consumer | |
;; | |
;; And in other window: | |
;; | |
;; lein2 run --mode producer | |
;; | |
;; Here are the results that you're likely to get: | |
;; | |
;; [Queue Example] Running consumer 0. Ready to receive messages... | |
;; [Queue Example] Running consumer 1. Ready to receive messages... | |
;; [Queue Example] Running consumer 2. Ready to receive messages... | |
;; [Queue Example] Running consumer 3. Ready to receive messages... | |
;; [Queue Example] Running consumer 4. Ready to receive messages... | |
;; [Queue Example] Consumer 0. Payload: {:message "this is a message number 0"} | |
;; [Queue Example] Consumer 1. Payload: {:message "this is a message number 1"} | |
;; [Queue Example] Consumer 2. Payload: {:message "this is a message number 2"} | |
;; [Queue Example] Consumer 3. Payload: {:message "this is a message number 3"} | |
;; [Queue Example] Consumer 4. Payload: {:message "this is a message number 4"} | |
;; [Queue Example] Consumer 0. Payload: {:message "this is a message number 5"} | |
;; [Queue Example] Consumer 1. Payload: {:message "this is a message number 6"} | |
;; [Queue Example] Consumer 2. Payload: {:message "this is a message number 7"} | |
;; [Queue Example] Consumer 3. Payload: {:message "this is a message number 8"} | |
;; [Queue Example] Consumer 4. Payload: {:message "this is a message number 9"} | |
(ns langohr-examples.core | |
(:gen-class) | |
(:import [com.rabbitmq.client Connection Channel AMQP AMQP$Queue$DeclareOk AMQP$Queue$BindOk AMQP$Queue$UnbindOk] | |
java.io.IOException) | |
(:use clojure.tools.cli | |
[langohr.core :as lhc] | |
[langohr.queue :as lhq] | |
[langohr.basic :as lhb] | |
[langohr.consumers :as lhcons] | |
[clojure.data.json :as json])) | |
;; AMQP connection shared by the producer and the consumers in this namespace.
;; It is opened by calling `lhc/connect` with no arguments the first time the
;; namespace is loaded; `defonce` keeps the existing value on later reloads.
(defonce ^{:tag Connection} conn
  (lhc/connect))
(defn- deserialize-payload
  "Decodes a message body into Clojure data.

  `payload` is the raw message body as a byte array. The bytes are decoded
  explicitly as UTF-8 rather than with the JVM's platform-default charset
  (which differs between machines), and the resulting text is parsed as JSON.
  The `true` argument to `json/read-json` requests keyword keys, so the
  producer's message comes back as `{:message \"...\"}`."
  [^bytes payload]
  (json/read-json (String. payload "UTF-8") true))
(defn- serialize-payload
  "Renders `payload-map` as a JSON string by handing it to `json/json-str`."
  [payload-map]
  (-> payload-map
      json/json-str))
(defn run-producer
  "Publishes ten numbered JSON messages to the \"queue-based-example\" queue.

  Opens a channel on the shared `conn`, declares the (non-exclusive) queue,
  then publishes one message every 200 ms. The shared connection is closed
  when the producer finishes -- including when declaring, publishing or
  sleeping throws, so a failure does not leave the connection open.
  Returns nil."
  []
  (try
    (let [channel (lhc/create-channel conn)
          queue   (.getQueue (lhq/declare channel "queue-based-example" :exclusive false))]
      (println "[Queue Example] Running producer..")
      (dotimes [i 10]
        ;; The empty exchange name is AMQP's default exchange, which routes a
        ;; message to the queue whose name equals the routing key.
        (lhb/publish channel "" queue
                     (serialize-payload {:message (format "this is a message number %d" i)}))
        (Thread/sleep 200))
      (println "[Queue Example] Message published. Closing connection"))
    (finally
      ;; Always release the connection, even if publishing failed part-way.
      (.close conn))))
(defn run-consumer
  "Starts five consumer threads subscribed to the \"queue-based-example\" queue.

  A channel is opened on the shared `conn` and the queue is declared, then
  five threads are started 200 ms apart. Each thread announces itself and
  subscribes with `:auto-ack true`; its handler prints the consumer index and
  the deserialized payload of every message it receives. Returns nil once all
  threads have been started."
  []
  (let [channel    (lhc/create-channel conn)
        queue-name "queue-based-example"]
    ;; Declare the queue on the consumer side as well: if the consumer is
    ;; started before the producer, subscribing to an undeclared queue fails with
    ;; channel error: reply-code=404, reply-text=NOT_FOUND - no queue 'queue-based-example' in vhost '/'
    (lhq/declare channel queue-name :exclusive false)
    ;; NOTE(review): all five threads share this one channel. The RabbitMQ Java
    ;; client guide advises against sharing a Channel between threads -- consider
    ;; one channel per consumer thread; TODO confirm against the client version in use.
    (dotimes [i 5]
      (.start
       (Thread.
        (fn []
          (println (format "[Queue Example] Running consumer %d. Ready to receive messages..." i))
          (lhcons/subscribe
           channel queue-name
           (fn [delivery message-properties payload]
             ;; (println (format "[Queue Example] Consumer %d. Delivery: %s" i delivery))
             ;; (println (format "[Queue Example] Consumer %d. Message Properties: %s" i message-properties))
             (println (format "[Queue Example] Consumer %d. Payload: %s" i (deserialize-payload payload))))
           :auto-ack true))))
      ;; Stagger the start of the consumer threads.
      (Thread/sleep 200))))
(defn -main
  "Command-line entry point.

  Parses `--mode` from `args` (default \"producer\") and runs the matching
  example: \"producer\" calls `run-producer`, \"consumer\" calls
  `run-consumer`. Any other value is reported on stderr instead of failing
  with `case`'s opaque \"No matching clause\" IllegalArgumentException."
  [& args]
  (let [[options] (cli args ["--mode" "Example mode, consumer or producer mode" :default "producer"])
        mode      (:mode options)]
    (case mode
      "producer" (run-producer)
      "consumer" (run-consumer)
      ;; Default clause: unknown mode.
      (binding [*out* *err*]
        (println (format "[Queue Example] Unknown mode \"%s\". Use \"producer\" or \"consumer\"." mode))))))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment