Last active
March 6, 2016 22:48
-
-
Save timsgardner/f801cfa047b36c3f8c92 to your computer and use it in GitHub Desktop.
agent problem
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns flumbum | |
(:use clojure.pprint clojure.repl)) | |
;; ================================================== | |
;; logging erotica | |
(def logs (atom {} :validator #(every? var? (keys %)))) | |
(defmacro deflog [name & body] | |
`(let [f# (fn [] ~@body) | |
v# (def ~name (f#))] | |
(swap! logs assoc v# f#) | |
v#)) | |
(defn reset-logs [] | |
(doseq [[v f] @logs] | |
(alter-var-root v (fn [_] (f))))) | |
;; ================================================== | |
(deflog send-promise-log (atom [])) | |
(deflog send-promise-inner-log (atom [])) | |
(defn send-promise | |
"Wraps function f in another function that will deliver the result of calling f on the state of agent to a promise, | |
which is returned. Used to allow program to block until an agent actually updates with a function. " | |
[agent f] | |
(let [log-atom-1 (atom {:agent agent :f f})] | |
(swap! send-promise-log conj log-atom-1) | |
(let [prom (promise)] | |
(swap! log-atom-1 assoc :prom prom) | |
(send-off agent | |
(fn [state] | |
(let [log-atom-2 (atom {:state state})] | |
(swap! send-promise-inner-log conj log-atom-2) | |
(let [new (f state)] | |
(swap! log-atom-2 assoc :new new) | |
(deliver prom new) | |
new)))) | |
prom))) | |
(defn agent-loop | |
"Sends a function to agent a that will send itself to a again, updating a with f, | |
until (continue? state) returns false, where state is the state of a." | |
[a continue? f] | |
(letfn [(step [state] | |
(let [state2 (f state)] | |
(when (continue? state2) (send-off a step)) | |
state2))] | |
(send-off a step))) | |
(deflog failing-cases (atom [])) | |
(def ^:dynamic *promise-deref* | |
(fn [p env] | |
(let [v (deref p 1000 ::fail)] | |
(if (= ::fail v) | |
(do | |
(swap! failing-cases conj env) | |
(throw (Exception. "Didn't make it out!"))) | |
v)))) | |
(def small-test | |
(let [states-agent (agent {}) | |
trav (vec (for [i (range 5)] | |
(agent-loop | |
(agent {:loops 0 | |
:chore [i 0] | |
:state []}) | |
#(< 5 (:loops %)) | |
(fn [{:keys [chore] :as this}] | |
(let [the-atom (-> states-agent | |
(send-promise | |
(fn [m] | |
(if (get m i) | |
m | |
(assoc m i (atom :i-am-in-an-atom))))) | |
(*promise-deref* | |
{:states-agent-snapshot @states-agent | |
:chore chore | |
:*agent* *agent*}) | |
(get i)) | |
the-result (swap! the-atom conj chore)] | |
(-> this | |
(update :loops inc) | |
(update :state conj the-result) | |
(update :chore (fn [[i n]] [i (inc n)]))))))))] | |
{:states-agent states-agent | |
:trav trav})) | |
;; all agents failed: | |
;; flumbum=> (pprint small-test) | |
;; {:states-agent #<Agent@1c03a8a4: {}>, | |
;; :trav | |
;; [#<Agent@77b5b17b FAILED: {:loops 0, :chore [0 0], :state []}> | |
;; #<Agent@55a22dcf FAILED: {:loops 0, :chore [1 0], :state []}> | |
;; #<Agent@7d265659 FAILED: {:loops 0, :chore [2 0], :state []}> | |
;; #<Agent@392cf9df FAILED: {:loops 0, :chore [3 0], :state []}> | |
;; #<Agent@418b179d FAILED: {:loops 0, :chore [4 0], :state []}>]} | |
;; point of failure was promise deref timeout: | |
;; flumbum=> (pprint (map agent-error (:trav small-test))) | |
;; (#error { | |
;; :cause "Didn't make it out!" | |
;; :via | |
;; [{:type java.lang.Exception | |
;; :message "Didn't make it out!" | |
;; :at [flumbum$_STAR_promise_deref_STAR_ invokeStatic "NO_SOURCE_FILE" 1147]}] | |
;; :trace | |
;; [[flumbum$_STAR_promise_deref_STAR_ invokeStatic "NO_SOURCE_FILE" 1147] | |
;; [flumbum$_STAR_promise_deref_STAR_ invoke "NO_SOURCE_FILE" 1143] | |
;; [flumbum$fn__1900$iter__1901__1905$fn__1906$fn__1907$fn__1911 invoke "NO_SOURCE_FILE" 1222] | |
;; [flumbum$agent_loop$step__1801 invoke "NO_SOURCE_FILE" 1121] | |
;; [clojure.core$binding_conveyor_fn$fn__4676 invoke "core.clj" 1941] | |
;; [clojure.lang.AFn applyToHelper "AFn.java" 154] | |
;; [clojure.lang.RestFn applyTo "RestFn.java" 132] | |
;; [clojure.lang.Agent$Action doRun "Agent.java" 114] | |
;; [clojure.lang.Agent$Action run "Agent.java" 163] | |
;; [java.util.concurrent.ThreadPoolExecutor runWorker "ThreadPoolExecutor.java" 1142] | |
;; [java.util.concurrent.ThreadPoolExecutor$Worker run "ThreadPoolExecutor.java" 617] | |
;; [java.lang.Thread run "Thread.java" 745]]} #error { | |
;; :cause "Didn't make it out!" | |
;; :via | |
;; [{:type java.lang.Exception | |
;; :message "Didn't make it out!" | |
;; :at [flumbum$_STAR_promise_deref_STAR_ invokeStatic "NO_SOURCE_FILE" 1147]}] | |
;; :trace | |
;; [[flumbum$_STAR_promise_deref_STAR_ invokeStatic "NO_SOURCE_FILE" 1147] | |
;; [flumbum$_STAR_promise_deref_STAR_ invoke "NO_SOURCE_FILE" 1143] | |
;; [flumbum$fn__1900$iter__1901__1905$fn__1906$fn__1907$fn__1911 invoke "NO_SOURCE_FILE" 1222] | |
;; [flumbum$agent_loop$step__1801 invoke "NO_SOURCE_FILE" 1121] | |
;; [clojure.core$binding_conveyor_fn$fn__4676 invoke "core.clj" 1941] | |
;; [clojure.lang.AFn applyToHelper "AFn.java" 154] | |
;; [clojure.lang.RestFn applyTo "RestFn.java" 132] | |
;; [clojure.lang.Agent$Action doRun "Agent.java" 114] | |
;; [clojure.lang.Agent$Action run "Agent.java" 163] | |
;; [java.util.concurrent.ThreadPoolExecutor runWorker "ThreadPoolExecutor.java" 1142] | |
;; [java.util.concurrent.ThreadPoolExecutor$Worker run "ThreadPoolExecutor.java" 617] | |
;; [java.lang.Thread run "Thread.java" 745]]} #error { | |
;; :cause "Didn't make it out!" | |
;; :via | |
;; [{:type java.lang.Exception | |
;; :message "Didn't make it out!" | |
;; :at [flumbum$_STAR_promise_deref_STAR_ invokeStatic "NO_SOURCE_FILE" 1147]}] | |
;; :trace | |
;; [[flumbum$_STAR_promise_deref_STAR_ invokeStatic "NO_SOURCE_FILE" 1147] | |
;; [flumbum$_STAR_promise_deref_STAR_ invoke "NO_SOURCE_FILE" 1143] | |
;; [flumbum$fn__1900$iter__1901__1905$fn__1906$fn__1907$fn__1911 invoke "NO_SOURCE_FILE" 1222] | |
;; [flumbum$agent_loop$step__1801 invoke "NO_SOURCE_FILE" 1121] | |
;; [clojure.core$binding_conveyor_fn$fn__4676 invoke "core.clj" 1941] | |
;; [clojure.lang.AFn applyToHelper "AFn.java" 154] | |
;; [clojure.lang.RestFn applyTo "RestFn.java" 132] | |
;; [clojure.lang.Agent$Action doRun "Agent.java" 114] | |
;; [clojure.lang.Agent$Action run "Agent.java" 163] | |
;; [java.util.concurrent.ThreadPoolExecutor runWorker "ThreadPoolExecutor.java" 1142] | |
;; [java.util.concurrent.ThreadPoolExecutor$Worker run "ThreadPoolExecutor.java" 617] | |
;; [java.lang.Thread run "Thread.java" 745]]} #error { | |
;; :cause "Didn't make it out!" | |
;; :via | |
;; [{:type java.lang.Exception | |
;; :message "Didn't make it out!" | |
;; :at [flumbum$_STAR_promise_deref_STAR_ invokeStatic "NO_SOURCE_FILE" 1147]}] | |
;; :trace | |
;; [[flumbum$_STAR_promise_deref_STAR_ invokeStatic "NO_SOURCE_FILE" 1147] | |
;; [flumbum$_STAR_promise_deref_STAR_ invoke "NO_SOURCE_FILE" 1143] | |
;; [flumbum$fn__1900$iter__1901__1905$fn__1906$fn__1907$fn__1911 invoke "NO_SOURCE_FILE" 1222] | |
;; [flumbum$agent_loop$step__1801 invoke "NO_SOURCE_FILE" 1121] | |
;; [clojure.core$binding_conveyor_fn$fn__4676 invoke "core.clj" 1941] | |
;; [clojure.lang.AFn applyToHelper "AFn.java" 154] | |
;; [clojure.lang.RestFn applyTo "RestFn.java" 132] | |
;; [clojure.lang.Agent$Action doRun "Agent.java" 114] | |
;; [clojure.lang.Agent$Action run "Agent.java" 163] | |
;; [java.util.concurrent.ThreadPoolExecutor runWorker "ThreadPoolExecutor.java" 1142] | |
;; [java.util.concurrent.ThreadPoolExecutor$Worker run "ThreadPoolExecutor.java" 617] | |
;; [java.lang.Thread run "Thread.java" 745]]} #error { | |
;; :cause "Didn't make it out!" | |
;; :via | |
;; [{:type java.lang.Exception | |
;; :message "Didn't make it out!" | |
;; :at [flumbum$_STAR_promise_deref_STAR_ invokeStatic "NO_SOURCE_FILE" 1147]}] | |
;; :trace | |
;; [[flumbum$_STAR_promise_deref_STAR_ invokeStatic "NO_SOURCE_FILE" 1147] | |
;; [flumbum$_STAR_promise_deref_STAR_ invoke "NO_SOURCE_FILE" 1143] | |
;; [flumbum$fn__1900$iter__1901__1905$fn__1906$fn__1907$fn__1911 invoke "NO_SOURCE_FILE" 1222] | |
;; [flumbum$agent_loop$step__1801 invoke "NO_SOURCE_FILE" 1121] | |
;; [clojure.core$binding_conveyor_fn$fn__4676 invoke "core.clj" 1941] | |
;; [clojure.lang.AFn applyToHelper "AFn.java" 154] | |
;; [clojure.lang.RestFn applyTo "RestFn.java" 132] | |
;; [clojure.lang.Agent$Action doRun "Agent.java" 114] | |
;; [clojure.lang.Agent$Action run "Agent.java" 163] | |
;; [java.util.concurrent.ThreadPoolExecutor runWorker "ThreadPoolExecutor.java" 1142] | |
;; [java.util.concurrent.ThreadPoolExecutor$Worker run "ThreadPoolExecutor.java" 617] | |
;; [java.lang.Thread run "Thread.java" 745]]}) | |
;; nil | |
;; got to send-promise: | |
;; flumbum=> (pprint @send-promise-log) | |
;; [#<Atom@431b93f7: | |
;; {:agent #<Agent@1c03a8a4: {}>, | |
;; :f | |
;; #object[flumbum$fn__1900$iter__1901__1905$fn__1906$fn__1907$fn__1911$fn__1913 0x232760ba "flumbum$fn__1900$iter__1901__1905$fn__1906$fn__1907$fn__1911$fn__1913@232760ba"], | |
;; :prom #<Promise@3a2a6674: :not-delivered>}> | |
;; #<Atom@769e5ab5: | |
;; {:agent #<Agent@1c03a8a4: {}>, | |
;; :f | |
;; #object[flumbum$fn__1900$iter__1901__1905$fn__1906$fn__1907$fn__1911$fn__1913 0x504b6bec "flumbum$fn__1900$iter__1901__1905$fn__1906$fn__1907$fn__1911$fn__1913@504b6bec"], | |
;; :prom #<Promise@2f772a85: :not-delivered>}> | |
;; #<Atom@7c15cf03: | |
;; {:agent #<Agent@1c03a8a4: {}>, | |
;; :f | |
;; #object[flumbum$fn__1900$iter__1901__1905$fn__1906$fn__1907$fn__1911$fn__1913 0x455d2257 "flumbum$fn__1900$iter__1901__1905$fn__1906$fn__1907$fn__1911$fn__1913@455d2257"], | |
;; :prom #<Promise@67a8411a: :not-delivered>}> | |
;; #<Atom@25422cd2: | |
;; {:agent #<Agent@1c03a8a4: {}>, | |
;; :f | |
;; #object[flumbum$fn__1900$iter__1901__1905$fn__1906$fn__1907$fn__1911$fn__1913 0x521957d8 "flumbum$fn__1900$iter__1901__1905$fn__1906$fn__1907$fn__1911$fn__1913@521957d8"], | |
;; :prom #<Promise@6dc4445f: :not-delivered>}> | |
;; #<Atom@144c1a2: | |
;; {:agent #<Agent@1c03a8a4: {}>, | |
;; :f | |
;; #object[flumbum$fn__1900$iter__1901__1905$fn__1906$fn__1907$fn__1911$fn__1913 0x36faad5 "flumbum$fn__1900$iter__1901__1905$fn__1906$fn__1907$fn__1911$fn__1913@36faad5"], | |
;; :prom #<Promise@4aadbe1b: :not-delivered>}>] | |
;; but promises never fired on the state-agent, despite it being ready (not failed): | |
;; flumbum=> (pprint @send-promise-inner-log) | |
;; [] | |
(reset-logs) | |
;; analogous test with backing atom rather than agent (states-atom rather than states-agent): | |
(def atom-state-test | |
(let [states-atom (atom {}) | |
trav (vec (for [i (range 5)] | |
(agent-loop | |
(agent {:loops 0 | |
:chore [i 0] | |
:state []}) | |
#(< (:loops %) 5) | |
(fn [{:keys [chore] :as this}] | |
(let [the-atom (-> states-atom | |
(swap! | |
(fn [m] | |
(if (m i) | |
m | |
(assoc m i (atom 0))))) | |
(get i)) | |
the-result (swap! the-atom inc)] | |
(-> this | |
(update :loops inc) | |
(update :state conj the-result) | |
(update :chore (fn [[i n]] [i (inc n)]))))))))] | |
{:states-atom states-atom | |
:trav trav})) | |
;; works fine: | |
;; flumbum=> (pprint atom-state-test) | |
;; {:states-atom | |
;; #<Atom@5f6391c4: | |
;; {0 #<Atom@37c1ec88: 5>, | |
;; 1 #<Atom@1af83af0: 5>, | |
;; 2 #<Atom@52a20cf6: 5>, | |
;; 3 #<Atom@1bad5266: 5>, | |
;; 4 #<Atom@5187d120: 5>}>, | |
;; :trav | |
;; [#<Agent@2fc65aa8: {:loops 5, :chore [0 5], :state [1 2 3 4 5]}> | |
;; #<Agent@2e1e36ce: {:loops 5, :chore [1 5], :state [1 2 3 4 5]}> | |
;; #<Agent@776b1d7: {:loops 5, :chore [2 5], :state [1 2 3 4 5]}> | |
;; #<Agent@7010d0e3: {:loops 5, :chore [3 5], :state [1 2 3 4 5]}> | |
;; #<Agent@228c10bd: {:loops 5, :chore [4 5], :state [1 2 3 4 5]}>]} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment