Last active
December 20, 2015 12:39
-
-
Save hopsoft/6133229 to your computer and use it in GitHub Desktop.
Helpers for making DRb a bit simpler for common tasks that benefit from concurrent processing.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require "drb" | |
require "monitor" | |
require "socket" | |
module Shared | |
class DistributedObject | |
include MonitorMixin | |
attr_reader :pid, :uri | |
def self.async(name) | |
define_method "#{name}_async" do |*args| | |
Thread.new do | |
synchronize { @busy = true } | |
before_async | |
send(name, *args) | |
synchronize { @busy = false } | |
Thread.terminate | |
end | |
end | |
end | |
def ok? | |
true | |
end | |
def busy? | |
!!@busy | |
end | |
def before_async | |
# abstract method to be implemented in subclasses | |
end | |
def start_remote_instance(port=nil) | |
return if pid.present? | |
@uri = "druby://localhost:#{port || get_open_port}" | |
@pid = fork do | |
before_async | |
DRb.start_service uri, self | |
DRb.thread.join | |
end | |
Process.detach pid | |
end | |
def exit | |
DRb.stop_service | |
end | |
protected | |
def get_open_port | |
socket = Socket.new(:INET, :STREAM, 0) | |
socket.bind(Addrinfo.tcp("127.0.0.1", 0)) | |
socket.local_address.ip_port | |
end | |
end | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require "drb" | |
module Shared | |
class DistributedObjectManager | |
attr_reader( | |
:distributed_object_class, | |
:max_remote_instances, | |
:local_instances, | |
:remote_instances | |
) | |
def initialize(distributed_object_class, max_remote_instances) | |
@distributed_object_class = distributed_object_class | |
@max_remote_instances = max_remote_instances | |
@local_instances = [] | |
@remote_instances = [] | |
end | |
def start_remote_instances(port=nil) | |
DRb.start_service | |
(1..max_remote_instances).each do |i| | |
local_instance = distributed_object_class.new | |
if port.present? | |
local_instance.start_remote_instance(port) | |
port += 1 | |
else | |
local_instance.start_remote_instance | |
end | |
local_instances << local_instance | |
remote_instance = DRbObject.new_with_uri(local_instance.uri) | |
verify_remote_instance(remote_instance) | |
remote_instances << remote_instance | |
end | |
end | |
def stop_remote_instances | |
remote_instances.each do |remote_instance| | |
remote_instance.exit rescue nil | |
end | |
DRb.stop_service | |
end | |
def get_remote_instance | |
@remote_instance_index ||= 0 | |
remote_instance = remote_instances[@remote_instance_index] | |
sleep 0.01 while remote_instance.busy? | |
@remote_instance_index += 1 | |
@remote_instance_index = 0 if @remote_instance_index >= remote_instances.length | |
remote_instance | |
end | |
protected | |
def verify_remote_instance(remote_instance) | |
ok = false | |
while !ok | |
begin | |
ok = remote_instance.ok? | |
rescue DRb::DRbConnError | |
sleep 0.1 | |
end | |
end | |
ok | |
end | |
end | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# create a distributed object that will perform expensive operations | |
# note: illustating with Rails & ActiveRecord, but this can be applied to any Ruby project | |
class UserUpdater < Shared::DistributedObject | |
def before_async | |
ActiveRecord::Base.establish_connection ActiveRecord::Base.connection_config | |
end | |
def update_user(user_id) | |
# expensive operations & logic | |
end | |
async :update_user | |
end | |
# spin up and use 7 distributed objects to perform the work | |
# then gracefully shutdown the distributed objects when finished | |
def update_users_concurrently | |
manager = Shared::DistributedObjectManager.new(UserUpdater, 7) | |
manager.start_remote_instances(42042) | |
User.pluck(:id).each do |user_id| | |
manager.get_remote_instance.update_user_async(user_id) | |
end | |
importer_manager.stop_remote_instances | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment