@nyx
Created April 6, 2014 23:20
A simple Storm topology, defined in Clojure, that doubles an infinite stream of random values with some parallelism
(ns dawn.topology
  (:import [backtype.storm StormSubmitter LocalCluster])
  (:use [backtype.storm clojure config])
  (:gen-class))

;; Spout that emits one random double in [0, 1) roughly every 100 ms.
;; Tuples are emitted without a message id, so they are not tracked for
;; ack/fail; the lifecycle callbacks below only log.
(defspout random-spout ["random-float"]
  [conf context collector]
  (spout
    (ack [id]
      (println "random-spout ack"))
    (activate []
      (println "random-spout activate"))
    (close []
      (println "random-spout close"))
    (deactivate []
      (println "random-spout deactivate"))
    (fail [id]
      (println "random-spout fail" id))
    (nextTuple []
      (Thread/sleep 100)
      (let [v (Math/random)]
        (emit-spout! collector [v])))
    (open [conf context collector]
      (println "random-spout open" conf context collector))))
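
;; Bolt that reads the incoming double, doubles it, and emits the result
;; anchored to the input tuple before acking that input.
(defbolt doubler-bolt ["number"] [tuple collector]
  (let [v (.getDouble tuple 0)
        v2 (* v 2)]
    (println "doubler-bolt" v "->" v2)
    (emit-bolt! collector [v2] :anchor tuple)
    (ack! collector tuple)))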
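
;; Terminal bolt: prints each value to stdout and acks; it declares no
;; output fields and emits nothing.
(defbolt stdout-reporter-bolt [] [tuple collector]
  (let [v (.getValue tuple 0)]
    (println "stdout-reporter-bolt" v)
    (ack! collector tuple)))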
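
;; Wiring: random spout -> doubler -> stdout reporter, all shuffle-grouped.
;; :p is the parallelism hint for each component.
(defn mk-topology []
  (topology
    {"a-random-spout" (spout-spec random-spout :p 2)}
    {"doubler-bolt" (bolt-spec {"a-random-spout" :shuffle}
                               doubler-bolt :p 2)
     "stdout-reporter-bolt" (bolt-spec {"doubler-bolt" :shuffle}
                                       stdout-reporter-bolt :p 1)}))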

;; Runs the topology on an in-process cluster for 10 seconds, then shuts down.
(defn run-local! []
  (let [cluster (LocalCluster.)]
    (.submitTopology cluster "sample-topology" {TOPOLOGY-DEBUG true} (mk-topology))
    (Thread/sleep 10000)
    (.shutdown cluster)))

;; Submits the topology to a real cluster under the given name, using 3 workers.
(defn submit-topology! [name]
  (StormSubmitter/submitTopology
    name
    {TOPOLOGY-DEBUG true
     TOPOLOGY-WORKERS 3}
    (mk-topology)))

;; No arguments: run locally. One argument: submit to a cluster under that name.
(defn -main
  ([]
   (run-local!))
  ([name]
   (submit-topology! name)))
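
;; Usage sketch, not part of the original gist. It only calls functions
;; defined above. The topology name "doubler-topology", the Leiningen
;; command, and the jar path are illustrative assumptions about how the
;; project is built and deployed; adjust them to your setup.
(comment
  ;; From a REPL with Storm on the classpath: run in-process for about
  ;; 10 seconds and watch the println output of the spout and bolts.
  (run-local!)

  ;; From a machine configured to reach a Storm cluster (normally invoked
  ;; through the `storm jar` client rather than a plain REPL).
  (submit-topology! "doubler-topology")

  ;; Assumed shell equivalents:
  ;;   lein run -m dawn.topology
  ;;   storm jar path/to/uberjar.jar dawn.topology doubler-topology
  )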