Skip to content

Instantly share code, notes, and snippets.

@hopsoft
Last active December 20, 2015 12:39
Show Gist options
  • Save hopsoft/6133229 to your computer and use it in GitHub Desktop.
Save hopsoft/6133229 to your computer and use it in GitHub Desktop.
Helpers for making DRb a bit simpler for common tasks that benefit from concurrent processing.
require "drb"
require "monitor"
require "socket"
module Shared
  # Base class for objects that are served over DRb from a forked child process
  # and that expose background ("fire and forget") variants of their methods.
  #
  # Subclasses declare background variants with `async :method_name` and may
  # override #before_async to prepare resources each worker needs.
  class DistributedObject
    include MonitorMixin

    # pid: process id of the forked DRb server (nil until started)
    # uri: druby:// URI that server listens on (nil until started)
    attr_reader :pid, :uri

    # Defines "#{name}_async", which runs +name+ with the given arguments on a
    # new thread (calling #before_async first, on that thread) and returns the
    # Thread.
    #
    # The in-flight job count is incremented *before* the thread is spawned, so
    # a caller polling #busy? right after the async call returns never sees a
    # stale "idle". It is decremented in an ensure, so a job (or #before_async)
    # that raises does not leave the object busy forever. A counter rather than
    # a boolean keeps #busy? true while any of several overlapping jobs runs.
    #
    # @param name [Symbol, String] name of an instance method to wrap
    def self.async(name)
      define_method "#{name}_async" do |*args|
        synchronize { @active_jobs = @active_jobs.to_i + 1 }
        Thread.new do
          begin
            before_async
            send(name, *args)
          ensure
            # the thread ends on its own once this block returns
            synchronize { @active_jobs -= 1 }
          end
        end
      end
    end

    # Liveness probe: answering at all proves the DRb server is reachable.
    def ok?
      true
    end

    # @return [Boolean] true while at least one *_async job is in flight
    def busy?
      synchronize { @active_jobs.to_i > 0 }
    end

    # Hook run in the forked server process before it starts serving, and on
    # every *_async worker thread before the job runs. No-op by default.
    def before_async
      # abstract method to be implemented in subclasses
    end

    # Forks a child process that runs #before_async and then serves this object
    # over DRb at #uri until #exit is invoked on it. The child is detached so it
    # is reaped automatically. Does nothing if a server was already started.
    #
    # @param port [Integer, nil] port to listen on; nil picks a currently free port
    # @return [Thread, nil] the Process.detach watcher thread, or nil if already started
    def start_remote_instance(port = nil)
      return if pid

      @uri = "druby://localhost:#{port || get_open_port}"
      @pid = fork do
        before_async
        DRb.start_service uri, self
        DRb.thread.join
      end
      Process.detach pid
    end

    # Stops the DRb service of the process this runs in. Invoked remotely, this
    # ends the forked server: its DRb.thread.join returns and the child exits.
    # NOTE: this shadows Kernel#exit for instances of this class.
    def exit
      DRb.stop_service
    end

    protected

    # Asks the OS for an unused TCP port on 127.0.0.1 and releases it again.
    # The port is only free at the moment of the check; another process could
    # take it before the DRb server binds it.
    #
    # @return [Integer] port number
    def get_open_port
      socket = Socket.new(:INET, :STREAM, 0)
      socket.bind(Addrinfo.tcp("127.0.0.1", 0))
      socket.local_address.ip_port
    ensure
      # release the probe socket so the fd does not leak and the port is free again
      socket.close if socket
    end
  end
end
require "drb"
module Shared
  # Starts a fixed number of DistributedObject instances, each in its own
  # forked DRb server process, and hands out handles to idle ones.
  class DistributedObjectManager
    attr_reader(
      :distributed_object_class,
      :max_remote_instances,
      :local_instances,
      :remote_instances
    )

    # @param distributed_object_class [Class] class to instantiate; its instances
    #   must respond to #start_remote_instance, #uri, #ok?, #busy? and #exit
    #   (as Shared::DistributedObject does)
    # @param max_remote_instances [Integer] number of server processes to start
    def initialize(distributed_object_class, max_remote_instances)
      @distributed_object_class = distributed_object_class
      @max_remote_instances = max_remote_instances
      @local_instances = []
      @remote_instances = []
    end

    # Starts max_remote_instances servers, blocking until each answers #ok?.
    #
    # @param port [Integer, nil] port of the first server; subsequent servers use
    #   consecutive ports. When nil, each server picks a free port itself.
    def start_remote_instances(port = nil)
      # local DRb service so the DRbObject handles below can talk to the servers
      DRb.start_service
      max_remote_instances.times do |offset|
        local_instance = distributed_object_class.new
        local_instance.start_remote_instance(port && port + offset)
        local_instances << local_instance
        remote_instance = DRbObject.new_with_uri(local_instance.uri)
        verify_remote_instance(remote_instance)
        remote_instances << remote_instance
      end
    end

    # Asks every server to exit, stops the local DRb service and forgets the
    # (now dead) instances, so start_remote_instances can be called again.
    def stop_remote_instances
      remote_instances.each do |remote_instance|
        begin
          remote_instance.exit
        rescue StandardError
          # best effort: the server shuts down while handling this call, so it
          # may fail (e.g. DRb::DRbConnError); keep stopping the others
          nil
        end
      end
      DRb.stop_service
      local_instances.clear
      remote_instances.clear
      @remote_instance_index = 0
    end

    # Returns an idle remote instance, polling every 10ms until one is free.
    # The scan starts just after the previously returned instance, so work is
    # spread round-robin, but a busy instance no longer blocks idle ones.
    #
    # @raise [RuntimeError] when no remote instances are running
    def get_remote_instance
      if remote_instances.empty?
        raise "no remote instances running; call start_remote_instances first"
      end

      @remote_instance_index ||= 0
      loop do
        count = remote_instances.length
        count.times do |step|
          index = (@remote_instance_index + step) % count
          candidate = remote_instances[index]
          next if candidate.busy?

          @remote_instance_index = (index + 1) % count
          return candidate
        end
        sleep 0.01
      end
    end

    protected

    # Blocks until +remote_instance+ answers #ok? truthily, retrying every 0.1s
    # while its server is not accepting connections yet.
    # NOTE(review): there is no upper bound; a server that crashes on startup
    # makes this wait forever.
    def verify_remote_instance(remote_instance)
      ok = false
      until ok
        begin
          ok = remote_instance.ok?
        rescue DRb::DRbConnError
          sleep 0.1
        end
      end
      ok
    end
  end
end
# create a distributed object that will perform expensive operations
# note: illustrating with Rails & ActiveRecord, but this can be applied to any Ruby project
class UserUpdater < Shared::DistributedObject
  # Runs in the forked server process and on every async worker thread:
  # re-establishes the ActiveRecord connection with the current configuration.
  # NOTE(review): `connection_config` was deprecated in newer Rails in favor of
  # `connection_db_config` — confirm against the Rails version in use.
  def before_async
    config = ActiveRecord::Base.connection_config
    ActiveRecord::Base.establish_connection config
  end

  # @param user_id id of the user to update
  def update_user(user_id)
    # expensive operations & logic
  end

  # generates #update_user_async, which runs #update_user on a background thread
  async :update_user
end
# spin up 7 distributed objects and use them to perform the work,
# then gracefully shut them down when finished
# Spreads the per-user work over several forked UserUpdater servers, waits for
# the last jobs to finish, and always shuts the servers down, even on error.
#
# @param instance_count [Integer] number of server processes to start
# @param base_port [Integer, nil] port of the first server (the others use
#   consecutive ports); nil lets each server pick a free port
def update_users_concurrently(instance_count = 7, base_port = 42042)
  manager = Shared::DistributedObjectManager.new(UserUpdater, instance_count)
  begin
    manager.start_remote_instances(base_port)
    User.pluck(:id).each do |user_id|
      manager.get_remote_instance.update_user_async(user_id)
    end
    # stopping a server ends its process, which would kill jobs still running
    # there; relies on busy? reporting in-flight jobs, so wait until all idle
    sleep 0.01 while manager.remote_instances.any? { |instance| instance.busy? }
  ensure
    manager.stop_remote_instances
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment