Skip to content

Instantly share code, notes, and snippets.

@timsgardner
Last active March 6, 2016 22:48
Show Gist options
  • Save timsgardner/f801cfa047b36c3f8c92 to your computer and use it in GitHub Desktop.
Save timsgardner/f801cfa047b36c3f8c92 to your computer and use it in GitHub Desktop.
agent problem
(ns flumbum
(:use clojure.pprint clojure.repl))
;; ==================================================
;; logging erotica
(def logs (atom {} :validator #(every? var? (keys %))))
(defmacro deflog [name & body]
`(let [f# (fn [] ~@body)
v# (def ~name (f#))]
(swap! logs assoc v# f#)
v#))
(defn reset-logs []
(doseq [[v f] @logs]
(alter-var-root v (fn [_] (f)))))
;; ==================================================
(deflog send-promise-log (atom []))
(deflog send-promise-inner-log (atom []))
(defn send-promise
"Wraps function f in another function that will deliver the result of calling f on the state of agent to a promise,
which is returned. Used to allow program to block until an agent actually updates with a function. "
[agent f]
(let [log-atom-1 (atom {:agent agent :f f})]
(swap! send-promise-log conj log-atom-1)
(let [prom (promise)]
(swap! log-atom-1 assoc :prom prom)
(send-off agent
(fn [state]
(let [log-atom-2 (atom {:state state})]
(swap! send-promise-inner-log conj log-atom-2)
(let [new (f state)]
(swap! log-atom-2 assoc :new new)
(deliver prom new)
new))))
prom)))
(defn agent-loop
"Sends a function to agent a that will send itself to a again, updating a with f,
until (continue? state) returns false, where state is the state of a."
[a continue? f]
(letfn [(step [state]
(let [state2 (f state)]
(when (continue? state2) (send-off a step))
state2))]
(send-off a step)))
(deflog failing-cases (atom []))
(def ^:dynamic *promise-deref*
(fn [p env]
(let [v (deref p 1000 ::fail)]
(if (= ::fail v)
(do
(swap! failing-cases conj env)
(throw (Exception. "Didn't make it out!")))
v))))
(def small-test
(let [states-agent (agent {})
trav (vec (for [i (range 5)]
(agent-loop
(agent {:loops 0
:chore [i 0]
:state []})
#(< 5 (:loops %))
(fn [{:keys [chore] :as this}]
(let [the-atom (-> states-agent
(send-promise
(fn [m]
(if (get m i)
m
(assoc m i (atom :i-am-in-an-atom)))))
(*promise-deref*
{:states-agent-snapshot @states-agent
:chore chore
:*agent* *agent*})
(get i))
the-result (swap! the-atom conj chore)]
(-> this
(update :loops inc)
(update :state conj the-result)
(update :chore (fn [[i n]] [i (inc n)]))))))))]
{:states-agent states-agent
:trav trav}))
;; all agents failed:
;; flumbum=> (pprint small-test)
;; {:states-agent #<Agent@1c03a8a4: {}>,
;; :trav
;; [#<Agent@77b5b17b FAILED: {:loops 0, :chore [0 0], :state []}>
;; #<Agent@55a22dcf FAILED: {:loops 0, :chore [1 0], :state []}>
;; #<Agent@7d265659 FAILED: {:loops 0, :chore [2 0], :state []}>
;; #<Agent@392cf9df FAILED: {:loops 0, :chore [3 0], :state []}>
;; #<Agent@418b179d FAILED: {:loops 0, :chore [4 0], :state []}>]}
;; point of failure was promise deref timeout:
;; flumbum=> (pprint (map agent-error (:trav small-test)))
;; (#error {
;; :cause "Didn't make it out!"
;; :via
;; [{:type java.lang.Exception
;; :message "Didn't make it out!"
;; :at [flumbum$_STAR_promise_deref_STAR_ invokeStatic "NO_SOURCE_FILE" 1147]}]
;; :trace
;; [[flumbum$_STAR_promise_deref_STAR_ invokeStatic "NO_SOURCE_FILE" 1147]
;; [flumbum$_STAR_promise_deref_STAR_ invoke "NO_SOURCE_FILE" 1143]
;; [flumbum$fn__1900$iter__1901__1905$fn__1906$fn__1907$fn__1911 invoke "NO_SOURCE_FILE" 1222]
;; [flumbum$agent_loop$step__1801 invoke "NO_SOURCE_FILE" 1121]
;; [clojure.core$binding_conveyor_fn$fn__4676 invoke "core.clj" 1941]
;; [clojure.lang.AFn applyToHelper "AFn.java" 154]
;; [clojure.lang.RestFn applyTo "RestFn.java" 132]
;; [clojure.lang.Agent$Action doRun "Agent.java" 114]
;; [clojure.lang.Agent$Action run "Agent.java" 163]
;; [java.util.concurrent.ThreadPoolExecutor runWorker "ThreadPoolExecutor.java" 1142]
;; [java.util.concurrent.ThreadPoolExecutor$Worker run "ThreadPoolExecutor.java" 617]
;; [java.lang.Thread run "Thread.java" 745]]} #error {
;; :cause "Didn't make it out!"
;; :via
;; [{:type java.lang.Exception
;; :message "Didn't make it out!"
;; :at [flumbum$_STAR_promise_deref_STAR_ invokeStatic "NO_SOURCE_FILE" 1147]}]
;; :trace
;; [[flumbum$_STAR_promise_deref_STAR_ invokeStatic "NO_SOURCE_FILE" 1147]
;; [flumbum$_STAR_promise_deref_STAR_ invoke "NO_SOURCE_FILE" 1143]
;; [flumbum$fn__1900$iter__1901__1905$fn__1906$fn__1907$fn__1911 invoke "NO_SOURCE_FILE" 1222]
;; [flumbum$agent_loop$step__1801 invoke "NO_SOURCE_FILE" 1121]
;; [clojure.core$binding_conveyor_fn$fn__4676 invoke "core.clj" 1941]
;; [clojure.lang.AFn applyToHelper "AFn.java" 154]
;; [clojure.lang.RestFn applyTo "RestFn.java" 132]
;; [clojure.lang.Agent$Action doRun "Agent.java" 114]
;; [clojure.lang.Agent$Action run "Agent.java" 163]
;; [java.util.concurrent.ThreadPoolExecutor runWorker "ThreadPoolExecutor.java" 1142]
;; [java.util.concurrent.ThreadPoolExecutor$Worker run "ThreadPoolExecutor.java" 617]
;; [java.lang.Thread run "Thread.java" 745]]} #error {
;; :cause "Didn't make it out!"
;; :via
;; [{:type java.lang.Exception
;; :message "Didn't make it out!"
;; :at [flumbum$_STAR_promise_deref_STAR_ invokeStatic "NO_SOURCE_FILE" 1147]}]
;; :trace
;; [[flumbum$_STAR_promise_deref_STAR_ invokeStatic "NO_SOURCE_FILE" 1147]
;; [flumbum$_STAR_promise_deref_STAR_ invoke "NO_SOURCE_FILE" 1143]
;; [flumbum$fn__1900$iter__1901__1905$fn__1906$fn__1907$fn__1911 invoke "NO_SOURCE_FILE" 1222]
;; [flumbum$agent_loop$step__1801 invoke "NO_SOURCE_FILE" 1121]
;; [clojure.core$binding_conveyor_fn$fn__4676 invoke "core.clj" 1941]
;; [clojure.lang.AFn applyToHelper "AFn.java" 154]
;; [clojure.lang.RestFn applyTo "RestFn.java" 132]
;; [clojure.lang.Agent$Action doRun "Agent.java" 114]
;; [clojure.lang.Agent$Action run "Agent.java" 163]
;; [java.util.concurrent.ThreadPoolExecutor runWorker "ThreadPoolExecutor.java" 1142]
;; [java.util.concurrent.ThreadPoolExecutor$Worker run "ThreadPoolExecutor.java" 617]
;; [java.lang.Thread run "Thread.java" 745]]} #error {
;; :cause "Didn't make it out!"
;; :via
;; [{:type java.lang.Exception
;; :message "Didn't make it out!"
;; :at [flumbum$_STAR_promise_deref_STAR_ invokeStatic "NO_SOURCE_FILE" 1147]}]
;; :trace
;; [[flumbum$_STAR_promise_deref_STAR_ invokeStatic "NO_SOURCE_FILE" 1147]
;; [flumbum$_STAR_promise_deref_STAR_ invoke "NO_SOURCE_FILE" 1143]
;; [flumbum$fn__1900$iter__1901__1905$fn__1906$fn__1907$fn__1911 invoke "NO_SOURCE_FILE" 1222]
;; [flumbum$agent_loop$step__1801 invoke "NO_SOURCE_FILE" 1121]
;; [clojure.core$binding_conveyor_fn$fn__4676 invoke "core.clj" 1941]
;; [clojure.lang.AFn applyToHelper "AFn.java" 154]
;; [clojure.lang.RestFn applyTo "RestFn.java" 132]
;; [clojure.lang.Agent$Action doRun "Agent.java" 114]
;; [clojure.lang.Agent$Action run "Agent.java" 163]
;; [java.util.concurrent.ThreadPoolExecutor runWorker "ThreadPoolExecutor.java" 1142]
;; [java.util.concurrent.ThreadPoolExecutor$Worker run "ThreadPoolExecutor.java" 617]
;; [java.lang.Thread run "Thread.java" 745]]} #error {
;; :cause "Didn't make it out!"
;; :via
;; [{:type java.lang.Exception
;; :message "Didn't make it out!"
;; :at [flumbum$_STAR_promise_deref_STAR_ invokeStatic "NO_SOURCE_FILE" 1147]}]
;; :trace
;; [[flumbum$_STAR_promise_deref_STAR_ invokeStatic "NO_SOURCE_FILE" 1147]
;; [flumbum$_STAR_promise_deref_STAR_ invoke "NO_SOURCE_FILE" 1143]
;; [flumbum$fn__1900$iter__1901__1905$fn__1906$fn__1907$fn__1911 invoke "NO_SOURCE_FILE" 1222]
;; [flumbum$agent_loop$step__1801 invoke "NO_SOURCE_FILE" 1121]
;; [clojure.core$binding_conveyor_fn$fn__4676 invoke "core.clj" 1941]
;; [clojure.lang.AFn applyToHelper "AFn.java" 154]
;; [clojure.lang.RestFn applyTo "RestFn.java" 132]
;; [clojure.lang.Agent$Action doRun "Agent.java" 114]
;; [clojure.lang.Agent$Action run "Agent.java" 163]
;; [java.util.concurrent.ThreadPoolExecutor runWorker "ThreadPoolExecutor.java" 1142]
;; [java.util.concurrent.ThreadPoolExecutor$Worker run "ThreadPoolExecutor.java" 617]
;; [java.lang.Thread run "Thread.java" 745]]})
;; nil
;; got to send-promise:
;; flumbum=> (pprint @send-promise-log)
;; [#<Atom@431b93f7:
;; {:agent #<Agent@1c03a8a4: {}>,
;; :f
;; #object[flumbum$fn__1900$iter__1901__1905$fn__1906$fn__1907$fn__1911$fn__1913 0x232760ba "flumbum$fn__1900$iter__1901__1905$fn__1906$fn__1907$fn__1911$fn__1913@232760ba"],
;; :prom #<Promise@3a2a6674: :not-delivered>}>
;; #<Atom@769e5ab5:
;; {:agent #<Agent@1c03a8a4: {}>,
;; :f
;; #object[flumbum$fn__1900$iter__1901__1905$fn__1906$fn__1907$fn__1911$fn__1913 0x504b6bec "flumbum$fn__1900$iter__1901__1905$fn__1906$fn__1907$fn__1911$fn__1913@504b6bec"],
;; :prom #<Promise@2f772a85: :not-delivered>}>
;; #<Atom@7c15cf03:
;; {:agent #<Agent@1c03a8a4: {}>,
;; :f
;; #object[flumbum$fn__1900$iter__1901__1905$fn__1906$fn__1907$fn__1911$fn__1913 0x455d2257 "flumbum$fn__1900$iter__1901__1905$fn__1906$fn__1907$fn__1911$fn__1913@455d2257"],
;; :prom #<Promise@67a8411a: :not-delivered>}>
;; #<Atom@25422cd2:
;; {:agent #<Agent@1c03a8a4: {}>,
;; :f
;; #object[flumbum$fn__1900$iter__1901__1905$fn__1906$fn__1907$fn__1911$fn__1913 0x521957d8 "flumbum$fn__1900$iter__1901__1905$fn__1906$fn__1907$fn__1911$fn__1913@521957d8"],
;; :prom #<Promise@6dc4445f: :not-delivered>}>
;; #<Atom@144c1a2:
;; {:agent #<Agent@1c03a8a4: {}>,
;; :f
;; #object[flumbum$fn__1900$iter__1901__1905$fn__1906$fn__1907$fn__1911$fn__1913 0x36faad5 "flumbum$fn__1900$iter__1901__1905$fn__1906$fn__1907$fn__1911$fn__1913@36faad5"],
;; :prom #<Promise@4aadbe1b: :not-delivered>}>]
;; but promises never fired on the state-agent, despite it being ready (not failed):
;; flumbum=> (pprint @send-promise-inner-log)
;; []
(reset-logs)
;; analogous test with backing atom rather than agent (states-atom rather than states-agent):
(def atom-state-test
(let [states-atom (atom {})
trav (vec (for [i (range 5)]
(agent-loop
(agent {:loops 0
:chore [i 0]
:state []})
#(< (:loops %) 5)
(fn [{:keys [chore] :as this}]
(let [the-atom (-> states-atom
(swap!
(fn [m]
(if (m i)
m
(assoc m i (atom 0)))))
(get i))
the-result (swap! the-atom inc)]
(-> this
(update :loops inc)
(update :state conj the-result)
(update :chore (fn [[i n]] [i (inc n)]))))))))]
{:states-atom states-atom
:trav trav}))
;; works fine:
;; flumbum=> (pprint atom-state-test)
;; {:states-atom
;; #<Atom@5f6391c4:
;; {0 #<Atom@37c1ec88: 5>,
;; 1 #<Atom@1af83af0: 5>,
;; 2 #<Atom@52a20cf6: 5>,
;; 3 #<Atom@1bad5266: 5>,
;; 4 #<Atom@5187d120: 5>}>,
;; :trav
;; [#<Agent@2fc65aa8: {:loops 5, :chore [0 5], :state [1 2 3 4 5]}>
;; #<Agent@2e1e36ce: {:loops 5, :chore [1 5], :state [1 2 3 4 5]}>
;; #<Agent@776b1d7: {:loops 5, :chore [2 5], :state [1 2 3 4 5]}>
;; #<Agent@7010d0e3: {:loops 5, :chore [3 5], :state [1 2 3 4 5]}>
;; #<Agent@228c10bd: {:loops 5, :chore [4 5], :state [1 2 3 4 5]}>]}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment