Created
March 15, 2015 17:23
-
-
Save ghadishayban/30c2327665901ae04f03 to your computer and use it in GitHub Desktop.
Interruptible Channels (Needs JDK8)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns poolboy.chanfut | |
(:import [java.util.concurrent CompletableFuture Callable Future] | |
[java.util.function BiConsumer] | |
[java.util.concurrent Executor ExecutorService]) | |
(:require [clojure.core.async :as async] | |
[clojure.core.async.impl.protocols :as async-impl])) | |
(def async-executor clojure.core.async.impl.exec.threadpool/the-executor) | |
(defprotocol Interruptible | |
(interrupt* [_])) | |
(deftype InterruptibleChannel [^Future task ch] | |
async-impl/ReadPort | |
(take! [_ fn1] (async-impl/take! ch fn1)) | |
async-impl/Channel | |
(close! [_] (async-impl/close! ch)) | |
(closed? [_] (async-impl/closed? ch)) | |
Interruptible | |
(interrupt* [_] | |
;; closing the channel here will prevent the completeable future | |
(.cancel task true))) | |
(defn supply | |
[executor f ch] | |
(let [cf (CompletableFuture.) | |
conveyor (reify | |
BiConsumer | |
(accept [_ o ex] | |
(if ex | |
(async/put! ch ex) | |
(when (some? o) (async/put! ch o))) | |
(async/close! ch))) | |
binds (clojure.lang.Var/getThreadBindingFrame) | |
task (.submit ^ExecutorService executor | |
^Callable (fn [] | |
(clojure.lang.Var/resetThreadBindingFrame binds) | |
(try | |
(.complete cf (f)) | |
(catch Throwable t | |
(.completeExceptionally cf t)))))] | |
(.whenCompleteAsync cf ^BiConsumer conveyor ^Executor async-executor) | |
(InterruptibleChannel. task ch))) | |
(defn as-channel* | |
[executor f] | |
(let [ch (async/chan 1)] | |
(supply executor f ch))) | |
(defmacro as-channel | |
"Similar to thread but runs in your own executor. | |
The channel returned will yield the block's return value, or | |
thrown exception." | |
[executor & body] | |
`(as-channel* ~executor (^{:once true} fn* [] ~@body))) | |
(defn interrupt | |
"Issues Thread.interrupt() to the task that supplies a channel." | |
[ch] | |
(interrupt* ch)) | |
(comment | |
(def my-executor clojure.lang.Agent/soloExecutor) | |
(def retc (as-channel my-executor (Thread/sleep 15000))) | |
;; wait a few | |
(interrupt retc) | |
;; should yield an InterruptedException | |
(async/<!! retc)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment