|
(ns aibiki.core |
|
(:import [java.util.concurrent ConcurrentLinkedQueue] |
|
[twitter4j TwitterStreamFactory StatusListener] |
|
[twitter4j.conf ConfigurationBuilder] |
|
[backtype.storm LocalCluster]) |
|
(:use [backtype.storm clojure config]) |
|
(:gen-class)) |
|
|
|
(defn twitter-stream-listener
  "Returns a twitter4j StatusListener that offers every received status to
  `queue` via `.offer` (the twitter spout passes a ConcurrentLinkedQueue).
  Deletion, track-limitation, scrub-geo and stall-warning notifications are
  ignored; stream exceptions are printed to stderr."
  [queue]
  (proxy [StatusListener] []
    (onStatus [status] (.offer queue status))
    (onDeletionNotice [notice])
    (onTrackLimitationNotice [notice])
    (onScrubGeo [user-id up-to-status-id])
    (onStallWarning [warning])
    ;; StatusListener inherits onException from twitter4j's StreamListener.
    ;; A proxy throws UnsupportedOperationException for any method it does not
    ;; implement, so leaving this out turns every reported stream error into a
    ;; second exception on twitter4j's thread.
    (onException [ex] (.printStackTrace ex))))
|
|
|
(defn twitter-stream
  "Creates a twitter4j stream configured with `username`/`password`,
  registers a listener (see twitter-stream-listener) that offers each status
  to `queue`, starts the sample stream, and returns the TwitterStream
  instance so the caller can later stop it (the spout calls `.cleanUp`)."
  [username password queue]
  (let [conf-builder (doto (ConfigurationBuilder.)
                       (.setUser username)
                       (.setPassword password))
        stream-factory (TwitterStreamFactory. (.build conf-builder))]
    ;; Register the listener before starting the sample stream; doto returns
    ;; the stream itself.
    (doto (.getInstance stream-factory)
      (.addListener (twitter-stream-listener queue))
      (.sample))))
|
|
|
;; Spout emitting each status from the Twitter sample stream as a one-field
;; tuple ["twitter"]. Statuses are buffered by the twitter4j listener into a
;; ConcurrentLinkedQueue and drained one per nextTuple call. Tuples are
;; emitted without a message id.
;; Params: username, password - passed to twitter-stream.
(defspout twitter
  ["twitter"]
  {:params [username password]}
  [conf context collector]
  (let [queue (ConcurrentLinkedQueue.)
        stream (twitter-stream username password queue)]
    (spout
     (close []
       (.cleanUp stream))
     (nextTuple []
       (if-let [status (.poll queue)]
         (emit-spout! collector [status])
         ;; Queue empty: back off briefly rather than busy-spinning, as
         ;; Storm's ISpout#nextTuple contract recommends.
         (Thread/sleep 50))))))
|
|
|
;; Bolt that re-emits (anchored to the input) only statuses whose author's
;; User#getLang equals `lang`, e.g. (lang-filter "ja").
;; Input: field 0 is the status object. Output field: ["lang-filter"].
(defbolt lang-filter
  ["lang-filter"]
  {:params [lang] :prepare true}
  [conf context collector]
  (bolt
   (execute [tuple]
     (let [status (.getValue tuple 0)]
       (when (= lang (.. status (getUser) (getLang)))
         (emit-bolt! collector [status] :anchor tuple))
       ;; Clojure DSL bolts are not auto-acking: ack every processed tuple,
       ;; including the ones filtered out.
       (ack! collector tuple)))))
|
|
|
;; Bolt that re-emits (anchored to the input) only statuses whose
;; getGeoLocation is non-nil. Input: field 0 is the status object.
;; Output field: ["has-geo-location?"].
(defbolt has-geo-location?
  ["has-geo-location?"]
  [tuple collector]
  (let [status (.getValue tuple 0)]
    (when (.getGeoLocation status)
      (emit-bolt! collector [status] :anchor tuple))
    ;; Clojure DSL bolts must ack explicitly, whether or not they emit.
    (ack! collector tuple)))
|
|
|
;; Bolt that tallies statuses with a geo location (:yes) versus without (:no)
;; in a per-task atom, printing the running map after every tuple and once
;; more on cleanup. It declares the field ["count-get-location"] but emits
;; nothing. Input: field 0 is the status object.
(defbolt count-get-location
  ["count-get-location"]
  {:prepare true}
  [conf context collector]
  (let [counts (atom {})]
    (bolt
     (cleanup []
       ;; Deref: printing the atom itself shows the Atom wrapper, not the map.
       (println @counts))
     (execute [tuple]
       (let [status (.getValue tuple 0)
             bucket (if (.getGeoLocation status) :yes :no)]
         ;; swap! returns the updated map; print that directly.
         (println (swap! counts update-in [bucket] (fnil inc 0)))
         (ack! collector tuple))))))
|
|
|
;; Debug bolt that prints every incoming tuple to stdout and emits nothing.
(defbolt printer
  ["printer"]
  [tuple collector]
  (println tuple)
  ;; Clojure DSL bolts must ack explicitly.
  (ack! collector tuple))
|
|
|
(defn -main
  "Builds the \"twitter\" topology and submits it to an in-process
  LocalCluster with TOPOLOGY-DEBUG off. The pipeline is:
  twitter-spout -> japanese?-bolt (lang-filter \"ja\") -> count-get-location-bolt.

  `username` and `password` are passed to the twitter spout. A JVM shutdown
  hook shuts the LocalCluster down when the process exits (e.g. Ctrl-C), so
  spout close / bolt cleanup get a chance to run."
  [username password]
  (let [spouts {"twitter-spout" (spout-spec (twitter username password) :p 1)}
        bolts {"japanese?-bolt"
               (bolt-spec {"twitter-spout" :shuffle} (lang-filter "ja") :p 1)
               "count-get-location-bolt"
               (bolt-spec {"japanese?-bolt" :shuffle} count-get-location :p 1)}
        ;; Named so it does not shadow the `topology` DSL function.
        storm-topology (topology spouts bolts)
        cluster (LocalCluster.)]
    (.addShutdownHook (Runtime/getRuntime)
                      (Thread. ^Runnable (fn [] (.shutdown cluster))))
    (.submitTopology cluster "twitter" {TOPOLOGY-DEBUG false} storm-topology)))