Created
April 21, 2010 07:13
-
-
Save kei-s/373523 to your computer and use it in GitHub Desktop.
user streams to AMQP
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
mongodb/* |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# vim:fileencoding=utf-8 | |
require 'time' | |
require 'rubygems' | |
require 'mq' | |
require 'yajl' | |
require 'pp' | |
# Console viewer for the 'tweet' fanout exchange: binds the 'tweetfan' queue
# and prints one line per status, favorite/unfavorite or follow message.
# Anything it does not recognise is pretty-printed as-is.
AMQP.start do
  amq = MQ.new
  queue = amq.queue('tweetfan').bind(amq.fanout('tweet'))

  # INT and TERM share one shutdown path: delete the queue, then stop AMQP
  # and the EventMachine reactor.
  [:INT, :TERM].each do |signal|
    Signal.trap(signal) do
      queue.delete
      AMQP.stop { EM.stop }
    end
  end

  # Renders a parseable timestamp string as "MM/DD HH:MM:SS".
  stamp = lambda { |created_at| Time.parse(created_at).strftime("%m/%d %H:%M:%S") }

  queue.subscribe do |tweet|
    begin
      # Parsing lives inside the begin block so that a malformed payload is
      # reported through the same rescue path as every other failure.
      data = Yajl::Parser.parse(tweet)

      if data['text']
        time = stamp.call(data['created_at'])
        author = data['user']['screen_name']
        if data['retweeted_status']
          original_author = data['retweeted_status']['user']['screen_name']
          puts "#{time} retweet >#{author}< | <#{original_author}> #{data['text']}"
        else
          puts "#{time} <#{author}> #{data['text']}"
        end
      elsif data['event']
        case data['event']
        when 'favorite', 'unfavorite'
          # Both events print the same layout; only the event word differs.
          time = stamp.call(data['created_at'])
          source = data['source']['screen_name']
          target = data['target']['screen_name']
          puts "#{time} #{data['event']} >#{source}< | <#{target}> #{data['target_object']['text']}"
        when 'follow'
          time = stamp.call(data['created_at'])
          puts "#{time} follow <#{data['source']['screen_name']}> => <#{data['target']['screen_name']}>"
        else
          # Unknown event type: dump it without touching any of its fields.
          pp data
        end
      else
        pp data
      end
    rescue => e
      # Fail fast: print the error and terminate the process.
      puts e.message
      puts e.backtrace
      exit
    end
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# vim:fileencoding=utf-8 | |
require 'uri' | |
require 'time' | |
require 'rubygems' | |
require 'mq' | |
require 'yajl' | |
require 'MeCab' | |
# Word-frequency counter for the 'tweet' fanout exchange: every status text is
# run through MeCab and the ten most frequent words so far are printed.
@mecab = MeCab::Tagger.new
tw_count = 0
# Unknown words start at 0, so counting needs no key check.
bucket = Hash.new(0)
# Built once instead of on every tweet; used to strip http/https URLs from the
# text before it is handed to MeCab.
url_pattern = URI.regexp(['http', 'https'])

AMQP.start do
  amq = MQ.new
  queue = amq.queue('mecabtweet').bind(amq.fanout('tweet'))

  # INT and TERM share one shutdown path: delete the queue, then stop AMQP
  # and the EventMachine reactor.
  [:INT, :TERM].each do |signal|
    Signal.trap(signal) do
      queue.delete
      AMQP.stop { EM.stop }
    end
  end

  queue.subscribe do |tweet|
    begin
      # Parsing lives inside the begin block so that a malformed payload is
      # reported through the same rescue path as every other failure.
      data = Yajl::Parser.parse(tweet)

      if data['text']
        text = data['text'].gsub(url_pattern, '')
        # Keep the part before the first tab of every output line, skipping
        # the 'EOS' marker line.
        words = @mecab.parse(text).each_line.
          map { |line| line.chomp }.
          reject { |line| line == 'EOS' }.
          map { |line| line.split("\t")[0] }
        words.each { |word| bucket[word] += 1 }

        puts "#{Time.parse(data['created_at']).strftime("%m/%d %H:%M:%S")} <#{data['user']['screen_name']}> #{data['text']}"
        # Ten highest counts, printed as "<count> <word>".
        puts bucket.sort_by { |_word, count| -count }.take(10).map { |word, count| "#{count} #{word}" }
        puts "tw: #{tw_count += 1} words: #{bucket.size}"
      end
    rescue => e
      # Fail fast: print the error and terminate the process.
      puts e.message
      puts e.backtrace
      exit
    end
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# vim:fileencoding=utf-8 | |
require 'rubygems' | |
require 'mq' | |
require 'yajl' | |
# checkout http://gist.github.com/132372 | |
require 'saykana' | |
# Reads statuses from the 'tweet' fanout exchange on libelabo.jp, echoes each
# one from a non-protected user to stdout and passes its text to sayKana.
AMQP.start(:host => 'libelabo.jp') do
  channel = MQ.new
  unbound_queue = channel.queue('saytweet', :auto_delete => true)
  queue = unbound_queue.bind(channel.fanout('tweet'))

  # INT and TERM both delete the queue and then stop AMQP and the reactor.
  [:INT, :TERM].each do |signal|
    Signal.trap(signal) do
      queue.delete
      AMQP.stop { EM.stop }
    end
  end

  queue.subscribe do |payload|
    status = Yajl::Parser.parse(payload)
    text = status['text']
    if text
      author = status['user']
      unless author['protected']
        puts "<#{author['screen_name']}> #{text}"
        # '@' is replaced by a space before the text is handed to sayKana.
        sayKana(text.gsub('@', ' '))
      end
    end
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# vim:fileencoding=utf-8 | |
require 'time' | |
require 'rubygems' | |
require 'mq' | |
require 'yajl' | |
require 'pp' | |
# Formats a parseable timestamp string as "MM/DD HH:MM:SS".
#
# @param str [String] a timestamp understood by Time.parse
# @return [String] the month/day and time of day of the parsed timestamp
def to_time(str)
  parsed = Time.parse(str)
  parsed.strftime('%m/%d %H:%M:%S')
end
# Privacy filter: consumes the 'tweet' fanout exchange and republishes to the
# 'tweetpublic' fanout exchange only those messages in which no involved
# account is flagged as protected.
AMQP.start do
  amq = MQ.new
  publicqueue = amq.fanout('tweetpublic')
  queue = amq.queue('tweetfilter').bind(amq.fanout('tweet'))

  queue.subscribe do |tweet|
    begin
      # Parsing lives inside the begin block so that a malformed payload is
      # reported through the same rescue path as every other failure.
      data = Yajl::Parser.parse(tweet)

      if data['text']
        publicqueue.publish(tweet) unless data['user']['protected']
      elsif data['event']
        case data['event']
        when 'favorite', 'unfavorite', 'retweet'
          # These three events apply the same rule: both the acting account
          # and the owner of the target object must be public. The source is
          # checked first, so target_object is not read for a protected source.
          if !data['source']['protected'] && !data['target_object']['user']['protected']
            publicqueue.publish(tweet)
          end
        when 'follow'
          if !data['source']['protected'] && !data['target']['protected']
            publicqueue.publish(tweet)
          end
        end
        # Any other event type is dropped silently.
      else
        pp data
      end
    rescue => e
      # Fail fast: print the error and terminate the process.
      puts e.message
      puts e.backtrace
      exit
    end
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# vim:fileencoding=utf-8 | |
require 'uri' | |
require 'rubygems' | |
require 'yajl/http_stream' | |
require 'pit' | |
require 'mongo' | |
require 'bunny' | |
# Publisher: reads the chirpstream user stream and, for every decoded message,
# prints it, publishes its JSON encoding to the 'tweet' fanout exchange and
# inserts it into the 'kei' collection of the 'tweetstream' MongoDB database.
db = Mongo::Connection.new.db('tweetstream')
collection = db.collection('kei')

# Login and password are looked up through Pit under the 'tweetstream' key.
config = Pit.get("tweetstream", :require => {
  "login" => "login",
  "password" => "password"
})

# NOTE(review): the credentials are interpolated into the URL unescaped, so
# they presumably must not contain characters that are special in a URI.
uri = URI.parse("http://#{config['login']}:#{config['password']}@chirpstream.twitter.com/2b/user.json")

b = Bunny.new
b.start
begin
  exch = b.exchange('tweet', :type => :fanout)
  Yajl::HttpStream.get(uri) do |data|
    puts data.inspect
    exch.publish(Yajl::Encoder.encode(data).force_encoding('us-ascii'))
    collection.insert(data)
  end
ensure
  # Close the broker connection even when the stream raises or the process is
  # interrupted; without the ensure this line was only reached on a clean end.
  b.stop
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# vim:fileencoding=utf-8 | |
require 'rubygems' | |
require 'em-websocket' | |
require 'mq' | |
# WebSocket relay: every message arriving on the 'tweetpublic' fanout exchange
# is pushed to an EM::Channel, and each WebSocket client connected on port
# 8080 receives whatever the channel carries.
EventMachine.run do
  @channel = EM::Channel.new

  mq = MQ.new
  relay_queue = mq.queue('websocket').bind(mq.fanout('tweetpublic'))
  relay_queue.subscribe { |payload| @channel.push payload }

  EM::WebSocket.start(:host => '0.0.0.0', :port => 8080) do |socket|
    socket.onopen do
      # The subscription id doubles as the client's label in the log lines.
      subscriber_id = @channel.subscribe { |message| socket.send message }
      puts "#{subscriber_id} connected"

      # Incoming client messages are only logged.
      socket.onmessage do |message|
        puts "<#{subscriber_id}>: #{message}"
      end

      socket.onclose do
        @channel.unsubscribe(subscriber_id)
        puts "#{subscriber_id} closed"
      end
    end
  end

  puts "Server started"
end
Thank you for your advice!
I heard "Capped Collection" for the first time.
I want to archive all tweets in this project.
So I think that the auto-LRU age-out feature of capped collections is not suitable for what I want to do.
You may think that MongoDB isn't a good answer for what I want.
But I wanted to learn MongoDB anyway ;)
I see. In ordinary collections, which are not capped collections, it is not guaranteed that documents can be retrieved in insertion order. You might have to create an index on the created_at attribute.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Capped Collection!