The command below runs indefinitely and keeps printing the number of tweets with and without geo-location data.
% lein run username password
The following command will generate documents under docs/
% lein marg
Copyright © 2012 Aibiki
| /docs | |
| /target | |
| /lib | |
| /classes | |
| /checkouts | |
| pom.xml | |
| *.jar | |
| *.class | |
| .lein-deps-sum | |
| .lein-failures | |
| .lein-plugins | |
| .lein-repl-history |
The command below runs indefinitely and keeps printing the number of tweets with and without geo-location data.
% lein run username password
The following command will generate documents under docs/
% lein marg
Copyright © 2012 Aibiki
| (ns aibiki.core | |
| (:import [backtype.storm LocalCluster]) | |
| (:use [backtype.storm clojure config] | |
| [aibiki.spouts twitter] | |
| [aibiki.bolts geo-location])) | |
(defn -main
  "Command-line entry point.

  Builds a topology that feeds the Twitter spout into the \"ja\" language
  filter and then into the geo-location counter, and submits it to an
  in-process LocalCluster under the name \"twitter\". `username` and
  `password` are handed to the Twitter spout unchanged."
  [username password]
  (let [spouts         {"twitter-spout" (spout-spec (twitter username password) :p 1)}
        bolts          {"japanese?-bolt"
                        (bolt-spec {"twitter-spout" :shuffle}
                                   (lang-filter "ja")
                                   :p 1)
                        "count-geo-location-bolt"
                        (bolt-spec {"japanese?-bolt" :shuffle}
                                   count-geo-location
                                   :p 1)}
        storm-topology (topology spouts bolts)
        local-cluster  (LocalCluster.)
        storm-conf     {TOPOLOGY-DEBUG false}]
    (.submitTopology local-cluster "twitter" storm-conf storm-topology)))
| (ns aibiki.core-test | |
| (:use [clojure test] | |
| [aibiki core])) |
| (ns aibiki.bolts.geo-location | |
| (:use [backtype.storm clojure config])) | |
;; Bolt parameterised by `lang`: forwards a status (field 0 of the incoming
;; tuple) only when the language of the status' user equals `lang`.
;; Emits one field, "lang-filter", anchored to the input tuple.
(defbolt lang-filter
  ["lang-filter"]
  {:params [lang] :prepare true}
  [conf context collector]
  (bolt
   (execute [tuple]
     (let [status (.getValue tuple 0)]
       (when (= lang (.. status (getUser) (getLang)))
         (emit-bolt! collector [status] :anchor tuple))
       ;; Ack every input, whether forwarded or dropped. The output is
       ;; anchored, so without an ack the tuple tree could never complete
       ;; if the upstream spout emits with message ids.
       (ack! collector tuple)))))
;; Bolt that forwards a status (field 0 of the incoming tuple) only when
;; `.getGeoLocation` returns a non-nil value for it.
;; Emits one field, "has-geo-location?", anchored to the input tuple.
(defbolt has-geo-location?
  ["has-geo-location?"]
  [tuple collector]
  (let [status (.getValue tuple 0)]
    (when (.getGeoLocation status)
      (emit-bolt! collector [status] :anchor tuple))
    ;; Ack every input, forwarded or not, so an anchored tuple tree can
    ;; complete instead of waiting for a timeout.
    (ack! collector tuple)))
;; Terminal bolt that keeps a running tally of statuses with (:yes) and
;; without (:no) geo-location data and prints the tally after every tuple.
;; The tally lives in an atom created once per prepared bolt instance.
(defbolt count-geo-location
  ["count-geo-location"]
  {:prepare true}
  [conf context collector]
  (let [counts (atom {})]
    (bolt
     ;; Print the final tally (the map itself, not the atom wrapper).
     (cleanup [] (println @counts))
     (execute [tuple]
       (let [status (.getValue tuple 0)
             bucket (if (.getGeoLocation status) :yes :no)]
         ;; `swap!` returns the updated map, so printing its result shows
         ;; exactly the state produced by this tuple.
         (println (swap! counts update-in [bucket] (fnil inc 0)))
         ;; Nothing is emitted downstream; ack so the input is marked done.
         (ack! collector tuple))))))
;; Debugging bolt: prints every incoming tuple to standard output.
;; Declares one output field, "printer", but never emits.
(defbolt printer
  ["printer"]
  [tuple collector]
  (println tuple)
  ;; Ack the input so it is marked as fully processed.
  (ack! collector tuple))
| (ns aibiki.analizers.ja | |
| (:import [org.atilika.kuromoji Tokenizer]) | |
| (:use [clojure.string :only [split]])) | |
;; Part-of-speech tag that `place-name?` looks for among a token's
;; comma-separated part-of-speech features.
(def ^:private place-name "地域")
(defn tokenize
  "Tokenizes the string `s` with a freshly built Kuromoji Tokenizer and
  returns whatever `Tokenizer.tokenize` returns for it."
  [s]
  (-> (Tokenizer/builder)
      (.build)
      (.tokenize s)))
(defn place-name?
  "Returns true when one of the comma-separated part-of-speech features of
  token `t` equals the place-name tag, false otherwise."
  [t]
  (let [pos-features (.split (.getPartOfSpeech t) ",")]
    ;; A set used as a predicate yields the matching element or nil;
    ;; `boolean` collapses that to a strict true/false.
    (boolean (some #{place-name} pos-features))))
| (ns aibiki.analizers.ja-test | |
| (:use [clojure test] | |
| [aibiki.analizers ja])) | |
;; Checks the number of tokens produced for an empty string, a single
;; place name, and a short sentence.
(deftest tokenize-test
  (testing "tokenizer"
    (are [expected text] (= expected (count (seq (tokenize text))))
      0 ""
      1 "九段下"
      3 "九段下生まれです")))
;; Only the first of the three tokens in the sample sentence is expected
;; to be classified as a place name.
(deftest place-name?-test
  (testing "place-name?"
    (let [parts   (vec (tokenize "九段下生まれです"))
          station (nth parts 0)
          birth   (nth parts 1)
          copula  (nth parts 2)]
      (is (true? (place-name? station)))
      (is (false? (place-name? birth)))
      (is (false? (place-name? copula))))))
| log4j.rootLogger=ERROR, A1 | |
| log4j.appender.A1=org.apache.log4j.ConsoleAppender | |
| log4j.appender.A1.layout=org.apache.log4j.PatternLayout | |
| log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n | |
| log4j.category.org.apache.zookeeper=WARN |
| (defproject aibiki "0.1.0-SNAPSHOT" | |
| :description "Aibiki Storm: Read twitter/facebook streams." | |
| :url "http://www.aibiki.jp/" | |
| :license {:name "Eclipse Public License" | |
| :url "http://www.eclipse.org/legal/epl-v10.html"} | |
| :repositories [["kuromoji" "http://www.atilika.org/nexus/content/repositories/atilika"] | |
| ["foursquareapijava" "http://foursquare-api-java.googlecode.com/svn/repository"]] | |
| :plugins [[lein-marginalia "0.7.1"]] | |
| :dependencies [[org.clojure/clojure "1.4.0"] | |
| [storm "0.8.1"] | |
| [org.twitter4j/twitter4j-core "3.0.1"] | |
| [org.twitter4j/twitter4j-stream "3.0.1"] | |
| [fi.foyt/foursquare-api "1.0.2"] | |
| [org.atilika.kuromoji/kuromoji "0.7.7"]] | |
| :omit-source true | |
| :main aibiki.core) |
| (ns aibiki.spouts.twitter | |
| (:import [java.util.concurrent ConcurrentLinkedQueue] | |
| [twitter4j TwitterStreamFactory StatusListener] | |
| [twitter4j.conf ConfigurationBuilder]) | |
| (:use [backtype.storm clojure config])) | |
(defn- twitter-stream-listener
  "Builds a StatusListener that offers every received status to `queue`.

  Deletion, track-limitation, scrub-geo and stall-warning notices are
  ignored. Stream errors are reported on stderr. `onException` must be
  implemented: `proxy` throws UnsupportedOperationException for any
  interface method it was not given, which would otherwise surface inside
  the Twitter4J dispatcher whenever the stream reports an error."
  [queue]
  (proxy [StatusListener] []
    (onStatus [status] (.offer queue status))
    (onDeletionNotice [notice])
    (onTrackLimitationNotice [notice])
    (onScrubGeo [user-id up-to-status-id])
    (onStallWarning [warning])
    (onException [ex]
      (binding [*out* *err*]
        (println "twitter stream error:" (.getMessage ex))))))
(defn- twitter-stream
  "Creates a TwitterStream configured with `username` and `password`,
  attaches a listener that feeds `queue`, starts the sample stream, and
  returns the stream object."
  [username password queue]
  (let [settings (-> (ConfigurationBuilder.)
                     (.setUser username)
                     (.setPassword password)
                     (.build))
        factory  (TwitterStreamFactory. settings)]
    ;; `doto` evaluates to the stream itself once both calls have run.
    (doto (.getInstance factory)
      (.addListener (twitter-stream-listener queue))
      (.sample))))
;; Spout parameterised by Twitter credentials. Statuses delivered by the
;; stream listener are buffered in a ConcurrentLinkedQueue, and each call to
;; nextTuple emits at most one of them as a single-field tuple ("twitter").
(defspout twitter
  ["twitter"]
  {:params [username password]}
  [conf context collector]
  (let [pending (ConcurrentLinkedQueue.)
        source  (twitter-stream username password pending)]
    (spout
     (nextTuple []
       ;; `.poll` returns nil when the queue is empty; emit nothing then.
       (let [next-status (.poll pending)]
         (when next-status
           (emit-spout! collector [next-status]))))
     (close []
       (.cleanUp source)))))