Created
January 12, 2014 00:12
-
-
Save lantiga/53d163fb00db2a754099 to your computer and use it in GitHub Desktop.
asyncatom
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns asyncatom.core | |
(:require [clojure.core.async :refer [chan go <! >! <!!]]) | |
(:refer-clojure :exclude [atom swap! reset! compare-and-set! deref])) | |
(defn atom [v] | |
(let [c (chan)] | |
(go | |
(loop [v v] | |
(let [[cmd oc & args] (<! c) | |
v (condp = cmd | |
:get (do (>! oc v) v) | |
:set (let [[nv] args] (>! oc nv) nv) | |
:cas (let [[ov nv] args] | |
(if (= ov v) | |
(do (>! oc [:acc nv]) nv) | |
(do (>! oc [:rej v]) v))) | |
:get-in (let [[ks] args] (>! oc (get-in v ks)) v) | |
:set-in (let [[nkv ks] args | |
nv (assoc-in v ks nkv)] | |
(>! oc nv) | |
nv) | |
:cas-in (let [[okv nkv ks] args] | |
(if (= okv (get-in v ks)) | |
(let [nv (assoc-in v ks nkv)] (>! oc [:acc nv]) nv) | |
(do (>! oc [:rej v]) v))) | |
:get-m (let [ksv args] (>! oc (map #(get-in v %) ksv)) v) | |
:set-m (let [[nkvm] args | |
nv (reduce (fn [[ks nkv]] (assoc-in v ks nkv)) v nkvm)] | |
(>! oc nv) | |
nv) | |
:cas-m (let [[okvm nkvm] args] | |
(if (every? (map (fn [[ks okv]] (= okv (get-in v ks))) okvm)) | |
(let [nv (reduce (fn [[ks nkv]] (assoc-in v ks nkv)) v nkvm)] (>! oc [:acc nv]) nv) | |
(do (>! oc [:rej v]) v))))] | |
(recur v)))) | |
c)) | |
;; TODO: | |
;; - make add-watch a map indexed by ks, where watches can be triggered upon updates of sub-maps | |
;; - write ant colony demo as example | |
;; | |
;; From the ant colony demo it appears that one of the key ingredients is that the agent function is | |
;; retried if the transaction fails. | |
;; Explore new -m functions. The ant colony could be written with agents and -m functions, since we | |
;; can CAS based on multiple positions on the grid and if any changes we retry the swap. | |
;; We achieve the minimal amount of swaps. | |
(defn swap! [c f] | |
(<!! | |
(go | |
(let [oc (chan)] | |
(loop [[res out] [nil nil]] | |
(if (= :acc res) | |
out | |
(do | |
(>! c [:get oc]) | |
(let [v (<! oc)] | |
(>! c [:cas oc v (f v)]) | |
(recur (<! oc)))))))))) | |
(defn reset! [c v] | |
(<!! | |
(go | |
(let [oc (chan)] | |
(>! c [:set oc v]) | |
(<! oc))))) | |
(defn compare-and-set! [c ov nv] | |
(<!! | |
(go | |
(let [oc (chan)] | |
(>! c [:cas oc ov nv]) | |
(<! oc))))) | |
(defn deref [c] | |
(<!! | |
(go | |
(let [oc (chan)] | |
(>! c [:get oc]) | |
(<! oc))))) | |
(defn swap-in! [c f ks] | |
(<!! | |
(go | |
(let [oc (chan)] | |
(loop [[res out] [nil nil]] | |
(if (= :acc res) | |
out | |
(do | |
(>! c [:get-in oc ks]) | |
(let [v (<! oc)] | |
(>! c [:cas-in oc v (f v) ks]) | |
(recur (<! oc)))))))))) | |
(defn reset-in! [c v ks] | |
(<!! | |
(go | |
(let [oc (chan)] | |
(>! c [:set-in oc v ks]) | |
(<! oc))))) | |
(defn compare-and-set-in! [c ov nv ks] | |
(<!! | |
(go | |
(let [oc (chan)] | |
(>! c [:cas-in oc ov nv ks]) | |
(<! oc))))) | |
(defn deref-in [c ks] | |
(<!! | |
(go | |
(let [oc (chan)] | |
(>! c [:get-in oc ks]) | |
(<! oc))))) | |
(defn swap-m! [c ksfm] | |
(<!! | |
(go | |
(let [oc (chan)] | |
(loop [[res out] [nil nil]] | |
(if (= :acc res) | |
out | |
(let [ks (keys ksfm)] | |
(>! c [:get-m oc ks]) | |
(let [v (<! oc) | |
okvm (zipmap ks v) | |
nkvm (zipmap ks (map (fn [[ks f]] f v) ksfm))] | |
(>! c [:cas-m oc okvm nkvm]) | |
(recur (<! oc)))))))))) | |
(defn reset-m! [c nkvm] | |
(<!! | |
(go | |
(let [oc (chan)] | |
(>! c [:set-m oc nkvm]) | |
(<! oc))))) | |
(defn compare-and-set-m! [c okvm nkvm] | |
(<!! | |
(go | |
(let [oc (chan)] | |
(>! c [:cas-in oc okvm nkvm]) | |
(<! oc))))) | |
(defn deref-m [c ksv] | |
(<!! | |
(go | |
(let [oc (chan)] | |
(>! c [:get-in oc ksv]) | |
(<! oc))))) | |
(comment | |
(defn async-atom-test [n] | |
(let [a (async-atom 0) | |
increment (fn [x] (do (Thread/sleep 1) (inc x))) | |
f (partial async-swap! a increment)] | |
(doall (repeatedly n f)) | |
(async-deref a))) | |
(defn async-atom-parallel-test [n] | |
(let [a (async-atom 0) | |
increment (fn [x] (do (Thread/sleep 1) (inc x))) | |
f (partial async-swap! a increment)] | |
(doall (apply pcalls (repeat n f))) | |
(async-deref a))) | |
(defn atom-test [n] | |
(let [a (atom 0) | |
increment (fn [x] (do (Thread/sleep 1) (inc x))) | |
f (partial swap! a increment)] | |
(doall (repeatedly n f)) | |
(deref a))) | |
(defn atom-parallel-test [n] | |
(let [a (atom 0) | |
increment (fn [x] (do (Thread/sleep 1) (inc x))) | |
f (partial swap! a increment)] | |
(doall (apply pcalls (repeat n f))) | |
(deref a))) | |
(defn async-atom-in-test [] | |
(let [a (async-atom {:a 0 :b {:c 0}})] | |
(async-swap-in! a inc [:b :c]))) | |
(defn async-atom-parallel-in-test [n] | |
(let [a (async-atom {:a 0 :b {:c 0}}) | |
increment (fn [x] (do (Thread/sleep 1) (inc x))) | |
f1 (partial async-swap-in! a increment [:a]) | |
f2 (partial async-swap-in! a increment [:b :c])] | |
;; TODO: how to check that f1 and f2 are proceeding independently? | |
(doall (apply pcalls (interleave (repeat n f1) (repeat n f2)))) | |
(async-deref a)))) | |
(comment | |
(let [at (atom 0) | |
ag (agent 0)] | |
(send ag (fn [v] (swap! at inc) (inc v))) | |
(await ag) | |
[@at @ag]) | |
(let [a (async-atom [[0 0 0] [1 1 1] [2 2 2]])] | |
(async-swap-in! a inc [0 1]))) | |
(comment | |
(time (atom-test 1000)) | |
(time (async-atom-test 1000)) | |
(time (atom-parallel-test 1000)) | |
(time (async-atom-parallel-test 1000)) | |
(async-atom-in-test) | |
(async-atom-parallel-in-test 100) | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment