Last active
May 12, 2018 02:55
-
-
Save spieden/482a6c3a8bd3b0f82c89 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns shell
  (:require [clojure.core.async :refer [alts!! go thread]]
            [clojure.string :as str]
            [me.raynes.conch.low-level :as sh]
            [slingshot.slingshot :refer [throw+ try+]])
  (:import (java.io IOException StringWriter)
           (java.util.concurrent ExecutionException)))
(defn start-step!
  "Reducing function that starts one pipeline step and appends it to `upstream`.

  `upstream` is the collection of steps started so far; `step` is the argument
  list applied to `sh/proc`. The value returned by `sh/proc` is extended with:
    :err-stream  a future running `sh/stream-to-string` on the step's :err
    :in-stream   a future that calls `sh/feed-from` with the previous step's
                 :out and then `sh/done`, or nil when there is no previous step
    :cl          the elements of `step` joined with single spaces

  Returns `upstream` with the new step conj'ed on."
  [upstream step]
  (let [feeder (last upstream)
        child  (apply sh/proc step)
        stderr (future (sh/stream-to-string child :err))
        ;; Only a step with a predecessor gets its input wired up here; the
        ;; first step's :in-stream stays nil at this point.
        stdin  (when feeder
                 (future
                   (sh/feed-from child (:out feeder))
                   (sh/done child)))]
    (conj upstream
          (assoc child
                 :err-stream stderr
                 :in-stream stdin
                 :cl (str/join " " step)))))
(defn start-pipeline!
  "Starts every step in `steps`, chaining them with `start-step!`, and wires the
  ends of the chain to `in` and `out`.

  Returns a map with:
    :step-procs  vector of the started steps, where the first step has its
                 :in-stream replaced by a future that calls `sh/feed-from` with
                 `in` and then `sh/done`
    :out-stream  a future running `sh/stream-to` from the last step's :out
                 into `out`"
  [steps in out]
  (let [procs    (reduce start-step! [] steps)
        head     (first procs)
        tail     (last procs)
        ;; The output drain is started before the input feed, as in the
        ;; original ordering of the two futures.
        drain    (future (sh/stream-to tail :out out))
        feed     (future
                   (sh/feed-from head in)
                   (sh/done head))
        fed-head (assoc head :in-stream feed)]
    {:step-procs (vec (cons fed-head (rest procs)))
     :out-stream drain}))
(defn exec-io-exception?
  "Returns true when `e` is an ExecutionException whose cause is an
  IOException, and false for any other value."
  [e]
  (if (instance? ExecutionException e)
    (instance? IOException (.getCause ^ExecutionException e))
    false))
(defn wait-proc
  "Waits for a single started step and summarises how it ended.

  Derefs the step's :in-stream future, then obtains its exit code via
  `sh/exit-code`. Returns :ok when the input future did not fail with an
  ExecutionException caused by an IOException and the exit code equals 0.
  Otherwise returns a map of:
    :cl         the step's :cl value
    :err        the dereferenced value of the step's :err-stream future
    :exit-code  the exit code

  Any other exception raised while dereferencing :in-stream propagates."
  [proc]
  (let [feed-outcome (try+
                       @(:in-stream proc)
                       ;; Keep the wrapped IO failure as a value so it can be
                       ;; reported together with the exit code below.
                       (catch exec-io-exception? ex ex))
        status       (sh/exit-code proc)
        feed-failed? (exec-io-exception? feed-outcome)]
    (if-not (or feed-failed? (not= 0 status))
      :ok
      {:cl        (:cl proc)
       :err       (deref (:err-stream proc))
       :exit-code status})))
(defn wait-pipeline!
  "Blocks until every step of a started pipeline has finished and the output
  future has completed.

  Takes a map with :step-procs (the started steps) and :out-stream (a future),
  as produced by `start-pipeline!`. Each step is awaited with `wait-proc` on
  its own thread, and results are consumed in completion order.

  Returns :ok once every step has reported :ok and :out-stream has been
  dereferenced. If a step reports a failure map, that map is thrown with
  `throw+`; if waiting on a step throws, that Throwable is rethrown. The first
  failure observed ends the wait; remaining steps are not awaited."
  [{:keys [step-procs out-stream]}]
  (let [;; wait-proc blocks, so each wait runs on a real thread rather than in
        ;; a go block. Exceptions are returned as values so the Throwable
        ;; branch below can see them.
        proc-chans (mapv (fn [proc]
                           (thread
                             (try
                               (wait-proc proc)
                               (catch Throwable t t))))
                         step-procs)]
    (loop [pending (set proc-chans)]
      (when (seq pending)
        ;; Select only among channels that have not yet delivered. A consumed
        ;; channel is closed and would yield nil immediately, ending the wait
        ;; before the remaining steps finish.
        (let [[result chan] (alts!! (vec pending))]
          (cond
            (= :ok result) (recur (disj pending chan))
            (map? result) (throw+ result)
            (instance? Throwable result) (throw result))))))
  (deref out-stream)
  :ok)
(defn run-pipeline!
  "Starts the pipeline described by `steps`, feeding it from `in`, and waits
  for it to finish.

  With two arguments, output is collected in a fresh StringWriter and returned
  as a string. With three arguments, output goes to `out` and the result of
  `wait-pipeline!` is returned."
  ([steps in]
   (let [buffer (StringWriter.)]
     (-> (start-pipeline! steps in buffer)
         wait-pipeline!)
     (.toString buffer)))
  ([steps in out]
   (-> (start-pipeline! steps in out)
       wait-pipeline!)))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment