Created
December 20, 2011 21:54
-
-
Save wilkes/1503466 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns storm.starter.clj.exclamation | |
(:import [backtype.storm StormSubmitter LocalCluster]) | |
(:use [backtype.storm clojure config]) | |
(:gen-class)) | |
;; Spout with the output spec ["word"]. Declared with {:prepare false}, so the
;; body below is the per-call emit logic (per the Storm Clojure DSL; confirm
;; against the DSL version in use).
;;
;; Each invocation pauses for 100 ms and then emits a one-element tuple holding
;; a word picked at random from a fixed list of four.
(defspout test-word-spout ["word"] {:prepare false}
  [collector]
  (let [candidates ["first" "second" "third" "fourth"]]
    ;; Pause before emitting so each call produces at most one tuple per 100 ms.
    (Thread/sleep 100)
    (emit-spout! collector [(rand-nth candidates)])))
;; Bolt with the output spec ["word"].
;;
;; Reads field 0 of the incoming tuple as a string, appends "!!!", emits the
;; result as a one-element tuple anchored to the input tuple, and then acks
;; the input tuple.
(defbolt exclaim ["word"] [tuple collector]
  (emit-bolt! collector
              [(-> tuple
                   (.getString 0)
                   (str "!!!"))]
              :anchor tuple)
  ;; Ack only after the anchored emit has been issued.
  (ack! collector tuple))
(defn make-topology
  "Builds the exclamation topology and returns whatever `topology` returns.

  Wiring:
    \"word\"     - `test-word-spout`, :p 10
    \"exclaim1\" - `exclaim` bolt, shuffle-grouped on \"word\", :p 3
    \"exclaim2\" - `exclaim` bolt, shuffle-grouped on \"exclaim1\", :p 2"
  []
  (let [spouts {"word" (spout-spec test-word-spout :p 10)}
        bolts  {"exclaim1" (bolt-spec {"word" :shuffle} exclaim :p 3)
                "exclaim2" (bolt-spec {"exclaim1" :shuffle} exclaim :p 2)}]
    (topology spouts bolts)))
(defn run-local!
  "Runs the exclamation topology on an in-process `LocalCluster`.

  Submits the topology under the name \"exclaim\" with TOPOLOGY-DEBUG enabled,
  lets it run for `run-ms` milliseconds (10000 when called with no arguments),
  and then shuts the cluster down. Returns nil.

  The cluster is shut down in a `finally` clause so that it is released even
  if submission fails or the sleeping thread is interrupted."
  ([] (run-local! 10000))
  ([run-ms]
   (let [cluster (LocalCluster.)]
     (try
       (.submitTopology cluster "exclaim" {TOPOLOGY-DEBUG true} (make-topology))
       ;; Coerce so any integral number is accepted by Thread/sleep.
       (Thread/sleep (long run-ms))
       (finally
         (.shutdown cluster))))))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment