Created
November 29, 2010 02:15
-
-
Save aria42/719511 to your computer and use it in GitHub Desktop.
Proposal FSM for Work
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;; Namespace declaration. The attr-map carries the namespace docstring;
;; the :use clauses pull in only the names this file references.
(ns work.fsm
  {:doc "Finite State Machine Abstraction for Asynchronous Processing."}
  (:use [plumbing.core :only [map-map map-from-vals]]
        [work.core :only [queue-work shutdown-now]]
        [work.queue :only [poll offer local-queue]]))
;; One state of the machine.
;;   id          - keyword identifying the state
;;   work-fn     - fn applied to each item that reaches the state
;;   routing-fn  - fn of work-fn's result returning the successor state's id
;;   num-threads - number of threads given to the state's pool
;; An :inbox entry is not a declared field; run-fsm assoc's it onto each
;; State after construction.
(defrecord ^:private State [id work-fn routing-fn num-threads])
(defn- offer-to-state
  "Offer `elem` to the :inbox of `state` and return whatever `offer` returns."
  [state elem]
  (offer (:inbox state) elem))
(defn- mk-state-worker
  "Build the worker fn for the state registered under `id` in `state-map`.

  The returned fn takes an element plus a second argument that is ignored.
  It applies the state's work-fn to the element, asks the state's routing-fn
  for the id of the successor state, and offers the result to that
  successor's inbox.

  Throws IllegalStateException when routing-fn returns an id that is not a
  key of `state-map`, instead of handing a nil state to offer-to-state."
  [state-map id]
  (let [{:keys [work-fn routing-fn]} (state-map id)]
    (fn [elem _]
      (let [res     (work-fn elem)
            next-id (routing-fn res)]
        (if-let [next-state (state-map next-id)]
          (offer-to-state next-state res)
          ;; Fail loudly with the offending id; a nil successor would
          ;; otherwise surface as an opaque error inside `offer`.
          (throw (IllegalStateException.
                  (str "State " id " routed to unknown state id: "
                       (pr-str next-id)))))))))
(defn- launch-state-pools
  "Call queue-work once for every state in `state-map` except :stop.

  Each call receives the state's worker fn (see mk-state-worker), a thunk
  that polls the state's :inbox, the state's :num-threads, and an error
  handler that prints the stack trace of the failure. The nil and :async
  arguments are passed to queue-work unchanged.

  Returns the result of map-map over `state-map`; the fn yields nil for the
  :stop state (and for a nil state), so no pool is started for it."
  [state-map]
  (map-map
   (fn [state]
     (when (and state (not= (:id state) :stop))
       (queue-work
        (mk-state-worker state-map (:id state))
        #(poll (:inbox state))
        nil
        (:num-threads state)
        :async
        ;; Type hint avoids a reflective call to printStackTrace.
        (fn [key ^Throwable e task]
          (.printStackTrace e)))))
   state-map))
(defn wait-for-complete-results
  "Test helper: block until `response-q` holds at least `expected-seq-size`
  items, then return its contents sorted.

  Checks the size every 100 ms and has no timeout, so it never returns if
  fewer than `expected-seq-size` items ever arrive. The queue is read through
  its iterator; this fn does not remove items from it."
  [response-q expected-seq-size]
  (while (< (.size response-q) expected-seq-size)
    (Thread/sleep 100))
  (sort (iterator-seq (.iterator response-q))))
(defn run-fsm
  "Run an asynchronous finite-state machine.

  get-work: queue holding the initial task items. It is used directly as
    the inbox of the :start state and is polled by that state's workers.
  state-specs: the state specifications, each of the form
    [state-id work-fn routing-fn num-threads]
      state-id: unique keyword for the state
      work-fn: fn applied to each item that reaches the state
      routing-fn: called on the output of work-fn; returns the id of the
        successor state
      num-threads: how many threads the state gets to process items

  Returns [output-queue state-id-to-pool-map].
    output-queue is the inbox of the implicit :stop state, where items
      routed to :stop accumulate.
    state-id-to-pool-map maps state ids to the pools created for them; the
      :stop entry is nil because no pool is launched for it. Shut these
      pools down once you have collected your data from output-queue.

  CONVENTIONS: You must supply exactly one state with the id :start; it is
  the entry point. Do not supply a state with the id :stop; it is reserved
  and added automatically. Routing functions should return :stop to
  deliver an item to output-queue.

  Throws IllegalArgumentException when no :start state spec is supplied."
  [get-work & state-specs]
  ;; Without a :start state nothing ever polls get-work, so the machine
  ;; could never make progress; reject that up front.
  (when-not (some (fn [[id]] (= id :start)) state-specs)
    (throw (IllegalArgumentException.
            "run-fsm requires a state spec whose id is :start")))
  (let [state-map (->> state-specs
                       (map (fn [[id work-fn routing-fn num-threads]]
                              (State. id work-fn routing-fn num-threads)))
                       ;; Implicit sink state that only collects results.
                       (cons (State. :stop nil nil 0))
                       ;; :start reads from the caller's queue; every other
                       ;; state gets a fresh local queue as its inbox.
                       (map (fn [s]
                              (assoc s :inbox
                                     (if (= (:id s) :start)
                                       get-work
                                       (local-queue)))))
                       (map-from-vals :id))
        pool-map  (launch-state-pools state-map)]
    [(-> state-map :stop :inbox) pool-map]))
(comment
  ;; Example: :start increments each item and routes it by parity; :even
  ;; decrements, :odd passes through, and both route to :stop.
  ;; For the input [1 2 3 4] the sorted output should be (1 3 3 5).
  (let [[out pools]
        (run-fsm
         (local-queue [1 2 3 4])
         [:start inc (fn [x] (if (even? x) :even :odd)) 1]
         [:even dec (constantly :stop) 1]
         [:odd identity (constantly :stop) 1])
        flushed-out (wait-for-complete-results out 4)]
    ;; The :stop entry of the pool map is nil, hence the remove.
    (doseq [p (remove nil? (vals pools))]
      (shutdown-now p))
    flushed-out)
  )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment