Created
January 3, 2014 03:23
-
-
Save bbbates/8232097 to your computer and use it in GitHub Desktop.
Allowing for multiple forms to wait until leader election
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns leader-guarantee.core | |
(:import (org.jgroups.blocks.locking LockService) | |
(org.jgroups.protocols CENTRAL_LOCK) | |
(org.jgroups JChannel) | |
(java.util.concurrent.locks Lock))) | |
(def get-lock-threads (atom {})) | |
(defn- get-lock | |
[lock-service lock-promise] | |
(fn [] | |
(let [lock (.getLock lock-service "leader")] | |
(do | |
(.lock lock) ;; << Lock is still acquired here | |
(deliver lock-promise :is-leader))))) | |
(defn- leader-thread | |
[cluster-name] | |
(locking get-lock-threads | |
(if-not (get @get-lock-threads cluster-name) | |
(let [channel (doto (JChannel.) ;; Uses udp by default | |
(.. getProtocolStack (addProtocol (doto (CENTRAL_LOCK.) (.init)))) | |
(.connect cluster-name)) | |
lock-service (LockService. channel) | |
lock-promise (promise)] | |
(get (swap! get-lock-threads | |
assoc | |
cluster-name | |
{:thread | |
(doto (Thread. (get-lock lock-service lock-promise) "LeaderAppointmentThread") | |
(.setDaemon true) | |
(.start)) | |
:promise lock-promise}) | |
cluster-name)) | |
(get @get-lock-threads cluster-name)))) | |
(defn when-leader* | |
"Executes f when the local application container becomes leader of the cluster-name cluster." | |
[cluster-name f] | |
(let [leader-promise (:promise (leader-thread cluster-name))] | |
(if (realized? leader-promise) ;; << Has the promise been delivered yet? | |
(f) | |
(future | |
(when (deref leader-promise) (f)))))) ;; << this when form will block until the promise has been delivered by the lock thread |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment