Last active
November 22, 2017 11:19
-
-
Save niwinz/083f5eb6ee52001ec5d30a94e2e3dc41 to your computer and use it in GitHub Desktop.
Recycle Iteration
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;; BOOT_CLOJURE_NAME=org.clojure/clojure | |
;; BOOT_CLOJURE_VERSION=1.9.0-RC1 | |
;; BOOT_VERSION=2.7.2 | |
;; BOOT_JVM_OPTIONS="-Xms4g -Xmx4g -XX:+UseG1GC -XX:+AggressiveOpts -server" | |
(set-env! :dependencies '[[org.clojure/core.async "0.3.443"] | |
[org.clojure/clojure "1.9.0-RC1"]]) | |
(require '[clojure.core.async :as a] | |
'[clojure.core.async.impl.protocols :as ap]) | |
;; --- Helpers | |
(def ^:private noop (constantly nil)) | |
(defn- service? | |
[v] | |
(::service v)) | |
(defn- chan? | |
[v] | |
(satisfies? ap/ReadPort v)) | |
(defmacro try-on | |
[& body] | |
`(try ~@body (catch Throwable e# e#))) | |
;; --- Main API | |
(def ^:dynamic *timeout* | |
"Maximum number of milliseconds to wait for a response from a | |
service. This variable by default is nil, meaning the value used | |
when creating the service is used." | |
nil) | |
(defn- default-receive | |
"A default implementation for `:receive`." | |
[& _] | |
(ex-info "service do not implement :receive" {})) | |
(defn service | |
"Create a service using the spec map as a blueprint. | |
The spec map may contain any of the following keys (all of the are | |
optional): | |
- `:init`: initialization hook, receives the service spec and optionally some | |
config parameter and it should return the internal state instance; | |
this parameter is optional. | |
- `:stop`: termination hook, receives the service spec, internal state and | |
allows proper resource clearing; this parameter is optional. | |
- `:error`: error hook, receives the service spec, internal state and the | |
exception instance; it should handle it or just return the exception | |
instance as is; this parameter is optional. | |
- `:receive`: function that receives the internal state and variable number of args | |
identifying the message sent to the service. | |
- `:timeout`: default maximum number of milliseconds to wait for a response | |
from the service (default 1min). | |
- `:buf-or-n`: core.async buffer or size of buffer to use for the | |
service communication channel (default 1024). | |
" | |
[& {:keys [init stop error receive buf-or-n timeout] | |
:or {init noop | |
stop noop | |
error identity | |
buf-or-n 1024 | |
receive default-receive} | |
:as spec}] | |
(let [inbox-ch (a/chan (if (integer? buf-or-n) buf-or-n (buf-or-n))) | |
stop-ch (a/chan 1) | |
timeout (or timeout *timeout* 60000) | |
status (atom ::stopped) | |
local (volatile! nil)] | |
{::service true | |
::buf-or-n buf-or-n | |
::init init | |
::stop stop | |
::receive receive | |
::inbox-ch inbox-ch | |
::stop-ch stop-ch | |
::timeout timeout | |
::status status | |
::local local})) | |
(declare initialize-loop) | |
(defn started? | |
"Check if service is started. Returns true if it is, false if it's not." | |
[{:keys [::status] :as service}] | |
{:pre [(service? service)]} | |
(= @status ::started)) | |
(defn stopped? | |
"Check if service is stopped. Returns true if it is, false if it's not." | |
[{:keys [::status] :as service}] | |
{:pre [(service? service)]} | |
(= @status ::stopped)) | |
(defn start! | |
([service] | |
(start! service nil)) | |
([{:keys [::status ::local ::init] :as service} config] | |
{:pre [(service? service)]} | |
(when (compare-and-set! status ::stopped ::started) | |
(vreset! local (init config)) | |
(assoc service ::loop-ch (initialize-loop service))))) | |
(defn stop! | |
[{:keys [::status ::local ::stop-ch ::stop] :as service}] | |
{:pre [(service? service)]} | |
(when (compare-and-set! status ::started ::stopped) | |
(a/put! stop-ch true) ;; Notify the internal loop that the service is stoped | |
(stop @local) | |
(vreset! local nil))) | |
(defn with-state | |
"A function that allows return a value attached to new local state." | |
[value state] | |
{::with-state true | |
::local state | |
::value value}) | |
(defn ask! | |
[{:keys [::inbox-ch ::timeout] :as service} & args] | |
(let [output (a/chan 1) | |
timeout (a/timeout timeout) | |
message [output args]] | |
(a/go | |
(let [[val port] (a/alts! [[inbox-ch message] timeout])] | |
(if (identical? port timeout) | |
(ex-info "put message to service timed out" {}) | |
(let [[val port] (a/alts! [output timeout])] | |
(if (identical? port timeout) | |
(ex-info "take message result from service timed out" {}) | |
val))))))) | |
(defn ask!! | |
[service & args] | |
(let [result (a/<!! (apply ask! service args))] | |
(if (instance? Throwable result) | |
(throw result) | |
result))) | |
;; --- Implementation | |
(defn- handle-message | |
[{:keys [::receive ::local]} [out args]] | |
(a/go-loop [result (try-on (apply receive @local args))] | |
(cond | |
(chan? result) | |
(recur (a/<! result)) | |
(and (map? result) | |
(::with-state result)) | |
(do | |
(vreset! local (::local result)) | |
(a/>! out (::value result)) | |
(a/close! out)) | |
:else | |
(do | |
(a/>! out result) | |
(a/close! out))))) | |
(defn- initialize-loop | |
[{:keys [::inbox-ch ::stop-ch] :as service}] | |
(a/go-loop [] | |
(let [[msg port] (a/alts! [stop-ch inbox-ch] :priority true)] | |
(when (identical? port inbox-ch) | |
(a/<! (handle-message service msg)) | |
(recur))))) | |
;; --- Example Code | |
;; An adder service | |
(def service-a | |
(service :init (constantly +) | |
:receive (fn [local & args] | |
;; `local` is `+` function | |
(apply local args)))) | |
;; Asychronous counter service | |
(def service-b | |
(service :init (constantly 0) | |
:receive (fn [counter & args] | |
(a/go | |
(a/<! (a/timeout 100)) ;; simulate some async work | |
(with-state counter (inc counter)))))) | |
;; Hierarchical service | |
(def service-c | |
(service | |
;; Service initialization | |
:init (fn [config] | |
(let [adder (service :receive (fn [_ & args] (apply + args))) | |
hello (service :receive (fn [_ & [name]] (str "Hello " name)))] | |
{:adder (start! adder) | |
:hello (start! hello)})) | |
;; Service resource cleaining | |
:stop (fn [{:keys [adder hello] :as local}] | |
(stop! adder) | |
(stop! hello)) | |
;; Service on message hook | |
:receive (fn [{:keys [adder hello] :as local} & [name & rest]] | |
(a/go | |
(case name | |
:add (a/<! (apply ask! adder rest)) | |
:greets (a/<! (apply ask! hello rest))))))) | |
(start! service-a) | |
(start! service-b) | |
(start! service-c) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment