Last active
December 22, 2015 21:49
-
-
Save hooptie45/6535740 to your computer and use it in GitHub Desktop.
Demo: a CSV-generation Storm topology built with RedStorm, Tire, and Redis
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'red_storm'
require 'tire'
require 'json'
require 'csv'
# rvm use jruby@csv-topology --create | |
# bundle install | |
# redstorm bundle topology | |
# redstorm local csv_topology.rb | |
# Tire persistence model for an article. Declares the three properties
# (title, published_on, tags) in that order through Tire's `property` macro.
class PersistentArticle
  include Tire::Model::Persistence

  [:title, :published_on, :tags].each { |attribute| property attribute }
end
# Spout that feeds the topology with job payloads taken from the Redis list
# "jobs". A background thread performs the blocking Redis reads and hands each
# payload to the spout through an in-memory Queue.
#
# NOTE(review): Redis is referenced here but no `require 'redis'` is visible
# in this file -- presumably it is loaded elsewhere; confirm.
class JobsSpout < RedStorm::SimpleSpout
  # Returns the next buffered payload, or nil when the buffer is empty.
  on_send do
    @q.pop unless @q.empty?
  end

  # Creates the buffer and starts the reader thread.
  on_init do
    @q = Queue.new
    @redis_reader = detach_redis_reader
  end

  private

  # Starts the thread that moves entries from the Redis "jobs" list into @q.
  #
  # @return [Thread] the reader thread
  def detach_redis_reader
    Thread.new do
      Thread.current.abort_on_exception = true
      redis = Redis.new(:host => "localhost", :port => 6379)
      loop do
        # BLPOP is called with timeout 0; the second element of its reply is
        # what gets queued.
        entry = redis.blpop("jobs", 0)
        next unless entry

        @q << entry[1]
      end
    end
  end
end
# Bolt that unpacks a raw job message: parses the first tuple field as JSON
# and returns the job's "id" together with its "criteria" passed through
# Kernel#JSON.
#
# NOTE(review): JSON() serializes non-string objects but *parses* string-like
# ones, so this presumably expects "criteria" to be a JSON object -- confirm.
class ExtractJobBolt < RedStorm::SimpleBolt
  on_receive do |tuple|
    job = JSON.parse(tuple.getString(0))
    [job['id'], JSON(job['criteria'])]
  end
end
# Bolt that runs the search described by a job's criteria and emits one tuple
# per result page: (page, per_page, total, job_id, rows-as-JSON).
#
# Paging continues until the number of fetched rows reaches the reported
# total or MAX_ROWS, or until a page comes back empty.
class ExecuteSearchBolt < RedStorm::SimpleBolt
  # Page size requested from every search call.
  PER_PAGE = 500
  # Upper bound on rows fetched for a single job.
  MAX_ROWS = 10_000
  # Search term used when the criteria carry no "keywords" entry.
  DEFAULT_TERM = 'test'

  on_receive :emit => false do |tuple|
    criteria = JSON.parse(tuple[:criteria])
    job_id = tuple[:job_id]
    term = criteria['keywords'] || DEFAULT_TERM

    # The first page is always emitted, even when it is empty, so that a
    # page-1 tuple exists for every job.
    page = 1
    result = PersistentArticle.search(term, per_page: PER_PAGE, page: page)
    total = result.total
    fetched = result.size
    unanchored_emit(page, PER_PAGE, total, job_id, result.to_json)

    while fetched < total && fetched < MAX_ROWS
      page += 1
      result = PersistentArticle.search(term, per_page: PER_PAGE, page: page)
      # An empty page would never advance `fetched`; without this guard the
      # loop would spin forever whenever fewer rows than `total` are returned.
      break if result.size.zero?

      fetched += result.size
      unanchored_emit(page, PER_PAGE, total, job_id, result.to_json)
    end
  end
end
# Bolt that turns one page of search results (a JSON array of row hashes)
# into CSV text and passes the paging metadata through unchanged.
#
# Each row becomes one two-column CSV line: title, tags. Lines are produced
# with the standard CSV library, so fields containing commas, quotes or line
# breaks are quoted, and every line -- including the last -- ends with a
# newline so consecutive page chunks can be appended to one file safely.
#
# Returns [page, per_page, total, job_id, csv].
class GenerateCsvBolt < RedStorm::SimpleBolt
  on_receive do |tuple|
    job_id = tuple[:job_id]
    rows = JSON.parse(tuple[:rows])
    page = tuple[:page]
    per_page = tuple[:per_page]
    total = tuple[:total]

    lines = rows.map do |row|
      title, tags = row.values_at('title', 'tags')
      # A list of tags is kept in a single comma-separated cell instead of
      # spilling into additional columns.
      tags = tags.join(',') if tags.respond_to?(:join)
      CSV.generate_line([title, tags])
    end

    [page, per_page, total, job_id, lines.join]
  end
end
# Bolt that appends each CSV chunk to "csv_<job_id>.csv" in the current
# working directory. A tuple with page == 1 removes any existing file first,
# so each job's output starts empty. Nothing is emitted.
class WriteFileBolt < RedStorm::SimpleBolt
  on_receive :emit => false do |tuple|
    puts "WriteFileBolt"
    job_id = tuple[:job_id]
    csv = tuple[:csv].to_s
    page = tuple[:page]
    fn = "csv_#{job_id}.csv"

    # File.exist? replaces the deprecated File.exists? alias.
    File.delete(fn) if page == 1 && File.exist?(fn)

    File.open(fn, "a") do |f|
      # puts terminates the chunk with a newline only when it does not already
      # end with one, so the last row of this page cannot run into the first
      # row of the next. Empty chunks write nothing (no blank line).
      f.puts csv unless csv.empty?
    end
  end
end
# Topology wiring:
#   JobsSpout -> ExtractJobBolt -> ExecuteSearchBolt -> GenerateCsvBolt
#             -> WriteFileBolt
# Every bolt after ExtractJobBolt is fed through a fields grouping on :job_id.
class CsvTopology < RedStorm::SimpleTopology
  spout JobsSpout do
    output_fields :job
  end

  bolt ExtractJobBolt do
    source JobsSpout, :shuffle
    output_fields :job_id, :criteria
  end

  bolt ExecuteSearchBolt, parallelism: 10 do
    source ExtractJobBolt, fields: :job_id
    output_fields :page, :per_page, :total, :job_id, :rows
  end

  bolt GenerateCsvBolt, parallelism: 2 do
    source ExecuteSearchBolt, fields: :job_id
    output_fields :page, :per_page, :total, :job_id, :csv
  end

  # NOTE(review): output fields are declared here although WriteFileBolt's
  # on_receive is registered with :emit => false -- confirm they are needed.
  bolt WriteFileBolt do
    source GenerateCsvBolt, fields: :job_id
    output_fields :job_id, :file
  end

  # Settings per environment; debug is switched off in every environment.
  configure do |env|
    debug false
    if env == :local
      max_task_parallelism 100
      num_workers 25
    elsif env == :cluster
      num_workers 20
      max_spout_pending 5000
    end
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment