Skip to content

Instantly share code, notes, and snippets.

@paulopatto
Last active June 4, 2018 16:08
Show Gist options
  • Save paulopatto/fe7a5d12473399d502d87ba2c77fc385 to your computer and use it in GitHub Desktop.
Save paulopatto/fe7a5d12473399d502d87ba2c77fc385 to your computer and use it in GitHub Desktop.
Simple, Async, Map/Reduce queue for Ruby
# frozen_string_literal: true
require './queue'
require './processor'
servers = %w[server1.test server2.test server3.test]
queue = Queue.new
processor = Processor.new
servers.each do |server|
queue.map do
# Simulate random wait due to network communication
processor.call server
end
end
result = queue.reduce([]) { |server, memo| memo << server }
if result.in_sync?
puts "[INFO] in sync: #{result.in_sync?}"
else
puts "[ERROR] in sync: #{result.in_sync?} - errors: \n\t#{result.errors.join("\n\t* ")}"
end
# frozen_string_literal: true
require 'ostruct'
class Processor
def call(server)
result = OpenStruct.new(name: server, in_sync: true)
sleep rand(0..3)
puts "[#{Time.now.utc}] Querying: #{server}"
raise "Boom #{server}" if rand(10) < 2
result
rescue => err
result.in_sync = false
result.error = err.message
result
end
end
# frozen_string_literal: true
require './worker'
require './result'
class Queue
def initialize
@workers = []
end
def map(&blk)
@workers << Worker.new(&blk)
end
def reduce(accumulator)
merged = @workers.each_with_object(accumulator) do |worker, acc|
yield worker.call, acc
end
Result.new(merged)
end
end
# frozen_string_literal: true
class Result
def initialize(merged)
@merged = merged
end
def in_sync?
@merged.all?(&:in_sync)
end
def errors
@merged.map(&:error).compact
end
end
# frozen_string_literal: true
class Worker
def initialize(&blk)
@blk = blk
end
def call
@blk.call
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment