@aria42
Created November 29, 2010 02:15
Proposal FSM for Work
(ns work.fsm
  {:doc "Finite State Machine Abstraction for Asynchronous Processing."}
  (:use [plumbing.core :only [map-map map-from-vals]]
        [work.core :only [queue-work shutdown-now]]
        [work.queue :only [poll offer local-queue]]))
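
;; :inbox is not a declared field; run-fsm assocs a queue onto each State.
(defrecord ^:private State [id work-fn routing-fn num-threads])

(defn- offer-to-state [state elem]
  (offer (:inbox state) elem))

(defn- mk-state-worker
  "Returns the worker fn for state `id`: applies work-fn to an item, then
  offers the result to the inbox of the state chosen by routing-fn."
  [state-map id]
  (let [{:keys [work-fn routing-fn]} (state-map id)]
    (fn [elem _]
      (let [res (work-fn elem)
            next-id (routing-fn res)
            next-state (state-map next-id)]
        (offer-to-state next-state res)))))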
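
(defn- launch-state-pools
  "Starts a worker pool for every state except :stop. Returns a map from
  state id to pool; the :stop entry is nil."
  [state-map]
  (map-map
   (fn [state]
     (when (and state (not= (:id state) :stop))
       (queue-work
        (mk-state-worker state-map (:id state))
        #(poll (:inbox state))
        nil
        (:num-threads state)
        :async
        ;; on error, print the stack trace
        (fn [key e task]
          (.printStackTrace e)))))
   state-map))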

(defn wait-for-complete-results
  "Test helper: checks every 100 ms until response-q holds at least
  expected-seq-size items, then returns its contents sorted."
  [response-q expected-seq-size]
  (while (< (.size response-q) expected-seq-size)
    (Thread/sleep 100))
  (sort (iterator-seq (.iterator response-q))))

(defn run-fsm
  "Runs an asynchronous finite-state machine.

  get-work: queue that the :start state polls for task items
            (for example, a work.queue local-queue)
  state-specs: one or more state specifications, each of the form
            [state-id work-fn routing-fn num-threads]
    state-id: unique keyword for the state
    work-fn: fn applied to each item that reaches the state
    routing-fn: called on the output of work-fn; returns the id of
                the successor state
    num-threads: number of threads the state gets to process items

  Returns [output-queue state-id-to-pool-map], where output-queue is the
  work.queue that accumulates the items routed to :stop, and
  state-id-to-pool-map maps state ids to the pools created (the :stop
  entry is nil). Shut these pools down once you have collected your data
  from the output queue.

  CONVENTIONS: :start and :stop are reserved state ids. You must always
  include a :start state, but do not include a :stop state; it is created
  for you. Routing functions should eventually return :stop so that
  results accumulate in the output queue."
  [get-work & state-specs]
  (let [state-map (->> state-specs
                       (map (fn [[id work-fn routing-fn num-threads]]
                              (State. id work-fn routing-fn num-threads)))
                       (cons (State. :stop nil nil 0))
                       (map (fn [s]
                              (assoc s :inbox
                                     (if (= (:id s) :start)
                                       get-work
                                       (local-queue)))))
                       (map-from-vals :id))
        pool-map (launch-state-pools state-map)]
    [(-> state-map :stop :inbox) pool-map]))

(comment
  (let [[out pools]
        (run-fsm
         (local-queue [1 2 3 4])
         [:start inc (fn [x] (if (even? x) :even :odd)) 1]
         [:even dec (constantly :stop) 1]
         [:odd identity (constantly :stop) 1])
        flushed-out (wait-for-complete-results out 4)]
    (doseq [p (remove nil? (vals pools))]
      (shutdown-now p))
    flushed-out))
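
;; A minimal, single-threaded sketch of the routing semantics described in
;; the run-fsm docstring, using only clojure.core. `run-fsm-sync` is a
;; hypothetical helper for illustration and is not part of work.fsm: it has
;; no queues or thread pools and ignores num-threads. Each item enters at
;; :start and moves from state to state until a routing-fn returns :stop.
(defn run-fsm-sync
  [items & state-specs]
  (let [states (into {}
                     (map (fn [[id work-fn routing-fn _num-threads]]
                            [id {:work-fn work-fn :routing-fn routing-fn}])
                          state-specs))]
    (mapv (fn [item]
            (loop [id :start, x item]
              (if (= id :stop)
                x
                (let [{:keys [work-fn routing-fn]} (states id)
                      res (work-fn x)]
                  (recur (routing-fn res) res)))))
          items)))

(comment
  ;; Same state specs as the run-fsm example, with a plain vector as input.
  (run-fsm-sync
   [1 2 3 4]
   [:start inc (fn [x] (if (even? x) :even :odd)) 1]
   [:even dec (constantly :stop) 1]
   [:odd identity (constantly :stop) 1])
  ;; => [1 3 3 5]
  )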