Created
January 14, 2012 08:39
-
-
Save biesnecker/1610728 to your computer and use it in GitHub Desktop.
A very simple set of functions to create a work queue with an arbitrary number of workers attached, in Clojure
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns nadsack.utils.queue | |
(:import (java.util.concurrent BlockingQueue LinkedBlockingQueue))) | |
(defn- generic-worker
  "Starts a future that repeatedly takes items from `queue` until the
  promise `switch` has been delivered, and returns that future.

  Two items are treated as control messages rather than work:
    :die      - ignored; its only purpose is to wake a worker blocked in
                .take so that it re-checks `switch`
    :identity - prints this worker's randomly generated id
  Every other item is passed to `worker-function`. An exception thrown by
  `worker-function` is reported on *err* and the worker keeps running, so a
  single bad item cannot permanently remove a worker from the pool."
  [worker-function ^BlockingQueue queue switch]
  (future
    (let [worker-id (str (java.util.UUID/randomUUID))]
      (while (not (realized? switch))
        ;; .take blocks until an item is available
        (let [work-item (.take queue)]
          (cond
            ;; handle a couple of special messages
            (= work-item :die) nil ; just pass through
            (= work-item :identity) (println (str "Worker ID: " worker-id))
            ;; everything else is handed to the worker-function
            :else
            (try
              (worker-function work-item)
              (catch Exception e
                ;; report and continue instead of letting the future die
                (binding [*out* *err*]
                  (println (str "Worker " worker-id " failed on a work item: " e)))))))))))
(defn- kill-worker
  "Puts a single :die message on `queue`."
  [^BlockingQueue queue]
  (.put queue :die))
;; creates a new queue, spins up max-workers workers, and returns a hashmap with the queue
;; and a promise that shuts the queue down when delivered; creates a total of max-workers + 1 threads
(defn create
  "Creates a new queue and _max-workers_ workers that process jobs using
  _worker-function_. Returns a hashmap with the queue under :queue and a
  promise under :switch. Delivering the promise kills all of the workers
  once they have cleared the queue of other work.

  A coordinating future starts the workers and then blocks on the :switch
  promise, so max-workers + 1 futures are created in total."
  [worker-function max-workers]
  (let [new-queue (LinkedBlockingQueue.)
        switch (promise)
        internal-switch (promise)]
    (future
      ;; doall (not dorun, which returns nil) so the worker futures are
      ;; retained and can actually be waited on below
      (let [workers (doall (repeatedly max-workers
                                       #(generic-worker worker-function new-queue internal-switch)))]
        (deref switch) ; block until switch is thrown, then...
        (while (not (.isEmpty new-queue)) (Thread/sleep 100)) ; block while queue empties
        (deliver internal-switch true)
        ;; wake every worker that is blocked waiting for an item
        (dotimes [_ (* max-workers 3)] (kill-worker new-queue))
        (doseq [w workers] @w) ; wait for all of the workers to stop
        ;; only now is it safe to drop the excess die commands; clearing
        ;; earlier could strand a worker that is still blocked on the queue
        (.clear new-queue)))
    (hash-map :queue new-queue :switch switch)))
;; shut the queue down | |
(defn shutdown
  "Delivers the :switch promise of _queue-map_, which signals the workers
  associated with that queue to terminate. Always returns nil."
  [queue-map]
  (let [switch (queue-map :switch)]
    (deliver switch true)
    nil))
;; add an item to the queue | |
(defn add
  "Puts _item_ on the queue stored under :queue in _queue-map_. Always
  returns nil."
  [queue-map item]
  (let [^BlockingQueue target (queue-map :queue)]
    (.put target item)
    nil))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment