# Skip to content

# Instantly share code, notes, and snippets.

# @arturaz
# Created January 30, 2012 12:07
# Show Gist options
#   • Save arturaz/1704077 to your computer and use it in GitHub Desktop.
# Save arturaz/1704077 to your computer and use it in GitHub Desktop.
$LOAD_PATH << "lib/"
require 'celluloid'
# Object through which all console output of the simulation is funnelled.
# It mixes in Celluloid, so callers talk to it as an actor.
class LogActor
  include Celluloid

  # Prints _message_ on standard output.
  def log(message)
    $stdout.puts(message)
  end
end
# Single shared logger instance; Log#log sends every formatted line to it.
LOG = LogActor.new
# Mixin providing a +log+ helper. The including object is expected to set
# @name, which is used as the tag of every line it logs.
module Log
  # Builds "HH:MM:SS [name] message" from the current time, @name and
  # _message_, and hands the line to LOG through log!.
  # NOTE(review): log! is presumably the asynchronous call form of the
  # bundled Celluloid version -- confirm before upgrading the library.
  def log(message)
    timestamp = Time.now.strftime("%H:%M:%S")
    line = Kernel.format("%s [%s] %s", timestamp, @name, message)
    LOG.log!(line)
  end
end
# Pool member that carries out the jobs handed to it by the director.
# Mixes in Celluloid and Log; @name tags this worker's log lines and is
# reported back to the director and the syncer.
class Worker
  include Celluloid
  include Log

  # director:: receives done! once a job has finished.
  # syncer::   target of #sync and #request_sync.
  # name::     identifier of this worker.
  def initialize(director, syncer, name)
    @director, @syncer, @name = director, syncer, name
  end

  # Simulates a job on _player_ids_ by sleeping for _message_ seconds,
  # logging before and after, and then notifies the director via done!.
  def work(player_ids, message)
    log "--- starting work (#{player_ids.inspect}) for #{message} seconds ---"
    Kernel.sleep(message)
    log "--- done working (#{player_ids.inspect}) for #{message} seconds ---"
    @director.done!(@name, player_ids)
  end

  # Emit a sync with _token_.
  def sync(token)
    @syncer.sync!(@name, token)
  end

  # Block until syncer gets _count_ sync emits for _token_.
  # NOTE(review): the call runs inside Celluloid's +exclusive+ block,
  # presumably so this worker handles nothing else while it waits -- confirm
  # against the bundled Celluloid version.
  def request_sync(token, count)
    exclusive { @syncer.request_sync(@name, count, token) }
  end
end
# Rendezvous point for workers: counts the sync emits received per token
# and lets a requester wait until an expected number has arrived.
class Syncer
  include Log
  include Celluloid

  def initialize
    @name = 'syncer'
    # token => number of sync emits received so far (0 for unseen tokens).
    @tokens = Hash.new(0)
  end

  # Waits until exactly _count_ sync emits for _token_ have been counted,
  # then forgets the token. _name_ identifies the requester in log lines.
  def request_sync(name, count, token)
    log "#{name} requested sync (#{count}) for token #{token}."
    while @tokens[token] != count
      # Suspended here until #sync signals this token.
      wait token
      remaining = count - @tokens[token]
      log "Sync token #{token} received, #{remaining} syncs remaining for #{name}."
    end
    @tokens.delete token
    log "Sync done for #{name} with token #{token}."
  end

  # Counts one sync emit from _name_ for _token_ and signals the token so a
  # pending #request_sync can re-check its count.
  def sync(name, token)
    log "#{name} syncing in with token #{token}."
    @tokens[token] += 1
    signal token
  end
end
class Director
# Pairs a worker's name with the worker object itself.
# Assigned from Struct.new instead of subclassing it: no throwaway anonymous
# superclass is created, and loading the file a second time cannot fail with
# a "superclass mismatch" error.
WorkerEntry = Struct.new(:name, :worker)
# Book-keeping of which workers currently hold jobs for which player ids.
#
# Internally a two-level hash: player_id => { worker_name => open jobs }.
# Entries are removed as soon as their count drops to zero, so the table
# only ever contains ids that still have work outstanding.
class PidsTracker
  def initialize
    @data = {}
  end

  # Inspect string of the internal table, for debugging output.
  def to_s
    @data.inspect
  end

  # True when no player id has an outstanding job.
  def empty?
    @data.empty?
  end

  # Records one more job of worker _name_ for every id in _player_ids_.
  def register_pids(player_ids, name)
    player_ids.each { |player_id| register_pid(player_id, name) }
  end

  # Records one more job of worker _name_ for _player_id_.
  # Returns the new job count of that worker for the id.
  def register_pid(player_id, name)
    counts = (@data[player_id] ||= {})
    counts[name] = counts.fetch(name, 0) + 1
  end

  # Removes one job of worker _name_ for every id in _player_ids_.
  def unregister_pids(player_ids, name)
    player_ids.each { |player_id| unregister_pid(player_id, name) }
  end

  # Removes one job of worker _name_ for _player_id_. Always returns true.
  def unregister_pid(player_id, name)
    counts = @data[player_id]
    # Nothing is registered for this id/worker pair: ignore the request
    # instead of failing with NoMethodError on nil.
    return true unless counts && counts.key?(name)

    counts[name] -= 1
    counts.delete(name) if counts[name].zero?
    @data.delete(player_id) if counts.empty?
    true
  end

  # Names of all workers that hold a job for at least one of _player_ids_,
  # without duplicates, in order of first appearance.
  # Built with plain arrays so the class does not depend on Set, which the
  # file never requires.
  def currently_working_on(player_ids)
    player_ids.flat_map { |player_id| @data.fetch(player_id, {}).keys }.uniq
  end
end
include Log
include Celluloid
# Sets up a director with _pool_ workers named :worker_0, :worker_1, ...
# All workers share one Syncer and are handed +current_actor+ as their
# director. Every worker starts out in the free list.
# NOTE(review): current_actor is presumably Celluloid's handle to this
# actor -- confirm against the bundled Celluloid version.
def initialize(pool=10)
  @name = "director"
  shared_syncer = Syncer.new
  entries = pool.times.map do |index|
    worker_name = :"worker_#{index}"
    WorkerEntry.new(
      worker_name, Worker.new(current_actor, shared_syncer, worker_name)
    )
  end
  # name => WorkerEntry for every worker in the pool.
  @workers = Hash[entries.map { |entry| [entry.name, entry] }]
  # Entries of workers that currently have no job booked.
  @free_workers = entries
  # worker name => number of jobs booked and not yet reported done.
  @jobs = Hash.new(0)
  @pids = PidsTracker.new
  @sync_token = 0
end
# Accepts a job for _player_ids_: reserves a worker for those ids and
# forwards the job (_player_ids_, _message_) to it through work!.
def work(player_ids, message)
  #log "*** Got work for #{player_ids.inspect}! ***"
  reserve_worker(player_ids).work!(player_ids, message)
  #report
end
# Called when worker _name_ has finished a job for _player_ids_: drops the
# ids from the tracker, decrements the worker's job count, returns the
# worker to the free list once that count reaches zero, and logs a report.
def done(name, player_ids)
  #log "*** #{name} is done working for #{player_ids.inspect} ***"
  #report
  #log "Processing."
  @pids.unregister_pids(player_ids, name)
  @jobs[name] -= 1
  @free_workers.push(@workers[name]) if @jobs[name].zero?
  report
end
# Logs the number of free workers and the per-worker job counts.
def report
  #log "Free: #{@free_workers.size}, @pids: #{@pids}, @jobs: #{@jobs.inspect}"
  free_count = @free_workers.size
  log "Free: #{free_count}, @jobs: #{@jobs.inspect}"
end
# True when the pid tracker holds no entries, i.e. every player id that was
# registered for a job has been unregistered again.
def finished?
@pids.empty?
end
private
# Chooses the worker that will run a job touching _player_ids_, books the
# job against it and returns the worker object.
#
# Selection order:
# 1. If workers already hold jobs for any of the ids, the first of them is
#    reused and told to wait (through a sync token) for all the others.
# 2. Otherwise the next free worker is taken.
# 3. If none is free, the worker with the fewest booked jobs is used.
def reserve_worker(player_ids)
  currently_on = @pids.currently_working_on(player_ids).map do |name|
    @workers[name]
  end

  if currently_on.empty?
    # No worker is working on any of the _player_ids_.
    entry = @free_workers.shift
    if entry.nil?
      log "No free workers left! Taking busy one."
      # Queue behind the least loaded worker instead of always piling the
      # overflow onto the first worker of the pool.
      entry = @workers.values.min_by { |candidate| @jobs[candidate.name] }
    else
      log "Taking free worker."
    end
  else
    entry, *rest = currently_on
    log "Taking already working worker #{entry.name}."
    sync_with_others(entry, rest) unless rest.empty?
  end

  log "Reserved #{entry.name} for #{player_ids.inspect}."
  @pids.register_pids(player_ids, entry.name)
  @jobs[entry.name] += 1
  entry.worker
end

# Issues a fresh sync token so that _entry_'s worker waits for one sync
# emit from each worker in _rest_.
# We need to ensure that other workers have finished working on the player
# ids before this worker starts working on them, therefore the request is
# sent to _entry_ first and a sync is then sent to every other worker.
def sync_with_others(entry, rest)
  @sync_token += 1
  names = rest.map { |e| e.name }.join(",")
  log "Syncing #{entry.name} with token #{@sync_token} to #{names} (#{rest.size})."
  entry.worker.request_sync!(@sync_token, rest.size)
  rest.each { |sync_entry| sync_entry.worker.sync!(@sync_token) }
end
end
# Simulation driver: feeds 100 randomly generated jobs to a director and
# measures how long it takes until all of them have been worked off.
puts "Starting simulation"
start = Time.now
# Director with a pool of 20 workers.
d = Director.new(20)
# Fixed seed, so every run draws the same sequence of delays and player ids.
# The order of the rand calls below is therefore part of the scenario.
srand(1)
100.times do
# Pause 0.1 to 2.0 seconds before submitting the next job.
sleep (rand(20) + 1) * 0.1
# Job length of 1 to 5; Worker#work sleeps this many seconds.
time = rand(5) + 1
# Zero to four random player ids, de-duplicated.
player_ids = (1..rand(5)).map { rand(10000) }.uniq
# Roughly half of the jobs additionally carry a nil player id.
player_ids.push nil if rand() > 0.5
player_ids.freeze
# Hand the job to the director via work!.
d.work!(player_ids, time)
end
# Poll once per second until the director has no outstanding player ids.
sleep 1 until d.finished?
duration = Time.now - start
puts "Duration: #{duration}"
puts "Reporting"
d.report
sleep 1
# Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment