Created
October 9, 2010 20:49
-
-
Save brainopia/618588 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'em-resolv-replace' | |
require 'em-synchrony/em-http' | |
require 'fiber_pool' | |
gem 'em_postgresql' | |
# Thin wrapper around a FiberPool that remembers its configured size and
# counts how many spawned jobs have completed.
class Pool
  # processed  - number of times the pool's generic callback has fired.
  # concurrent - the size this pool was created with.
  attr_reader :processed, :concurrent

  # size - number of fibers handed to FiberPool.new.
  def initialize(size)
    @concurrent = size
    @pool       = FiberPool.new(size)
    @processed  = 0
    # Presumably FiberPool runs its generic callbacks after each job finishes
    # (confirm against the fiber_pool gem); each run bumps the counter.
    @pool.generic_callbacks << proc { @processed += 1 }
  end

  # Forwards the job to the wrapped FiberPool. Written as plain Ruby instead
  # of ActiveSupport's `delegate` macro so that the class can be defined
  # without ActiveSupport core extensions being loaded first.
  def spawn(*args, &block)
    @pool.spawn(*args, &block)
  end

  # Number of fibers the wrapped pool currently reports as busy.
  def active
    @pool.busy_fibers.size
  end
end
# Singleton driver that looks up a mail.ru uid over HTTP for every matching
# user and stores it. Work is spread over three pools: selects (@db1),
# updates (@db2) and HTTP requests (@http).
class << (Handler = Object.new)
  # Entry point. Expected to be called from inside a running EM reactor
  # (it creates an EM timer and eventually calls EM.stop).
  #
  # concurrency_level - exactly three sizes, in this order:
  #                     select pool, update pool, http pool.
  def start(*concurrency_level)
    init_pools(*concurrency_level)
    init_db
    init_status_update
    init_exit_callback

    # init_variables issues a count query; it is run inside a fiber,
    # presumably because the em_postgresql adapter needs one (confirm).
    Fiber.new do
      init_variables
      run
    end.resume
  end

  private

  # Prints the final summary and stops the reactor. Used both as the
  # iterator's completion callback and as the INT handler.
  def finish
    # The periodic timer may never have run (early Ctrl-C, or no users to
    # process), so the elapsed time is recomputed here instead of trusting
    # whatever the last timer tick stored.
    refresh_spent_time
    output_general_info
    EM.stop
  end

  def run
    startup_message
    iterate_users
  end

  # db1  - size of the pool used for batch selects.
  # db2  - size of the pool used for per-user updates.
  # http - size of the pool used for HTTP lookups.
  def init_pools(db1, db2, http)
    @db1  = Pool.new(db1)
    @db2  = Pool.new(db2)
    @http = Pool.new(http)
  end

  def init_db
    ActiveRecord::Base.establish_connection(db_config)
    User.after_save.clear
  end

  # Connection settings: defaults, overridden by the Rails settings for the
  # current environment, overridden by the evented adapter and a connection
  # pool sized to the sum of both db fiber pools.
  def db_config
    rails_settings   = ActiveRecord::Base.configurations[Rails.env]
    default_settings = { host: 'localhost', username: ENV['USER'] }
    evented_settings = { pool: (@db1.concurrent + @db2.concurrent), adapter: 'em_postgresql' }
    default_settings.merge(rails_settings).merge(evented_settings)
  end

  def init_variables
    @failed_http = 0
    @without_uid = 0
    @batch_size  = 50
    @conditions  = { conditions: "email SIMILAR TO '%@(mail|inbox|bk|list).ru' and mailru_uid is NULL" }
    @count       = User.count(@conditions)
    @batch_count = (@count.to_f / @batch_size).ceil
    @start_time  = Time.now
  end

  def init_exit_callback
    trap('INT') { finish }
  end

  # Prints statistics every 1.5 seconds once at least one HTTP call is done.
  def init_status_update
    EM::PeriodicTimer.new(1.5) do
      unless @http.processed == 0
        refresh_spent_time
        output_general_info
        output_progress
      end
    end
  end

  # Stores the seconds elapsed since init_variables ran in @spent_time
  # (0.0 when init_variables has not completed yet).
  def refresh_spent_time
    @spent_time = @start_time ? Time.now - @start_time : 0.0
  end

  # Prints overall counters. Reads @spent_time, so callers refresh it first.
  def output_general_info
    puts "Spent time: #{round(@spent_time / 60)} minutes",
         "Average: #{round(@db2.processed / @spent_time)} updated users per second",
         "Processed #{@db1.processed} select calls",
         "Processed #{@http.processed} http calls, #{round(100 - percent(@failed_http))}% successful",
         "Processed #{@db2.processed} update calls",
         "#{round percent(@without_uid)}% (#{@without_uid} users) were without uid"
  end

  # Walks batch indexes 0...@batch_count with a concurrency of 4 and calls
  # finish when the iterator completes.
  def iterate_users
    EM::Iterator.new(0...@batch_count, 4).each(method(:iterate_cycle), method(:finish))
  end

  def iterate_cycle(index, iteration)
    @db1.spawn { handle_batch(index, iteration) }
  end

  # Loads one page of users (id and email only).
  def batch(index)
    # ORDER BY added: LIMIT/OFFSET without an ordering returns an
    # unpredictable subset per page.
    # NOTE(review): rows leave the `mailru_uid is NULL` set while paging is
    # in progress, so offsets can still shift and skip users; a re-run picks
    # those up. Confirm whether that is acceptable.
    User.all(@conditions.merge(limit: @batch_size, offset: index * @batch_size,
                               select: 'id, email', order: 'id'))
  end

  # Schedules an HTTP lookup for every user of the batch and releases the
  # iterator slot once the last scheduled user has been handled.
  def handle_batch(batch_index, iteration)
    users = batch(batch_index)

    # An empty page schedules nothing, so the slot must be released here or
    # the iterator would never complete.
    return iteration.next if users.empty?

    # Compare against the real page size, not @batch_size: the final page
    # (and any page shortened by concurrent updates) has fewer rows.
    last_index = users.size - 1
    users.each_with_index do |user, user_index|
      @http.spawn do
        handle(user)
        iteration.next if user_index == last_index
      end
    end
  end

  # Fetches the uid for one user and either schedules the update or bumps
  # the matching failure counter.
  def handle(user)
    request  = EM::HttpRequest.new(uid_url(user)).get
    response = extract_response(request)

    if response
      @db2.spawn { user.update_attribute(:mailru_uid, response['uid']) }
    elsif request.response_header.status == 200
      # 200 response whose body decoded to nil/false.
      @without_uid += 1
    else
      @failed_http += 1
    end
  end

  # Returns the decoded JSON body for a 200 response, nil otherwise.
  def extract_response(request)
    ActiveSupport::JSON.decode(request.response) if request.response_header.status == 200
  end

  # Builds the lookup URL from the user's e-mail address.
  def uid_url(user)
    # Drop the last three characters (the ".ru" suffix guaranteed by
    # @conditions), then split into local part and domain.
    name, domain = user.email[0...-3].split('@')
    domain = 'corp' if domain == 'corp.mail'
    "http://appsmail.ru/platform/#{domain}/#{name}"
  end

  # Prints completion percentage, a time estimate and in-flight job counts.
  # Reads @spent_time, so callers refresh it first.
  def output_progress
    puts "Completed #{round percent(@db2.processed, @count)}%",
         "Estimated #{round((@spent_time * @count / @db2.processed - @spent_time) / 60)} minutes until finish",
         "In process: http requests - #{@http.active}, updates - #{@db2.active}, selects - #{@db1.active}"
  end

  # Formats value with a fixed number of decimals; returns a String.
  def round(value, precision = 1)
    "%.#{precision}f" % value
  end

  # part as a percentage of whole; whole defaults to the number of finished
  # HTTP calls, which is what the failure / without-uid counters relate to.
  def percent(part, whole = @http.processed)
    (part.to_f / whole) * 100
  end

  def startup_message
    puts "Starting to update #{@count} users"
  end
end
# Rake task: starts an EventMachine reactor and runs the Handler with pool
# sizes 2 (selects), 10 (updates) and 50 (http requests). Depends on the
# :environment task.
desc 'Update mailru uids'
task :mailru_uids => :environment do
  EM.run { Handler.start(2, 10, 50) }
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment