Created January 14, 2012 08:39
A very simple set of functions that create a work queue with an arbitrary number of workers attached, written in Clojure
(ns nadsack.utils.queue
(:import (java.util.concurrent BlockingQueue LinkedBlockingQueue)))
(defn- generic-worker
  "Starts a worker on its own thread and returns the future that runs it.

  The worker repeatedly takes an item from `queue` (blocking while the queue
  is empty) until the `switch` promise has been delivered. Two items are
  treated as control messages instead of work:
    :die      - does nothing; taking it simply lets the loop go round again
                and re-check `switch`
    :identity - prints this worker's randomly generated id
  Any other item is passed to `worker-function`.

  Dereferencing the returned future blocks until the loop has exited. An
  exception thrown by `worker-function` is not caught here: it ends the loop
  and is rethrown when the future is dereferenced."
  [worker-function ^BlockingQueue queue switch]
  (future
    (let [worker-id (str (java.util.UUID/randomUUID))]
      (while (not (realized? switch))
        (let [work-item (.take queue)]
          (cond
            ;; handle a couple of special messages
            (= work-item :die) nil ; just pass through
            (= work-item :identity) (println (str "Worker ID: " worker-id))
            ;; everything else is handed to the worker-function
            :else (worker-function work-item)))))))
(defn- kill-worker
  "Puts the :die control message on `queue`."
  [^BlockingQueue queue]
  (.put queue :die))
;; creates a new queue, spins up max-workers workers, and returns a hashmap with the queue
;; and a promise that shuts the queue down when delivered; creates a total of max-workers + 1 threads
(defn create
  "Creates a new queue served by `max-workers` workers, each of which hands
  the jobs it takes from the queue to `worker-function`.

  Returns, without blocking, a hash-map with two entries:
    :queue  - the LinkedBlockingQueue that jobs are put on
    :switch - a promise; delivering it starts the shutdown sequence

  A supervisor thread starts the workers and then waits for :switch. Once it
  is delivered the supervisor waits for the queue to drain, tells the workers
  to stop, waits for each of them to finish and finally clears any leftover
  :die messages. `generic-worker` is expected to return a derefable (a future)
  per worker, which is what the supervisor waits on."
  [worker-function max-workers]
  (let [new-queue       (LinkedBlockingQueue.)
        switch          (promise)
        internal-switch (promise)]
    ;; The supervisor has to run on its own thread: it blocks on `switch`,
    ;; which callers can only deliver after this function has returned it.
    (future
      ;; doall rather than dorun: the workers must be started eagerly AND the
      ;; resulting seq kept, otherwise there is nothing to wait on below.
      (let [workers (doall (repeatedly max-workers
                                       #(generic-worker worker-function new-queue internal-switch)))]
        (deref switch) ; block until switch is thrown, then...
        (while (not (.isEmpty new-queue)) (Thread/sleep 100)) ; block while queue empties
        (deliver internal-switch true)
        ;; wake workers blocked in .take so they notice internal-switch;
        ;; surplus :die messages are harmless and are cleared afterwards
        (dotimes [_ (* max-workers 3)] (kill-worker new-queue))
        (doseq [w workers] @w) ; wait for all of the workers to stop
        (.clear new-queue))) ; clear any excess die commands
    (hash-map :queue new-queue :switch switch)))
;; shut the queue down
(defn shutdown
  "Terminates all of the workers associated with the passed queue.

  `queue-map` is a map holding a promise under the :switch key (the map
  returned by `create`). Delivers `true` to that promise and returns nil;
  delivering an already-delivered promise has no further effect."
  [queue-map]
  (deliver (queue-map :switch) true)
  nil)
;; add an item to the queue
(defn add
  "Puts `item` on the queue stored under the :queue key of `queue-map`,
  blocking if the queue cannot accept it yet. Always returns nil."
  [queue-map item]
  (let [^BlockingQueue target (queue-map :queue)]
    (.put target item)
    nil))
