Created
January 20, 2013 12:57
-
-
Save evieluvsrainbows/4578464 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns backtype.storm.nimbus.storage | |
(:import [java.io InputStream OutputStream FileInputStream FileOutputStream File]) | |
(:import [java.util List Map]) | |
(:import [org.apache.commons.io FileUtils IOUtils]) | |
(:use [backtype.storm log config util]) | |
(:import [backtype.storm.utils Utils]) | |
(:import [backtype.storm.nimbus INimbusStorage])) | |
(defn create-local-storage [conf] | |
(let [stormroot (nimbus-storage-local-dir conf)] | |
(log-message "Using default storage (" stormroot ")") | |
(reify INimbusStorage | |
(^InputStream open [this, ^String path] | |
(FileInputStream. (join-paths stormroot path))) | |
(^OutputStream create [this, ^String path] | |
(FileOutputStream. (join-paths stormroot path))) | |
(^List list [this, ^String path] | |
(seq (.list (File. (join-paths stormroot path))))) | |
(^void delete [this, ^String path] | |
(let [full-path (join-paths stormroot path)] | |
(when (exists-file? full-path) | |
(FileUtils/forceDelete (File. full-path))))) | |
(^void mkdirs [this, ^String path] | |
(FileUtils/forceMkdir (File. (join-paths stormroot path)))) | |
(^boolean isSupportDistributed [this] | |
false)))) | |
(defn create-custom-storage [storage-name conf] | |
(let [storage (new-instance storage-name)] | |
(.init storage conf) | |
(log-message "Using custom storage: " storage-name) | |
storage)) | |
(defn ^INimbusStorage create-nimbus-storage [conf] | |
(if-let [storage-name (conf NIMBUS-STORAGE)] | |
(create-custom-storage storage-name conf) | |
(create-local-storage conf))) | |
(defn list-full-paths [storage path] | |
(map #(str path "/" ^String %) (.list storage path))) | |
(defn upload-file-to-storage [file storage path] | |
(let [stream (.create storage path)] | |
(try | |
(IOUtils/copy (FileInputStream. file) stream) | |
(finally (.close stream))))) | |
(defn ensure-clean-dir-in-storage [storage path] | |
(.mkdirs storage path) | |
(if-let [files (seq (list-full-paths storage path))] | |
(.delete storage files))) | |
(defn serialize-to-storage [obj storage path] | |
(let [stream (.create storage path)] | |
(try | |
(IOUtils/write (Utils/serialize obj) stream) | |
(finally (.close stream))))) | |
(defn deserialize-from-storage [storage path] | |
(let [stream (.open storage path)] | |
(try | |
(Utils/deserialize (IOUtils/toByteArray stream)) | |
(finally (.close stream))))) | |
######################################################## | |
(ns backtype.storm.nimbus.elections | |
(:import [backtype.storm.nimbus NimbusLeaderElections]) | |
(:use [backtype.storm config util log])) | |
(defn local-hostname-conf [conf] | |
(if (contains? conf STORM-LOCAL-HOSTNAME) | |
(conf STORM-LOCAL-HOSTNAME) | |
(local-hostname))) | |
(defn get-nimbus-leader-addr [conf] | |
(let [leader-elections (NimbusLeaderElections.)] | |
(.init leader-elections conf nil) | |
(let [leader-addr (.getLeaderAddr leader-elections)] | |
(.close leader-elections) | |
leader-addr))) | |
(defn get-nimbus-addr-list [conf] | |
(let [leader-elections (NimbusLeaderElections.)] | |
(.init leader-elections conf nil) | |
(let [addr-list (.getNimbusHosts leader-elections)] | |
(.close leader-elections) | |
addr-list))) | |
(defn await-leadership [conf storage] | |
(let [leader-elections (NimbusLeaderElections.)] | |
(when-let [leader-addr (get-nimbus-leader-addr conf)] | |
(log-message "Current Nimbus leader: " leader-addr) | |
(if-not (.isSupportDistributed storage) | |
(throw (IllegalStateException. "Trying to start secondary Nimbus with storage that isn't support distributed")))) | |
(.init leader-elections conf (str (local-hostname-conf conf) ":" (conf NIMBUS-THRIFT-PORT))) | |
(log-message "Nimbus awaiting for leadership") | |
(.awaitLeadership leader-elections) | |
(log-message "Nimbus gained leadership") | |
leader-elections)) | |
(defn ensure-leadership [leader-elections] | |
(when-not (.hasLeadership leader-elections) (throw (backtype.storm.generated.NotALeaderException.)))) | |
######################################## | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment