Last active
August 29, 2015 14:18
-
-
Save pleasetrythisathome/c4db8a42b5067ef20052 to your computer and use it in GitHub Desktop.
asynchronous "protocol" handshakes
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns async-protocol
  "Asynchronous \"protocol\" handshakes between components that talk to each
  other over core.async channels."
  (:require
    [clojure.core.async :as async]
    [clojure.set :as set]
    [com.stuartsierra.component :as component :refer [Lifecycle]]
    [taoensso.timbre :as log]))
(defn throw-err
  "Re-throws `e` when it is a Throwable; otherwise returns `e` unchanged.
  Lets errors travel over a channel as plain values and be raised on the
  consuming side."
  [e]
  (if (instance? Throwable e)
    (throw e)
    e))
(defmacro <?
  "Parking take from `ch` that re-throws the taken value when it is a
  Throwable (see `throw-err`); otherwise evaluates to the taken value.

  Must be used inside a `go` block. This is a macro rather than a function
  because `async/<!` is only valid lexically inside `go`: wrapped in a plain
  function, the `<!` call is hidden from the `go` macro and fails at runtime."
  [ch]
  `(throw-err (async/<! ~ch)))
(defn <??
  "Blocking take from `ch`. If the taken value is a Throwable it is thrown
  on the calling thread; any other value (including nil) is returned."
  [ch]
  (-> ch
      async/<!!
      throw-err))
(defprotocol IAsyncProtocol
  "Describes the channels a component communicates over and the events it
  sends and receives on them."
  (send-chan [_]
    "Channel this component puts outgoing messages on.")
  (receive-chan [_]
    "Channel this component takes incoming messages from.")
  (sent-events [_]
    "Map of event key -> collection of argument vectors this component may send.")
  (received-events [_]
    "Map of event key -> handler var this component handles."))
(defn matching-arities?
  "True when every parameter vector in the `:arglists` metadata of
  `source-fn` is matched by at least one argument vector in `arities`.

  Symbols named `_` are removed from both sides before comparing. A
  parameter vector whose second-to-last symbol is `&` matches any argument
  vector with at least as many entries as its fixed parameters; otherwise
  the counts must be equal. With no `:arglists` metadata the result is true."
  [source-fn arities]
  (let [drop-placeholders (fn [syms] (remove #(= '_ %) syms))
        accepts? (fn [declared supplied]
                   (let [params (drop-placeholders declared)
                         given (drop-placeholders supplied)
                         n-params (count params)
                         n-given (count given)]
                     (if (= '& (last (butlast params)))
                       ;; `& rest` takes two slots in the parameter vector
                       (>= n-given (- n-params 2))
                       (= n-params n-given))))]
    (every? (fn [declared]
              (some #(accepts? declared %) arities))
            (:arglists (meta source-fn)))))
(defn async-handshake!
  "Starts the handshake for `component`, which must satisfy IAsyncProtocol.

  If the component has sent-events, they are put on its send channel. If it
  has received-events, a go block is started that reads from the receive
  channel until every received-event key has been offered with matching
  arities, then announces [::done component] on the send channel and waits
  for that message to come back.

  Returns nil when the component has no received-events; otherwise returns
  the go block's channel, which yields `true` on success or an Exception
  value on failure (timeout, dead end, or failed circuit). Use `<??` to turn
  that value into a thrown exception."
  [component]
  (assert (satisfies? IAsyncProtocol component) "component must satisfy IAsyncProtocol")
  (let [send-c (send-chan component)
        receive-c (receive-chan component)]
    (when-let [events (sent-events component)]
      (assert (and (seq events) send-c) "components with sent-events must provide a channel from send-chan")
      ;; put! takes an optional callback as its third argument; none is needed here.
      (async/put! send-c events))
    (when-let [must-satisfy (received-events component)]
      (assert (and (seq must-satisfy) receive-c) "components with received-events must provide a channel from receive-chan")
      (async/go
        (try
          (loop [satisfied #{}
                 failed #{}]
            ;; A fresh 1000 ms timeout is armed for every message awaited.
            (let [[v] (async/alts! [receive-c (async/timeout 1000)])]
              (cond
                ;; A ::done marker: ours means the circuit is complete,
                ;; anyone else's is passed along.
                (= (first v) ::done)
                (if (= (second v) component)
                  true
                  (do (async/put! send-c v)
                      (recur satisfied failed)))

                ;; nil: the timeout fired (or the receive channel was closed).
                (not v)
                (throw (ex-info (str "failed matching async-satisifes")
                                {:reason ::timeout
                                 :component component}))

                ;; Otherwise v is a map of event key -> offered arities.
                :else
                (let [events v
                      [satisfied failed]
                      (reduce-kv
                        (fn [[satisfied failed] k arities]
                          (let [v (get must-satisfy k)]
                            (if (and v (matching-arities? v arities))
                              [(conj satisfied k) failed]
                              (cond
                                ;; Cannot forward an event we do not handle.
                                (not send-c)
                                (throw (ex-info (str "dead end matching async-satisifes " k)
                                                {:reason ::dead-end
                                                 :event-key k
                                                 :arities arities
                                                 :component component}))

                                ;; Already forwarded once and it came back unhandled.
                                (get failed [k arities])
                                (throw (ex-info (str "failed circuit matching async-satisifes " k)
                                                {:reason ::failed
                                                 :event-key k
                                                 :arities arities
                                                 :component component}))

                                ;; Forward it and remember that we did.
                                :else
                                (do
                                  (async/put! send-c {k arities})
                                  [satisfied (conj failed [k arities])])))))
                        [satisfied failed]
                        events)]
                  (when-not (seq (set/difference (set (keys must-satisfy)) satisfied))
                    (async/put! send-c [::done component]))
                  (recur satisfied failed)))))
          ;; Errors are returned as values on the go channel rather than thrown.
          (catch Exception e
            e))))))
(defn start-receive-loop!
  "Starts a go loop that dispatches messages from the component's receive
  channel. `component` must satisfy IAsyncProtocol.

  Each message is a sequence `[key & args]`. When `key` is in the component's
  received-events, the args are checked against the handler var's arglists
  and the handler is applied to `component` and the args. Messages with an
  unknown key are put on the send channel when there is one. The loop ends
  when the receive channel yields nil.

  An exception raised in the loop (including ::malformed-args) ends it; the
  exception is delivered on the go channel and re-thrown by `throw-err` in
  the `take!` callback.

  Does nothing and returns nil when the component has no received-events.
  Asserts that a receive channel exists when it does."
  [component]
  (assert (satisfies? IAsyncProtocol component) "component must satisfy IAsyncProtocol")
  (let [receive-c (receive-chan component)
        send-c (send-chan component)
        events (received-events component)]
    (when (seq events)
      ;; The channel is only required when there is something to receive.
      (assert receive-c "components with received-events must provide a channel from receive-chan")
      (async/take!
        (async/go
          (try
            (loop []
              (when-let [event (async/<! receive-c)]
                (let [[key & args] event
                      args (or args [])]
                  (if-let [handler-var (get events key)]
                    (cond
                      (not (matching-arities? handler-var [args]))
                      (throw (ex-info (str "malformed args, event:" key)
                                      {:reason ::malformed-args
                                       :event-key key
                                       :args args
                                       :expected (:arglists (meta handler-var))
                                       :handler (meta handler-var)}))

                      :else
                      ;; Look the var up again by namespace and name before calling it.
                      (let [handler (->> handler-var
                                         meta
                                         ((juxt :ns :name))
                                         (apply ns-resolve))]
                        (apply handler component args)))
                    ;; Not one of ours: pass it along if we can.
                    (when send-c
                      (async/put! send-c event))))
                (recur)))
            (catch Exception e
              e)))
        throw-err))))
(defprotocol IDropdown
  "Commands a dropdown accepts."
  ;; The receiver stays named `_`: matching-arities? strips `_` from the
  ;; :arglists metadata of these vars when comparing against event args.
  (open [_]
    "Handle an open command.")
  (close [_]
    "Handle a close command."))
;; Dropdown side of the conversation: sends click events, handles
;; ::open and ::close by logging them.
(defrecord Dropdown [send-c receive-c]
  IDropdown
  (open [this]
    (log/info :open))
  (close [this]
    (log/info :close))

  IAsyncProtocol
  (send-chan [this] send-c)
  (receive-chan [this] receive-c)
  (sent-events [this]
    ;; event key -> argument vectors that may accompany the event
    {::click-toggle '[[toggled?]]
     ::click-item '[[key]]})
  (received-events [this]
    ;; event key -> var of the handler to call
    {::open #'open
     ::close #'close}))
(defprotocol IHandleDropdown
  "Callbacks for events emitted by a dropdown."
  ;; The receiver stays named `_`: matching-arities? strips `_` from the
  ;; :arglists metadata of these vars when comparing against event args.
  (on-click-toggle [_ toggled?]
    "Handle a click on the toggle; receives the `toggled?` event argument.")
  (on-click-item [_ key]
    "Handle a click on an item; receives the `key` event argument."))
;; Handler side of the conversation: sends ::open / ::close (no arguments),
;; handles click events by logging them.
(defrecord DropdownHandler [send-c receive-c]
  IHandleDropdown
  (on-click-toggle [this toggled?]
    (log/info :click-toggle toggled?))
  (on-click-item [this key]
    (log/info :click-item key))

  IAsyncProtocol
  (send-chan [this] send-c)
  (receive-chan [this] receive-c)
  (sent-events [this]
    ;; event key -> argument vectors that may accompany the event
    {::open [[]]
     ::close [[]]})
  (received-events [this]
    ;; event key -> var of the handler to call
    {::click-toggle #'on-click-toggle
     ::click-item #'on-click-item}))
;; Started example system: two channels wired so that each component's
;; send channel is the other component's receive channel.
(def system
  (let [dropdown (component/using (map->Dropdown {})
                                  {:send-c :from-dropdown
                                   :receive-c :to-dropdown})
        dropdown-handler (component/using (map->DropdownHandler {})
                                          {:send-c :to-dropdown
                                           :receive-c :from-dropdown})]
    (component/start
      (component/system-map
        :to-dropdown (async/chan)
        :from-dropdown (async/chan)
        :dropdown dropdown
        :dropdown-handler dropdown-handler))))
;; Run the handshake for every IAsyncProtocol component in the system and,
;; once all of them report success, start their receive loops.
(let [cmps (filter (partial satisfies? IAsyncProtocol) (vals system))
      ;; Start every handshake before blocking on any result: the handshakes
      ;; depend on each other's messages.
      handshakes (mapv async-handshake! cmps)
      ;; <?? throws here if a handshake produced an exception.
      results (mapv <?? handshakes)]
  ;; every? is safe for an empty component list and requires each result to
  ;; be `true`, not merely equal to the others.
  (when (every? true? results)
    (doseq [cmp cmps]
      (start-receive-loop! cmp))))
;; REPL examples: evaluate the forms one at a time against the running system.
(comment
  ;; messages received by the dropdown
  (async/put! (:to-dropdown system) [::open])
  (async/put! (:to-dropdown system) [::close])

  ;; messages received by the dropdown handler
  (async/put! (:from-dropdown system) [::click-toggle true])
  (async/put! (:from-dropdown system) [::click-item :some-button])
  )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment