# Gist we4tech/d8384e27ff657d2b8819 (created June 12, 2014 08:41).
# Converts data from MongoDB to CSV: example code that uses multiple threads
# and a Queue in Ruby.
require 'rubygems'
require 'bundler/setup'
require 'csv'
require 'pp'

# Configure and load dependencies.
# Require every gem in the Gemfile's default group; presumably this provides
# MongoMapper, ActiveSupport and colorize used below (NOTE(review): confirm
# against the Gemfile).
Bundler.require :default

# MongoMapper connection settings, keyed by environment name.
mongo_config = {
  'development' => {
    'uri' => 'mongodb://localhost:27017/some_database'
  }
}
environment = :development
MongoMapper.setup mongo_config, environment
# Define models
# Shared schema for the document classes in this script. Including this
# module makes the class a MongoMapper::Document with a `freebase_id` string
# and a `name` hash.
module BasicFields
  extend ActiveSupport::Concern

  included do
    include MongoMapper::Document

    key :freebase_id, String
    key :name, Hash, default: {}
  end

  # Renders every entry of the `name` hash as "value (key)" and joins them
  # with ", ". NOTE(review): the keys look like language codes — confirm.
  #
  # @return [String] "" when `name` is empty
  def all_names
    labels = name.map { |lang, text| "#{text} (#{lang})" }
    labels.join(', ')
  end
end
# Film document: the BasicFields keys, a release year, and id-array
# references to the other document classes.
class Film
  include BasicFields

  key :initial_release_year, Integer

  # association name => Array key holding the referenced ids.
  # All Array keys are declared first, then the `many ..., in:` associations,
  # in this order.
  references = {
    genres: :genre_ids,
    directors: :director_ids,
    countries: :country_ids,
    cast_members: :cast_member_ids,
    music_composers: :music_composer_ids
  }
  references.each_value { |ids_key| key ids_key, Array }
  references.each { |association, ids_key| many association, in: ids_key }
end
# Genre document: only the BasicFields keys; referenced by Film#genres.
class Genre
  include BasicFields
end
# Director document: only the BasicFields keys; referenced by Film#directors.
class Director
  include BasicFields
end
# Country document: only the BasicFields keys; referenced by Film#countries.
class Country
  include BasicFields
end
# CastMember document: only the BasicFields keys; referenced by
# Film#cast_members.
class CastMember
  include BasicFields
end
# MusicComposer document: only the BasicFields keys; referenced by
# Film#music_composers.
class MusicComposer
  include BasicFields
end
# Renders one Film as a CSV row whose columns line up with CSV_HEADERS.
# Blank columns (as judged by ActiveSupport's #presence) are written as EMPTY.
class FilmCsvPresenter
  CSV_HEADERS = %w[freebase_id name year genres directors countries casts composers].freeze
  EMPTY = 'Empty'.freeze

  # @param film [Film] document to render; only the readers used in #to_csv
  #   are called
  def initialize(film)
    @film = film
  end

  # @return [Array] one value per CSV_HEADERS entry. freebase_id is passed
  #   through as-is; every other column falls back to EMPTY when blank.
  def to_csv
    [
      @film.freebase_id,
      or_empty(@film.all_names),
      or_empty(@film.initial_release_year),
      joined_names(@film.genres),
      joined_names(@film.directors),
      joined_names(@film.countries),
      joined_names(@film.cast_members),
      joined_names(@film.music_composers)
    ]
  end

  private

  # Returns +value+, or EMPTY when it is blank.
  # This replaces `cols << value.presence || EMPTY`, which parsed as
  # `(cols << value.presence) || EMPTY`: blank columns were appended as nil
  # and EMPTY was never used.
  def or_empty(value)
    value.presence || EMPTY
  end

  # The names of every associated document, comma separated (EMPTY if none).
  def joined_names(documents)
    or_empty(documents.map(&:all_names).join(', '))
  end
end
# Exports every Film to one CSV file, using one producer thread and @splits
# worker threads connected by a Queue.
#
# - generate_jobs (producer) turns each Film into a CSV row, pushes it onto
#   the queue, then pushes one :eof sentinel per worker.
# - create_worker (consumer) pops rows into its own "<file>_<index>" part
#   file until it pops :eof.
# - merge_results! waits for all threads, then appends a header row (if the
#   target is empty) and every part file onto the target, deleting the parts.
#
# Progress messages are printed only when ENV['OUT'] == 'true'. Per-stage
# timings are always printed and appended to ./jobs_stat.
class FilmsExporter
  def initialize
    @queue = Queue.new
    @files = []
    @splits = 4
    @worker_threads = []
    @generator_thread = nil
    @all_jobs_generated = false
  end

  # Exports all films into +file+ (appending if the file already exists).
  #
  # @param file [String] final CSV path. Part files "#{file}_0" ..
  #   "#{file}_#{@splits - 1}" are written next to it and deleted after merging.
  def store_as_csv!(file)
    out "====> Store CSV into #{file}".colorize(:green)
    started_at = monotonic_now
    generate_jobs
    create_workers(file)
    merge_results!(file)
    out "Total spent: #{monotonic_now - started_at} secs.".colorize(:green)
  end

  # Waits for the producer and every worker, then appends the part files to
  # +final_file+ and deletes them.
  #
  # @param final_file [String] destination path (opened in append mode)
  # @raise whatever exception killed the producer or a worker thread
  def merge_results!(final_file)
    # Join instead of polling @all_jobs_generated: the producer sets that
    # flag before the workers have drained the queue and closed their files,
    # so polling could merge half-written part files.
    @generator_thread.join if @generator_thread
    @worker_threads.each(&:join)

    benchmark 'MERGE' do
      # Write the header only into a new or empty file, so appending to an
      # existing export does not repeat it.
      unless File.size?(final_file)
        File.open(final_file, 'a') { |dest| dest << CSV.generate_line(FilmCsvPresenter::CSV_HEADERS) }
      end

      @files.each do |worker_file|
        out "----> Merging #{worker_file} into #{final_file}".colorize(:red)
        # Copy in-process. The old `cat ... | tee -a ...` went through a shell
        # with unescaped paths and buffered the whole file as a Ruby string.
        File.open(final_file, 'a') { |dest| IO.copy_stream(worker_file, dest) }
        File.delete(worker_file)
      end
      out '----> Merging completed.'.colorize(:green)
    end
  end

  # Starts the producer thread: one CSV row per Film, then one :eof per worker.
  #
  # @return [Thread] the producer thread (also kept in @generator_thread)
  def generate_jobs
    @generator_thread = Thread.new do
      begin
        benchmark 'GEN_JOBS' do
          idx = 0
          Film.find_each do |film|
            idx += 1
            out "----> Adding [#{idx}] #{film.freebase_id}...".colorize(:yellow)
            @queue << FilmCsvPresenter.new(film).to_csv
          end
        end
      ensure
        @all_jobs_generated = true
        # Push exactly one sentinel per worker; each worker stops at the
        # first :eof it pops. The Queue buffers them, so this works whether
        # or not the workers exist yet. `ensure` releases the workers even if
        # the producer fails.
        @splits.times { @queue << :eof }
      end
    end
  end

  # Starts @splits worker threads.
  #
  # @param file [String] final CSV path, used as the prefix of each part file
  def create_workers(file)
    @splits.times { |i| create_worker i, file }
  end

  # Starts worker +index+, which writes popped rows into "#{file}_#{index}"
  # until it pops :eof.
  #
  # @param index [Integer] worker number, used in the part-file name
  # @param file [String] final CSV path (part-file prefix)
  def create_worker(index, file)
    worker_file = "#{file}_#{index}"
    @files << worker_file
    @worker_threads << Thread.new do
      benchmark "Worker[#{index}]" do
        out "----> Started worker [#{index}]".colorize(:green)
        CSV.open(worker_file, 'w') do |csv|
          while (data = @queue.pop)
            if data == :eof
              out "====> Worker [#{index}] is closing.".colorize(:green)
              break
            end
            out '<---- Writing to file'.colorize(:light_white)
            csv << data
          end
        end
        out "====> Worker: #{worker_file} is completed.".colorize(:green)
      end
    end
  end

  # Runs the block, then prints how long it took and appends that line to
  # ./jobs_stat.
  #
  # @param label [String] stage name used in the messages
  # @return the block's value
  def benchmark(label)
    out "====> Starting #{label}".colorize(:green)
    started = monotonic_now
    result = yield
    took = monotonic_now - started
    out "====> Ended #{label}".colorize(:green)
    summary = "====> #{label} Took: #{took} secs."
    puts summary.colorize(:green)
    # Append directly. The old `echo #{took_str} | tee -a jobs_stat` sent the
    # colored text through a shell, where the ANSI codes' ';' split the
    # command and the '>' in "====>" redirected it into a stray file.
    # A single write keeps concurrent workers' lines whole.
    File.open('jobs_stat', 'a') { |stat| stat.write("#{summary}\n") }
    result
  end

  # Prints +msg+ only when the OUT environment variable is "true".
  def out(msg)
    # The old `||=` re-read ENV on every call whenever the answer was false.
    @out_on = ENV['OUT'] == 'true' unless defined?(@out_on)
    puts msg if @out_on
  end

  private

  # Seconds from a monotonic clock, unaffected by wall-clock adjustments.
  def monotonic_now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end
# (Gist page footer: "Sign up for free to join this conversation on GitHub.
# Already have an account? Sign in to comment.")