Created January 13, 2012 19:45
-
-
Save anonymous/1608336 to your computer and use it in GitHub Desktop.
Reliable word-count topology
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns topologies.core | |
(:import [backtype.storm StormSubmitter LocalCluster]) | |
(:require [clojure.test :as test]) | |
(:use [backtype.storm clojure config]) | |
(:gen-class)) | |
;; Counter used to mint message ids for spout tuples (originally added for
;; debugging). It is a single atom in this namespace, so ids are unique only
;; within one JVM.
(def id-count (atom 0))

;; Spout that emits a random canned sentence roughly every 10 ms as a
;; one-field ["sentence"] tuple, tagged with a message id so Storm tracks it.
;;
;; Reliability: every emitted sentence is remembered in `pending`, keyed by
;; its message id. On `ack` the entry is dropped. On `fail` the same sentence
;; is re-emitted under the same id so it is eventually processed. Without
;; this replay, a failed tuple would only be logged and then lost.
;; ack/fail are still logged to /tmp as before.
(defspout sentence-spout ["sentence"]
  [conf context collector]
  (let [sentences ["a little brown dog"
                   "the man petted the dog"
                   "four score and seven years ago"
                   "an apple a day keeps the doctor away"]
        ;; message id -> sentence, for tuples emitted but not yet acked
        pending (atom {})]
    (spout
      (nextTuple []
        (Thread/sleep 10)
        (let [sentence (rand-nth sentences)
              id (swap! id-count inc)]
          (swap! pending assoc id sentence)
          (emit-spout! collector [sentence] :id id)))
      (ack [id]
        ;; fully processed: no need to keep it for replay any more
        (swap! pending dissoc id)
        (spit "/tmp/sentence-spout-ack" (str "acked: " id "\n") :append true))
      (fail [id]
        (spit "/tmp/sentence-spout-fail" (str "failed: " id "\n") :append true)
        ;; replay with the same id; the pending entry stays until it is acked
        (when-let [sentence (get @pending id)]
          (emit-spout! collector [sentence] :id id))))))
;; Bolt that splits the incoming sentence (field 0) into words and emits one
;; ["word"] tuple per word, each anchored to the input tuple. It acks the
;; input only after all words have been emitted.
;; Splitting on runs of whitespace and dropping empty tokens ensures that
;; leading or repeated spaces never produce "" words for word-count to tally.
(defbolt split-sentence ["word"] [tuple collector]
  (let [sentence (.getString tuple 0)
        words (remove empty? (.split sentence "\\s+"))]
    (doseq [w words]
      (emit-bolt! collector [w] :anchor tuple))
    (ack! collector tuple)))
;; Prepared bolt that keeps a running count per word for this task. It reads
;; the word from field 0 of each input tuple, increments that word's count,
;; emits [word count] on its declared ["word" "count"] stream anchored to the
;; input, and then acks the input.
;; The counts live in an atom created once per bolt instance in the prepare
;; phase. The emit was previously commented out, which left the declared
;; output stream permanently empty.
(defbolt word-count ["word" "count"] {:prepare true}
  [conf context collector]
  (let [counts (atom {})]
    (bolt
      (execute [tuple]
        (let [word (.getString tuple 0)
              ;; swap! returns the updated map, so the count read here is
              ;; exactly the one this call produced (no second deref needed)
              total (get (swap! counts update-in [word] (fnil inc 0)) word)]
          (emit-bolt! collector [word total] :anchor tuple)
          (ack! collector tuple))))))
(defn mk-topology
  "Builds the word-count topology. Component \"1\" is sentence-spout.
  Component \"3\" is split-sentence (parallelism 5), shuffle-grouped on \"1\".
  Component \"4\" is word-count (parallelism 6), fields-grouped on \"word\"
  from \"3\"."
  []
  (let [spout-map {"1" (spout-spec sentence-spout)}
        bolt-map  {"3" (bolt-spec {"1" :shuffle} split-sentence :p 5)
                   "4" (bolt-spec {"3" ["word"]} word-count :p 6)}]
    (topology spout-map bolt-map)))
(defn run-local!
  "Submits the topology as \"word-count\" (with TOPOLOGY-DEBUG on) to a fresh
  in-process LocalCluster. It then lets the topology run for `ms`
  milliseconds (10000 when called with no arguments) and shuts the cluster
  down. The shutdown happens in a `finally`, so the cluster is not leaked if
  submission or the wait throws."
  ([] (run-local! 10000))
  ([ms]
     (let [cluster (LocalCluster.)]
       (try
         (.submitTopology cluster "word-count" {TOPOLOGY-DEBUG true} (mk-topology))
         (Thread/sleep (long ms))
         (finally
           (.shutdown cluster))))))
(defn -main
  "Command-line entry point: submits the topology to a cluster through
  StormSubmitter under the given topology `name`, with debug output off and
  3 worker processes requested."
  [name]
  (let [submit-conf {TOPOLOGY-DEBUG false
                     TOPOLOGY-WORKERS 3}]
    (StormSubmitter/submitTopology name submit-conf (mk-topology))))
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.