Created
April 6, 2014 23:20
-
-
Save nyx/10012474 to your computer and use it in GitHub Desktop.
A simple Storm topology, defined in Clojure, that doubles an infinite stream of random values with some parallelism
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns dawn.topology | |
(:import [backtype.storm StormSubmitter LocalCluster]) | |
(:use [backtype.storm clojure config]) | |
(:gen-class)) | |
;; Spout that emits one random double (field "random-float") per nextTuple
;; call and logs every lifecycle callback to stdout.
;;
;; Each tuple is emitted with a unique message :id. Without an id Storm does
;; not track the tuple, so the ack/fail callbacks below would never fire and
;; the anchoring/acking done by downstream bolts would have no effect.
(defspout random-spout ["random-float"]
  [conf context collector]
  (spout
    (open [conf context collector]
      (println "random-spout open" conf context collector))
    (activate []
      (println "random-spout activate"))
    (deactivate []
      (println "random-spout deactivate"))
    (close []
      (println "random-spout close"))
    (nextTuple []
      ;; Pause 100 ms before each emit so the stream is throttled.
      (Thread/sleep 100)
      (let [v (Math/random)]
        ;; The id is only used for ack/fail tracking; a random UUID string
        ;; is unique per emitted tuple.
        (emit-spout! collector [v] :id (str (java.util.UUID/randomUUID)))))
    (ack [id]
      (println "random-spout ack"))
    (fail [id]
      (println "random-spout fail" id))))
;; Bolt that reads the double in position 0 of the incoming tuple, multiplies
;; it by two, logs both values, emits the result (field "number") anchored to
;; the input tuple, and then acks the input.
(defbolt doubler-bolt ["number"] [tuple collector]
  (let [input   (.getDouble tuple 0)
        doubled (* input 2)]
    (println "doubler-bolt" input "->" doubled)
    ;; Anchor the output to the input tuple before acking it.
    (emit-bolt! collector [doubled] :anchor tuple)
    (ack! collector tuple)))
;; Terminal bolt: prints the value in position 0 of each incoming tuple to
;; stdout and acks the tuple. Declares no output fields and emits nothing.
(defbolt stdout-reporter-bolt [] [tuple collector]
  (println "stdout-reporter-bolt" (.getValue tuple 0))
  (ack! collector tuple))
(defn mk-topology
  "Builds the topology: `random-spout` (parallelism 2) feeds `doubler-bolt`
  (parallelism 2) via shuffle grouping, which in turn feeds
  `stdout-reporter-bolt` (parallelism 1) via shuffle grouping."
  []
  (let [spouts {"a-random-spout" (spout-spec random-spout :p 2)}
        bolts  {"doubler-bolt"
                (bolt-spec {"a-random-spout" :shuffle} doubler-bolt :p 2)

                "stdout-reporter-bolt"
                (bolt-spec {"doubler-bolt" :shuffle} stdout-reporter-bolt :p 1)}]
    (topology spouts bolts)))
(defn run-local!
  "Runs the topology in an in-process LocalCluster under the name
  \"sample-topology\" with debug enabled, lets it run for `run-ms`
  milliseconds (default 10000), then shuts the cluster down.

  The shutdown happens in a `finally` so the cluster is released even if
  submission throws or the sleep is interrupted."
  ([]
   (run-local! 10000))
  ([run-ms]
   (let [cluster (LocalCluster.)]
     (try
       (.submitTopology cluster "sample-topology" {TOPOLOGY-DEBUG true} (mk-topology))
       (Thread/sleep run-ms)
       (finally
         (.shutdown cluster))))))
(defn submit-topology!
  "Submits the topology built by `mk-topology` through StormSubmitter under
  the given `name`, with debug enabled and 3 workers requested."
  [name]
  (let [conf {TOPOLOGY-DEBUG   true
              TOPOLOGY-WORKERS 3}]
    (StormSubmitter/submitTopology name conf (mk-topology))))
(defn -main
  "Entry point. With no arguments, runs the topology locally via
  `run-local!`; with a single argument, submits it under that name via
  `submit-topology!`."
  ([] (run-local!))
  ([name] (submit-topology! name)))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment