Last active
May 4, 2024 11:51
-
-
Save awwx/6d02e6ea702bdc499bcf847e9b9bb98f to your computer and use it in GitHub Desktop.
observe Missionary task and flow events
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;; SPDX-License-Identifier: EPL-2.0 OR MIT
(ns mobserve | |
(:require | |
[missionary.core :as m]) | |
(:import | |
missionary.Cancelled)) | |
;; Encapsulate the differences between Clojure and ClojureScript
;; in how IFn and IDeref are implemented.
;; FlowIterator pairs two zero-argument functions behind the two
;; operations the rest of this file performs on an iterator:
;;   - dereferencing it (@it) calls `transfer` and returns its result;
;;   - invoking it with no arguments (it) calls `cancel`.
;; Only the interface and method names differ between the platforms, so
;; reader conditionals are applied to just those symbols.
(deftype FlowIterator [transfer cancel]
  #?(:clj clojure.lang.IDeref :cljs IDeref)
  (#?(:clj deref :cljs -deref) [_]
    (transfer))

  #?(:clj clojure.lang.IFn :cljs IFn)
  ;; Only the zero-argument arity is implemented.
  (#?(:clj invoke :cljs -invoke) [_]
    (cancel)))
(defn println-reporter
  "Default reporter: prints `description`, `msg`, and the readable
  (`pr-str`) form of every extra argument on one line via `println`.

  The extra arguments are rendered eagerly into a vector before
  `println` is called; a lazy sequence would be realized inside
  `println`, after it has changed the dynamic print mode."
  [description msg & args]
  (apply println description msg (into [] (map pr-str) args)))
(defn characterize-error
  "Returns the value to report for error `e`: the keyword `:canceled`
  when `e` is an instance of `Cancelled`, otherwise `e` itself."
  [e]
  (if (instance? Cancelled e)
    :canceled
    e))
(defn observe-task
  "Wraps `task` in a function with the same calling shape (success
  continuation, failure continuation -> canceller) that reports each
  step to `report` as `(report description event & args)`.

  Events, in the order the code emits them:
    :task-instantiates  before `task` is called
    :task-succeeded v   before the success continuation receives `v`
    :task-failed e'     before the failure continuation receives `e`
                        (`e'` is `(characterize-error e)`)
    :task-instantiated  after `task` has returned its canceller
    :caller-cancels     before the canceller is invoked

  With two arguments, `println-reporter` is used as the reporter."
  ([description task]
   (observe-task println-reporter description task))
  ([report description task]
   (fn [on-success on-failure]
     (let [emit      (partial report description)
           succeeded (fn [v]
                       (emit :task-succeeded v)
                       (on-success v))
           failed    (fn [e]
                       (emit :task-failed (characterize-error e))
                       (on-failure e))]
       (emit :task-instantiates)
       (let [cancel-task (task succeeded failed)]
         (emit :task-instantiated)
         (fn []
           (emit :caller-cancels)
           (cancel-task)))))))
(defn hook-call
  "Calls `thunk` with no arguments and returns its result.

  If `thunk` throws, `call-threw` is called with the thrown value and the
  same value is then rethrown. If `thunk` returns, `successful-call` is
  called with the result before it is returned.

  `successful-call` runs outside the try block on purpose: anything it
  throws propagates directly and is not passed to `call-threw`."
  [thunk successful-call call-threw]
  (let [outcome (try
                  (thunk)
                  (catch #?(:clj Throwable, :cljs :default) thrown
                    (call-threw thrown)
                    (throw thrown)))]
    ;; doto invokes the hook for its side effect and yields `outcome`.
    (doto outcome successful-call)))
(defn observe-flow
  "Wraps `flow` in a function with the same calling shape (notifier,
  terminator -> iterator) that reports each step to `report` as
  `(report description event & args)`.

  Events, in the order the code emits them:
    :consumer-instantiates   before `flow` is called
    :producer-notifies       before the consumer's notifier is called
    :producer-terminates     before the consumer's terminator is called
    :producer-instantiated   after `flow` has returned its iterator
    :consumer-transfers      before the wrapped iterator is dereferenced
    :transferred-value v     after the dereference returned `v`
    :transferred-error e'    after the dereference threw `e`
                             (`e'` is `(characterize-error e)`; `e` is
                             rethrown by `hook-call`)
    :consumer-cancels        before the wrapped iterator is invoked

  The result is a `FlowIterator` whose deref and zero-argument invoke
  delegate to the wrapped iterator. With two arguments,
  `println-reporter` is used as the reporter."
  ([description flow]
   (observe-flow println-reporter description flow))
  ([report description flow]
   (fn [notifier terminator]
     (let [emit (partial report description)]
       (emit :consumer-instantiates)
       (let [upstream (flow
                        (fn on-notify []
                          (emit :producer-notifies)
                          (notifier))
                        (fn on-terminate []
                          (emit :producer-terminates)
                          (terminator)))
             transfer! (fn []
                         (emit :consumer-transfers)
                         (hook-call
                           #(deref upstream)
                           (fn [v]
                             (emit :transferred-value v))
                           (fn [e]
                             (emit :transferred-error
                                   (characterize-error e)))))
             cancel!   (fn []
                         (emit :consumer-cancels)
                         (upstream))]
         (emit :producer-instantiated)
         (->FlowIterator transfer! cancel!))))))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment