Skip to content

Instantly share code, notes, and snippets.

@cgrand
Last active October 25, 2015 18:32
Show Gist options
  • Save cgrand/767673242b7f7c27f35a to your computer and use it in GitHub Desktop.
Save cgrand/767673242b7f7c27f35a to your computer and use it in GitHub Desktop.
(ns foami.core
"FOreign Asynchronous Mechanism Interop"
(:require [clojure.core.async :as async]))
(def ^:private abandon (doto (async/chan) async/close!))
(def ^:private pending-writes (async/chan))
(defn put!
"Tries to put a val into chan, returns either: true if put succeeded, false if chan is
closed, ::pressurized if the put hasn't been performed yet. No further write should
be attempted before reactivate! is called.
When reactivate! is called either x has been put succesfully or chan is closed.
reactivate! takes 3 arguments: v, chan and x. chan and x are the same as passed to put!
v is the return value as by >!."
[chan x reactivate!]
(let [[v c] (async/alts!! [[chan x] abandon] :priority true)]
(if (identical? abandon c)
(do
(async/>!! pending-writes [chan x reactivate!])
::pressurized)
(boolean v))))
(def ^:private watcher
;; most of this code is dedicated to incrementally updating altsv (rather than recreating it from writes-map each time)
(async/go-loop [writes-map {} altsv [pending-writes]]
(let [[v c] (async/alts! altsv)]
(if (identical? c pending-writes)
(let [[c x reactivate!] v]
(if-let [[n f b] (writes-map c)]
(recur (assoc writes-map c [n f (conj b [x reactivate!])]) altsv)
(recur (assoc writes-map c [(count altsv) (list [x reactivate!]) []])
(conj altsv [c x]))))
(let [[n [[x reactivate!] & f] b] (writes-map c)
f' (if f f (seq b))
b' (if f b [])]
(reactivate! v c x)
(if-let [[[x]] f']
(recur (assoc writes-map c [n f' b']) (assoc altsv n [c x]))
(let [[lastc] (peek altsv)
[lastn f b] (writes-map lastc)
writes-map (assoc writes-map lastc [n f b])
altsv (assoc altsv n lastc)]
(recur (dissoc writes-map c) (pop altsv)))))))))
@cgrand
Copy link
Author

cgrand commented Oct 10, 2014

for NIO, it means cancelling the SelectionKey upon ::pressurized and, in reactivate!, registering the SelectableChannel again.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment