Skip to content

Instantly share code, notes, and snippets.

@MichaelDrogalis
Created January 8, 2013 04:39
Show Gist options
  • Save MichaelDrogalis/4481202 to your computer and use it in GitHub Desktop.
Save MichaelDrogalis/4481202 to your computer and use it in GitHub Desktop.
(ns topics.core
(:require [clamq.activemq :refer :all]
[clamq.protocol.connection :as connection]
[clamq.protocol.producer :as producer]
[clamq.protocol.consumer :as consumer]))
(def broker (activemq-connection "tcp://localhost:61616"))
(def topic "computational-topic")
(def producer (connection/producer broker {:pubSub true}))
(def results (atom {}))
;;; Use your imagination. :)
(defn heavy-computation-a [results n]
(reset! results (assoc @results :computation-a (inc n))))
(defn heavy-computation-b [results n]
(reset! results (assoc @results :computation-b (dec n))))
(defn heavy-computation-c [results n]
(reset! results (assoc @results :computation-c (* n 2))))
(defn topic-consumer [f]
(connection/consumer broker {:endpoint topic :on-message f :transacted false :pubSub true}))
(def consumer-a (topic-consumer (partial heavy-computation-a results)))
(def consumer-b (topic-consumer (partial heavy-computation-b results)))
(def consumer-c (topic-consumer (partial heavy-computation-c results)))
(consumer/start consumer-a)
(consumer/start consumer-b)
(consumer/start consumer-c)
(defn compute-results []
(producer/publish producer topic 4))
;;; Wait for the consumers to get up. Wouldn't be an issue in a real world example
;;; where consumers are started much further ahead of time.
(Thread/sleep 1000)
(compute-results)
(Thread/sleep 1000)
(println @results) ;;; => {:computation-b 3, :computation-c 8, :computation-a 5}
(.close consumer-c)
(.close consumer-b)
(.close consumer-a)
(.close broker)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment