Skip to content

Instantly share code, notes, and snippets.

@aria42
Created December 13, 2010 17:21
Show Gist options
  • Select an option

  • Save aria42/739258 to your computer and use it in GitHub Desktop.

Select an option

Save aria42/739258 to your computer and use it in GitHub Desktop.
(ns work.graph
(:require
[clojure.contrib.logging :as log]
[clojure.zip :as zip]
[work.core :as work]
[work.queue :as workq]))
(defn table
"takes kv pairs.
keys can be vectors of keys, matching a fn.
values can be vectors of fns to apply in sequential order."
[& pairs]
(apply hash-map (flatten (map (fn [[k v]]
(let [f (if (vector? v)
(apply juxt v)
v)]
(if (vector? k)
(map (fn [ki] [ki f]) k)
[k f])))
(partition 2 2 pairs)))))
(defn dispatch
"takes a dispatch fn and dispatch table(s).
returns a fn taking args, dispatching on args, and applying dispatch fn to args."
[d & tables]
(let [table (apply merge tables)]
(fn [& args]
(let [f (table (apply d args))]
(when-not f
(log/error (format "unable to dispatch on %s." (pr-str args))))
(apply f args)))))
;; New Graph Stuff
; A vertex is an inbox-queue, process-policy and outbox-policy
(defrecord Vertex [f inbox-queue process-policy outbox-policy])
;; Vertex Inbox
(defprotocol VertexOutboxPolicy
(broadcast [this x]
"broadcast output x to neighbors or for side-effect")
(add-listener [this listener]
"return new VertexOutbox Policy with another listener.
OPTIONAL."))
(defprotocol VertexProcessPolicy
(start-pool [this f inbox-queue outbox-policy]
"return pool to process f
with broadcast policy. doesn't need to spawn
a new pool, just return a handle to one"))
;; Broadcast Policy
;; Either use expandable adjacency list or jump via dispatch
; outs is a seq of vertices
(defrecord VertexBroadcastAllPolicy [outs]
VertexOutboxPolicy
(broadcast [this x] (doseq [o outs] (workq/offer (:inbox-queue o) x)))
(add-listener [this listener] (update-in this [:outs] conj listener)))
(defrecord VertexNoOpOutboxPolicy []
(broadcast [this x]))
; dispatch fn takes x -> vertex id
; graph vertex id -> vertex
; before running graph, each VertexDispatchPolicy
; will have current graph
(defrecord VertexDispatchPolicy [dispatch graph]
VertexOutboxPolicy
(broadcast [this x]
(let [dispatch-id (dispatch x)
out-vertex (graph dispatch-id)]
(workq/offer (:inbox-queue out-vertex) x))))
;; Process Policy
;; Queue-Work
(defrecord QueueWorkProcessPolicy [threads sleep-time exec]
VertexProcessPolicy
(start-pool [this f inbox-queue outbox-policy]
(future (work/queue-work
(merge this
{ :f f
:in (partial workq/poll inbox-queue)
:out (partial broadcast outbox-policy)})))))
;; Vertex process policy for an "input" vertex
;; where we inject data
(defrecord SourceProcessPolicy [freq]
VertexProcessPolicy
(start-pool [this f _ outbox-policy]
(future (work/schedule-work
freq
#(broadcast outbox-policy (f))))))
(defn- run-vertex
"launch vertex return vertex with :pool field"
[{:keys [f,inbox-queue,outbox-policy] :as vertex}]
(assoc vertex
:pool (start-pool vertex f inbox-queue outbox-policy)))
;; New Graph Stuff
; Graph: map from vertex-id to vertex
; Use zippers to edit map
(defn- mk-process-policy [kw &
{:keys [threads sleep-time freq exec]
:or {threads (work/available-processors)
sleep-time 5000
freq 200
exec work/sync}}]
(case kw
:queue-work (QueueWorkProcessPolicy. threads sleep-time exec)
:source (SourceProcessPolicy. freq)))
(defn- mk-node [id f outbox-policy & {:keys [process] :as opts}]
(-> f
(Vertex. (workq/local-queue)
(apply mk-process-policy process (flatten opts))
outbox-policy)
(assoc :id id)))
(defn mk-broadcast-node [id f & opts]
(mk-node id f (VertexBroadcastAllPolicy. [])))
(defn mk-dispatch-node [id f disp & opts]
(mk-node id f (VertexDispatchPolicy. disp nil)))
(defn- add-edge-internal
"add edge from src outbox to trg inbox, return new src.
src outbox-policy must implement add-listener"
[src trg]
(add-listener src trg))
(defn- graph-zip
[root-vertex]
(zip/zipper
; branch?
(fn [n] (and (instance? VertexBroadcastAllPolicy n) (:outs n)))
; children
:outs
; make-node
(partial reduce add-edge-internal)
; root
root-vertex))
(defn mk-graph
"make a graph with a single root node with data
coming from a source root-fn"
[root-fn & opts]
(graph-zip (partial mk-broadcast-node :root root-fn :process :source)))
(defn add-edge
"using node location add edge. assumes node at location
is a broadcast vertex"
[node-loc trg]
(zip/edit node-loc add-edge-internal trg))
(defn jump-to
"jump to a node id to add edges"
[node-loc id])
(defn run-graph
"lock graph edits ensure all dispatch nodes
have accurate vertex map and launch vertex pools"
[node-loc])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment