Created
April 16, 2021 07:42
-
-
Save holyjak/61d89610e488f31d8c53b8bbbad299c1 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns catching-transduce
  "See [[catching-transduce]] below"
  (:require [clojure.core.async :as a :refer [chan to-chan pipeline-blocking <!!]]
            [clojure.test :refer [is with-test]]))
(defmacro err-or
  "Evaluates `forms` in order and yields the value of the last one.
  Should any of them throw, the `Throwable` is caught and becomes the
  value of the expression instead of propagating."
  [& forms]
  `(try
     (do ~@forms)
     (catch Throwable caught#
       caught#)))
(defn throwable?
  "True when `x` is an instance of `java.lang.Throwable`, false otherwise."
  [x]
  (instance? Throwable x))
(defn catch-ex-as-data
  "Transducer that turns exceptions thrown by the transducers / reducing
  function it wraps into data: the first `Throwable` thrown stops the
  reduction (via `reduced`) and becomes its result.

  Params:
  - `on-error` - optional one-argument function; called with the Throwable
    from the completion arity when the final result is a Throwable

  It should come first, i.e. at the top of `(comp (catch-ex-as-data) ...)`,
  so that everything after it in the `comp` is covered."
  ([] (catch-ex-as-data nil))
  ([on-error]
   (fn [rf]
     (fn
       ;; init: a failure while creating the initial value is returned as data
       ([] (err-or (rf)))
       ;; completion
       ([acc]
        (let [outcome (if-not (throwable? acc)
                        (err-or (rf acc))
                        ;; don't pass anomalies down
                        acc)]
          (when (and on-error (throwable? outcome))
            (on-error outcome))
          outcome))
       ;; step: stop at the first exception and carry it as the result
       ([acc item]
        (try
          (rf acc item)
          (catch Throwable ex
            (reduced ex))))))))
(defn drain
  "Close `ch` and discard all items on it. Returns nil immediately; the
  discarding happens asynchronously in a go-loop that keeps taking from `ch`
  until a take yields nil.

  Beware: This does not drain the channel that `ch` reads from so
  it only stops things upstream, not downstream."
  [ch]
  (a/close! ch)
  (a/go-loop []
    ;; Test with `some?` rather than plain truthiness: `false` is a legal
    ;; channel value and must not end the loop early; only nil ends it.
    (when (some? (a/<! ch))
      (recur)))
  nil)
(with-test
  (defn catching-transduce
    "Similar to `core.async/transduce` but blocks and returns the reduced value
    itself rather than a channel. It
    captures 'anomalies' (i.e. Java's `Throwable` sent as data) in the `ch` data and
    captures exceptions in `xf` and `f`, stopping at the first one.

    Params:
    - `xf`   - transducer applied to the non-Throwable items of `ch`
    - `f`    - reducing function
    - `init` - initial accumulator value
    - `ch`   - input channel; it is closed if `xf` or `f` throws

    Returns the result, or throws an `ex-info` if there was any anomaly /
    exception. Anomalies found in the input take precedence over a failure in
    `xf` / `f`.

    Uses blocking channel operations (`alts!!`, `<!!`), so do not call it from
    inside a `go` block."
    [xf f init ch]
    (let [[err-ch data-ch] (a/split throwable? ch)
          ;; ALTERNATIVE IMPL: Upon anomaly discovery in `ch`, `untap[-all]` the
          ;; data chan + close it, consume the rest of `ch` counting
          ;; # items / errors
          errors-ch (a/into [] err-ch)
          data-cnt (atom 0)
          ;; Called when `xf` / `f` failed and nobody reads `data-ch` any more.
          ;; BEWARE: `data-ch` must NOT be closed here. `a/split` stops - without
          ;; closing its output channels - as soon as one of its puts fails, so
          ;; closing `data-ch` can leave `err-ch` open forever and block the
          ;; `(a/<!! errors-ch)` below. Instead we close only the input and keep
          ;; consuming `data-ch`, so `split` runs until it reads nil from `ch`
          ;; and then closes both `err-ch` and `data-ch` itself.
          stop-and-discard (fn [_]
                             (a/close! ch)
                             (a/go-loop []
                               ;; `some?`: `false` is a valid item, only nil
                               ;; means closed and empty
                               (when (some? (a/<! data-ch))
                                 (recur))))
          result-ch (->>
                     data-ch
                     (a/transduce
                      (comp
                       (catch-ex-as-data stop-and-discard)
                       ;; count the items that reach the transformation
                       (map #(do (swap! data-cnt inc) %))
                       xf)
                      f
                      init))
          ;; wait for whichever finishes first, then for the other one
          [val src] (a/alts!! [result-ch errors-ch])
          result (if (= src result-ch) val (a/<!! result-ch))
          errs (if (= src errors-ch) val (a/<!! errors-ch))]
      (cond
        (seq errs) (throw (ex-info (format "There were %d errors (%d ok) in the input; first error: %s"
                                           (count errs) @data-cnt (first errs))
                                   {:errs errs}
                                   (first errs)))
        (throwable? result) (throw (ex-info (str "Data transformation failed:" result) {} result))
        :else result)))
  (is ; Regression test: this got stuck before the channels were drained on an `xf` failure
   (thrown-with-msg?
    RuntimeException
    #"Data transformation failed"
    (catching-transduce
     (map #(when (= :throw %)
             (throw (RuntimeException. "Poisonous throw!"))))
     conj
     []
     ;; :ok? :throw :ok :throw, :throw :ok :ok, :throw :throw :throw
     (a/to-chan [:throw
                 (rand-nth [:ok :throw])
                 (rand-nth [:ok :throw])])))
   "Error during transduce followed by any 2+ things")
  (is (= [1 3]
         (catching-transduce
          (map identity)
          conj
          []
          (a/to-chan [1 #_(RuntimeException. "FAKE") 3])))
      "Successful data processing")
  (is (thrown-with-msg?
       RuntimeException
       #"FAKE"
       (catching-transduce
        (map identity)
        conj
        []
        (a/to-chan [1 (RuntimeException. "FAKE") 3])))
      "Error in the input channel")
  (is (thrown-with-msg?
       RuntimeException
       #"Data transformation failed:java.lang.RuntimeException: Poisonous two!"
       (catching-transduce
        (map #(if (= 2 %)
                (throw (RuntimeException. "Poisonous two!"))
                %))
        conj
        []
        (a/to-chan [1 2 3])))
      "Error during transduce [middle]")
  ;; NOTE(review): this test was already disabled, presumably because it could
  ;; get stuck; confirm it passes reliably before re-enabling it.
  #_(is (thrown-with-msg?
         RuntimeException
         #"Data transformation failed:java.lang.RuntimeException: Poisonous two!"
         (catching-transduce
          (map (fn [_] (throw (RuntimeException. "Poisonous two!"))))
          conj
          []
          (a/to-chan [1 2])))
        "Error during transduce"))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment