Created
December 13, 2010 17:21
-
-
Save aria42/739258 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| (ns work.graph | |
| (:require | |
| [clojure.contrib.logging :as log] | |
| [clojure.zip :as zip] | |
| [work.core :as work] | |
| [work.queue :as workq])) | |
(defn table
  "Builds a dispatch table (a map) from alternating key/value arguments.

  pairs: k1 v1 k2 v2 ...
    - A key that is a vector is treated as a group of keys: every element
      of the vector is mapped to the same fn.
    - A value that is a vector of fns is combined with `juxt`: each fn is
      called, in order, on the same arguments and the results are returned
      together as a vector.
  A trailing key without a value is ignored.
  Returns a map of key -> fn; when a key is repeated the last pair wins."
  [& pairs]
  (into {}
        ;; Entries are built as explicit [key fn] pairs rather than being
        ;; flattened, so a key that is itself a sequential value (e.g. one
        ;; member of a key group) is kept intact.
        (mapcat (fn [[k v]]
                  (let [f (if (vector? v) (apply juxt v) v)]
                    (if (vector? k)
                      (map (fn [ki] [ki f]) k)
                      [[k f]])))
                (partition 2 pairs))))
(defn dispatch
  "Takes a dispatch fn d and zero or more dispatch tables (maps of
  dispatch value -> handler fn); the tables are merged, later ones winning.

  Returns a fn that, given args, computes (apply d args), looks the result
  up in the merged table and applies the handler found there to the same
  args.

  When no handler is registered for the dispatch value the failure is
  logged and an IllegalArgumentException carrying the same message is
  thrown."
  [d & tables]
  (let [handlers (apply merge tables)]
    (fn [& args]
      ;; `get` instead of invoking the map: the merged table is nil when no
      ;; tables were supplied, and nil cannot be called as a fn.
      (if-let [f (get handlers (apply d args))]
        (apply f args)
        (let [msg (format "unable to dispatch on %s." (pr-str args))]
          (log/error msg)
          ;; Fail with the diagnostic instead of falling through to
          ;; (apply nil args), which would surface as a bare
          ;; NullPointerException.
          (throw (IllegalArgumentException. msg)))))))
;; New Graph Stuff

;; A Vertex bundles the fn it runs with its inbox queue, the policy that
;; processes queued input and the policy that hands results onward.
(defrecord Vertex
  [f                ; fn the vertex runs
   inbox-queue      ; queue that input for this vertex is offered to
   process-policy   ; policy used to start the pool that runs f
   outbox-policy])  ; policy used to broadcast the output of f
;; Vertex Outbox

(defprotocol VertexOutboxPolicy
  "Decides what happens to a value produced by a vertex."
  (broadcast [this x]
    "Send output x on to the neighbors of the vertex, or consume it purely
    for side effects.")
  (add-listener [this listener]
    "Return a new outbox policy that also delivers to listener.
    OPTIONAL: policies without an editable set of listeners may leave this
    unimplemented."))
(defprotocol VertexProcessPolicy
  "Decides how the fn of a vertex gets executed."
  (start-pool [this f inbox-queue outbox-policy]
    "Return a pool that processes f and hands its output to outbox-policy.
    An implementation is free to return a handle to an existing pool
    instead of spawning a new one."))
;; Broadcast Policy
;; Either use expandable adjacency list or jump via dispatch

;; Outbox policy that fans every output out to all listed vertices.
;; outs is a seq of vertices.
(defrecord VertexBroadcastAllPolicy [outs]
  VertexOutboxPolicy
  ;; Offer x to the inbox queue of each vertex in outs.
  (broadcast [this x]
    (doseq [target outs]
      (-> target :inbox-queue (workq/offer x))))
  ;; Return a copy of this policy with listener conj'ed onto outs.
  (add-listener [this listener]
    (assoc this :outs (conj outs listener))))
;; Outbox policy that discards every output.
;; The protocol name must precede the method bodies; without it defrecord
;; cannot tell which protocol `broadcast` belongs to.
;; add-listener is intentionally not implemented (there is nothing to
;; listen to).
(defrecord VertexNoOpOutboxPolicy []
  VertexOutboxPolicy
  (broadcast [this x] nil))
;; Outbox policy that routes each output to exactly one vertex.
;;   dispatch : fn from an output x to a vertex id
;;   graph    : lookup from vertex id to vertex
;; Before the graph is run, each VertexDispatchPolicy is expected to be
;; given the current graph.
(defrecord VertexDispatchPolicy [dispatch graph]
  VertexOutboxPolicy
  ;; x -> vertex id -> vertex -> its inbox queue, which x is then offered to.
  (broadcast [this x]
    (-> x
        dispatch
        graph
        :inbox-queue
        (workq/offer x))))
;; Process Policy

;; Queue-Work
;; Runs f through work/queue-work inside a future. The record's own fields
;; (threads, sleep-time, exec) travel in the same map as :f, :in and :out.
(defrecord QueueWorkProcessPolicy [threads sleep-time exec]
  VertexProcessPolicy
  (start-pool [this f inbox-queue outbox-policy]
    (future
      (let [poll-inbox (partial workq/poll inbox-queue)
            emit       (partial broadcast outbox-policy)]
        ;; :in pulls from the vertex inbox, :out pushes through the
        ;; vertex's outbox policy.
        (work/queue-work
          (assoc this
            :f   f
            :in  poll-inbox
            :out emit))))))
;; Process policy for an "input" vertex, i.e. the place where data is
;; injected into the graph. There is no inbox to read from: f is called
;; with no arguments and whatever it returns is broadcast.
;; freq is handed unchanged to work/schedule-work.
(defrecord SourceProcessPolicy [freq]
  VertexProcessPolicy
  (start-pool [this f _ outbox-policy]
    (future
      (let [emit-next (fn [] (broadcast outbox-policy (f)))]
        (work/schedule-work freq emit-next)))))
(defn- run-vertex
  "Launches the pool for vertex and returns the vertex with the pool
  stored under :pool.

  The pool is started by the vertex's :process-policy (the value that
  implements VertexProcessPolicy), which receives the vertex's fn, inbox
  queue and outbox policy."
  [{:keys [f inbox-queue process-policy outbox-policy] :as vertex}]
  ;; start-pool is a VertexProcessPolicy method, so it has to be invoked on
  ;; the process policy; the Vertex record itself does not implement it.
  (assoc vertex
    :pool (start-pool process-policy f inbox-queue outbox-policy)))
;; New Graph Stuff
;; Graph: map from vertex-id to vertex
;; Use zippers to edit map

(defn- mk-process-policy
  "Builds the process policy named by kw.

    :queue-work -> QueueWorkProcessPolicy built from :threads, :sleep-time
                   and :exec
    :source     -> SourceProcessPolicy built from :freq

  Keyword options and their defaults:
    :threads (work/available-processors), :sleep-time 5000,
    :freq 200, :exec work/sync.
  Any other kw throws IllegalArgumentException (no matching clause)."
  [kw & {:keys [threads sleep-time freq exec]
         :or   {threads    (work/available-processors)
                sleep-time 5000
                freq       200
                exec       work/sync}}]
  (condp = kw
    :source     (SourceProcessPolicy. freq)
    :queue-work (QueueWorkProcessPolicy. threads sleep-time exec)))
(defn- mk-node
  "Builds a Vertex for f with a fresh local inbox queue, the given
  outbox-policy and an :id.

  Keyword options:
    :process  which process policy to build (:queue-work or :source);
              defaults to :queue-work when omitted.
  All keyword options (e.g. :threads, :sleep-time, :freq, :exec) are
  forwarded to mk-process-policy."
  [id f outbox-policy & {:keys [process] :or {process :queue-work} :as opts}]
  ;; opts is a map; `flatten` leaves maps untouched as non-sequential
  ;; values and so yields an empty seq for one. (apply concat opts) turns
  ;; the map back into the k v k v ... argument list that
  ;; mk-process-policy expects.
  (let [policy-opts (apply concat opts)]
    (assoc (Vertex. f
                    (workq/local-queue)
                    (apply mk-process-policy process policy-opts)
                    outbox-policy)
      :id id)))
(defn mk-broadcast-node
  "Builds a vertex with the given id running f whose output is broadcast to
  every listener; the listener list starts out empty.
  opts are keyword options (e.g. :process :source) passed on to mk-node."
  [id f & opts]
  ;; Forward opts: dropping them would discard the caller's :process choice.
  (apply mk-node id f (VertexBroadcastAllPolicy. []) opts))
(defn mk-dispatch-node
  "Builds a vertex with the given id running f whose output is routed by
  disp, a fn from an output value to a vertex id. The policy's graph is
  left nil here and has to be supplied before the vertex is run.
  opts are keyword options (e.g. :process :queue-work) passed on to mk-node."
  [id f disp & opts]
  ;; Forward opts: dropping them would discard the caller's :process choice.
  (apply mk-node id f (VertexDispatchPolicy. disp nil) opts))
(defn- add-edge-internal
  "Adds an edge from the outbox of src to the inbox of trg and returns the
  new src vertex.
  The :outbox-policy of src must implement add-listener."
  [src trg]
  ;; add-listener belongs to the outbox policy, not to the vertex, so the
  ;; policy stored inside src is the thing that gets updated.
  (update-in src [:outbox-policy] add-listener trg))
(defn- graph-zip
  "Returns a clojure.zip zipper rooted at root-vertex.

  Nodes of the zipper are vertices. A vertex is a branch when its
  :outbox-policy is a VertexBroadcastAllPolicy; its children are the
  vertices in that policy's :outs."
  [root-vertex]
  (zip/zipper
    ;; branch?: a vertex can have children iff it broadcasts to listeners.
    ;; The policy lives under :outbox-policy, so that is what is inspected.
    (fn [v] (instance? VertexBroadcastAllPolicy (:outbox-policy v)))
    ;; children: nil (rather than an empty vector) when there are none, so
    ;; zip/down stops instead of descending into a nil child.
    (fn [v] (seq (:outs (:outbox-policy v))))
    ;; make-node: the zipper supplies the complete list of children, so
    ;; :outs is replaced, not appended to.
    (fn [v children] (assoc-in v [:outbox-policy :outs] (vec children)))
    ;; root
    root-vertex))
(defn mk-graph
  "Makes a graph with a single root node, with id :root, whose data comes
  from the source fn root-fn. Returns a zipper location at that root.

  The root is a broadcast vertex (empty listener list) using the :source
  process policy. opts are extra keyword options for the node (e.g.
  :freq 100); :process is always forced to :source."
  [root-fn & opts]
  ;; The vertex has to be constructed here: the zipper root must be a
  ;; vertex value, not a fn that would build one. :process :source goes
  ;; last so it wins over any :process in opts.
  (graph-zip
    (apply mk-node :root root-fn (VertexBroadcastAllPolicy. [])
           (concat opts [:process :source]))))
(defn add-edge
  "Adds an edge from the node at node-loc to trg and returns the updated
  location. The node at node-loc is assumed to be a broadcast vertex."
  [node-loc trg]
  (let [src (zip/node node-loc)]
    (zip/replace node-loc (add-edge-internal src trg))))
(defn jump-to
  "Jumps to the node whose :id equals id so that edges can be added there.

  The search starts from the root of the zipper that node-loc belongs to
  and visits nodes depth-first. Returns the location of the first matching
  node, or nil when no node has that id (or when node-loc is nil)."
  [node-loc id]
  (when node-loc
    (let [root-loc (loop [loc node-loc]
                     ;; zip/up returns nil once the top is reached
                     (if-let [parent (zip/up loc)]
                       (recur parent)
                       loc))]
      (loop [loc root-loc]
        (cond
          (zip/end? loc)              nil
          (= id (:id (zip/node loc))) loc
          :else                       (recur (zip/next loc)))))))
(defn run-graph
  "Intended to lock graph edits, make sure every dispatch node has an
  accurate vertex map, and launch the vertex pools.

  Not implemented yet: node-loc is ignored and nil is returned."
  [node-loc]
  ;; TODO: implement as described in the docstring.
  nil)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment