# Created
# January 30, 2012 12:07
#
# Save arturaz/1704077 to your computer and use it in GitHub Desktop.
# This file contains hidden or bidirectional Unicode text that may be
# interpreted or compiled differently than what appears below. To review, open
# the file in an editor that reveals hidden Unicode characters.
# Learn more about bidirectional Unicode characters
$LOAD_PATH << "lib/"

require 'set'
require 'celluloid'
# Celluloid actor whose only job is to write messages to standard output.
class LogActor
  include Celluloid

  # Print _message_ with puts.
  def log(message)
    puts message
  end
end

# The single LogActor instance created for this script.
LOG = LogActor.new
# Mixin that gives the including object a #log method.
#
# The including object is expected to set @name; it is printed between the
# timestamp and the message. Lines are handed to the LOG constant through its
# log! method.
module Log
  # Send "HH:MM:SS [name] message" to LOG.
  #
  # message:: anything that can be rendered with %s.
  def log(message)
    stamp = Time.now.strftime("%H:%M:%S")
    LOG.log!(format("%s [%s] %s", stamp, @name, message))
  end
end
# Actor that performs simulated jobs and takes part in token based
# synchronisation through a syncer object.
class Worker
  include Celluloid
  include Log

  # director:: receives done!(name, player_ids) after every job.
  # syncer::   receives sync! / request_sync calls.
  # name::     identifier used in log lines and passed to director and syncer.
  def initialize(director, syncer, name)
    @director, @syncer, @name = director, syncer, name
  end

  # Simulate a job: sleep for _message_ seconds, logging before and after,
  # then tell the director that _player_ids_ are done.
  def work(player_ids, message)
    log job_banner("starting work", player_ids, message)
    Kernel.sleep(message)
    log job_banner("done working", player_ids, message)
    @director.done!(@name, player_ids)
  end

  # Emit a sync with _token_.
  def sync(token)
    @syncer.sync!(@name, token)
  end

  # Block until syncer gets _count_ sync emits for _token_.
  def request_sync(token, count)
    # NOTE(review): presumably `exclusive` keeps this actor from handling
    # other calls while it waits - confirm against the Celluloid docs.
    exclusive { @syncer.request_sync(@name, count, token) }
  end

  private

  # Build the log line used around a job; _phase_ is the leading wording.
  def job_banner(phase, player_ids, message)
    "--- #{phase} (#{player_ids.inspect}) for #{message} seconds ---"
  end
end
# Actor that counts sync emits per token and lets a caller wait until a
# token has been emitted a given number of times.
class Syncer
  include Log
  include Celluloid

  def initialize
    @name = 'syncer'
    # token => number of sync emits received so far (0 for unseen tokens).
    @tokens = Hash.new(0)
  end

  # Wait until exactly _count_ emits have been recorded for _token_, then
  # forget the token.
  #
  # name::  identifier of the requester, used only in log lines.
  # count:: number of emits to wait for.
  # token:: the sync token.
  def request_sync(name, count, token)
    log "#{name} requested sync (#{count}) for token #{token}."

    while @tokens[token] != count
      wait token
      remaining = count - @tokens[token]
      log "Sync token #{token} received, #{remaining} syncs remaining for #{name}."
    end

    @tokens.delete token
    log "Sync done for #{name} with token #{token}."
  end

  # Record one emit for _token_ on behalf of _name_ and signal the token.
  def sync(name, token)
    log "#{name} syncing in with token #{token}."
    @tokens[token] += 1
    signal token
  end
end
# Actor that owns a pool of workers and hands each incoming job to one of
# them.
#
# Jobs that touch a player id which some worker is already handling are given
# to that same worker; when several workers are involved, sync requests are
# issued so the chosen worker waits for the others first.
class Director
  # Pairs a worker with the name it is registered under.
  WorkerEntry = Struct.new(:name, :worker)

  # Tracks, per player id, how many outstanding jobs each worker name has.
  class PidsTracker
    def initialize
      # player_id => { worker name => outstanding job count }
      @data = {}
    end

    def to_s
      @data.inspect
    end

    # True when no player id is being tracked.
    def empty?
      @data.empty?
    end

    # Register every id in _player_ids_ for worker _name_.
    def register_pids(player_ids, name)
      player_ids.each { |player_id| register_pid(player_id, name) }
    end

    # Increase the job count of _name_ for _player_id_; returns the new count.
    def register_pid(player_id, name)
      counts = (@data[player_id] ||= {})
      counts[name] = counts.fetch(name, 0) + 1
    end

    # Unregister every id in _player_ids_ for worker _name_.
    def unregister_pids(player_ids, name)
      player_ids.each { |player_id| unregister_pid(player_id, name) }
    end

    # Decrease the job count of _name_ for _player_id_, dropping entries that
    # reach zero. Always returns true.
    #
    # Raises NoMethodError when the pair was never registered, which exposes
    # unbalanced register/unregister calls.
    def unregister_pid(player_id, name)
      counts = @data[player_id]
      remaining = (counts[name] -= 1)
      counts.delete(name) if remaining == 0
      @data.delete(player_id) if counts.empty?
      true
    end

    # Names of all workers that have outstanding jobs for any of
    # _player_ids_, without duplicates, as an Array.
    def currently_working_on(player_ids)
      working = player_ids.each_with_object(Set.new) do |player_id, names|
        names.merge(@data[player_id].keys) if @data.key?(player_id)
      end
      working.to_a
    end
  end

  include Log
  include Celluloid

  # pool:: number of workers to create (named :worker_0, :worker_1, ...).
  def initialize(pool=10)
    @name = "director"
    syncer = Syncer.new
    @workers = {}       # name => WorkerEntry, in creation order
    @free_workers = []  # entries that currently have no jobs

    pool.times do |i|
      name = :"worker_#{i}"
      entry = WorkerEntry.new(name, Worker.new(current_actor, syncer, name))
      @workers[name] = entry
      @free_workers << entry
    end

    @jobs = Hash.new(0) # name => number of outstanding jobs
    @pids = PidsTracker.new
    @sync_token = 0
  end

  # Reserve a worker for _player_ids_ and pass the job on via work!.
  # _message_ is forwarded to the worker unchanged.
  def work(player_ids, message)
    worker = reserve_worker(player_ids)
    worker.work!(player_ids, message)
  end

  # Bookkeeping for a finished job: worker _name_ is done with _player_ids_.
  # The worker returns to the free list once it has no jobs left.
  def done(name, player_ids)
    @pids.unregister_pids(player_ids, name)
    jobs_left = (@jobs[name] -= 1)
    @free_workers << @workers[name] if jobs_left == 0
    report
  end

  # Log the number of free workers and the per-worker job counts.
  def report
    log "Free: #{@free_workers.size}, @jobs: #{@jobs.inspect}"
  end

  # True when no player id is tracked and no worker has an outstanding job.
  #
  # A job with an empty player_ids list registers nothing in the pid
  # tracker, so the job counters have to be checked as well.
  def finished?
    @pids.empty? && @jobs.values.all? { |count| count == 0 }
  end

  private

  # Pick the worker for _player_ids_, record the reservation and return the
  # worker object.
  def reserve_worker(player_ids)
    busy_entries = @pids.currently_working_on(player_ids).map do |name|
      @workers[name]
    end

    if busy_entries.empty?
      # No worker is working on any of the _player_ids_.
      entry = @free_workers.shift
      if entry.nil?
        log "No free workers left! Taking busy one."
        # Always the first worker that was created.
        entry = @workers.values.first
      else
        log "Taking free worker."
      end
    else
      entry = busy_entries.first
      log "Taking already working worker #{entry.name}."
      request_sync_with(entry, busy_entries.drop(1))
    end

    log "Reserved #{entry.name} for #{player_ids.inspect}."
    @pids.register_pids(player_ids, entry.name)
    @jobs[entry.name] += 1
    entry.worker
  end

  # We need to ensure that other workers have finished working on player ids
  # before _entry_ starts working on them, therefore sync requests are
  # issued: _entry_ requests one sync per other worker, and each of the
  # _others_ is asked to emit a sync with the same fresh token.
  def request_sync_with(entry, others)
    return if others.empty?

    @sync_token += 1
    names = others.map { |other| other.name }.join(",")
    log "Syncing #{entry.name} with token #{@sync_token} to #{names} (#{others.size})."
    entry.worker.request_sync!(@sync_token, others.size)
    others.each { |other| other.worker.sync!(@sync_token) }
  end
end
# Driver: feed 100 randomly generated jobs to a 20-worker director and print
# how long it took until the director reports it is finished.
puts "Starting simulation"
started_at = Time.now
director = Director.new(20)

# Fixed seed, so the generated sequence of jobs is the same on every run.
srand(1)

100.times do
  # Pause 0.1 - 2.0 seconds between submissions.
  sleep((rand(20) + 1) * 0.1)

  seconds = rand(5) + 1
  # 0 - 4 random ids, de-duplicated, optionally followed by nil.
  player_ids = Array.new(rand(5)) { rand(10000) }.uniq
  player_ids << nil if rand > 0.5
  player_ids.freeze

  director.work!(player_ids, seconds)
end

# Poll once a second until the director says everything is finished.
until director.finished?
  sleep 1
end

puts "Duration: #{Time.now - started_at}"
puts "Reporting"
director.report

# NOTE(review): presumably gives pending log output time to be printed
# before the process exits - confirm.
sleep 1
# Sign up for free to join this conversation on GitHub.
# Already have an account? Sign in to comment