Skip to content

Instantly share code, notes, and snippets.

@thadeu
Last active April 17, 2025 14:48
Show Gist options
  • Save thadeu/144fb09835b2179141c32df830ab9407 to your computer and use it in GitHub Desktop.
Save thadeu/144fb09835b2179141c32df830ab9407 to your computer and use it in GitHub Desktop.
Ruby Async Wrapper & Promise
# frozen_string_literal: true
module AsyncWrapper
def self.included(base)
base.extend(ClassMethods)
end
def self.extended(base)
base.extend(ClassMethods)
end
module ClassMethods
def async(method_name)
begin
original_method = singleton_method(method_name)
define_singleton_method(method_name) do |*args, **kwargs, &block|
Promise.new { original_method.call(*args, **kwargs, &block) }
end
rescue NameError
original_method = instance_method(method_name)
define_method(method_name) do |*args, **kwargs, &block|
Promise.new { original_method.bind(self).call(*args, **kwargs, &block) }
end
end
end
end
def await(promise)
return promise unless promise.is_a?(Promise)
promise.await
end
end
class Object
include AsyncWrapper
end
# frozen_string_literal: true
class Promise
class Rejected < StandardError; end
PENDING = :pending
FULFILLED = :fulfilled
REJECTED = :rejected
attr_reader :state, :value, :reason
def initialize(value = nil, &block)
@state = PENDING
@value = value
@reason = nil
@mutex = Mutex.new
@condition = ConditionVariable.new
@callbacks = []
@error_callbacks = []
if block_given?
Thread.new do
begin
result = block.call
resolve(result)
rescue StandardError => e
reject(e)
end
end
else
resolve(value)
end
end
def await
@mutex.synchronize do
@condition.wait(@mutex) while @state == PENDING
end
raise Rejected, @reason.to_s if @state == REJECTED
@value
end
def resolve(value = nil)
@mutex.synchronize do
return if @state != PENDING
@state = FULFILLED
@value = value
@condition&.broadcast
execute_callbacks
end
end
def reject(reason = nil)
@mutex.synchronize do
return if @state != PENDING
@state = REJECTED
@reason = reason
@condition&.broadcast
execute_error_callbacks
end
end
def then(&block)
@mutex.synchronize do
if @state == FULFILLED
block.call(@value)
elsif @state == PENDING
@callbacks << block
end
end
self
end
def rescue(&block)
@mutex.synchronize do
if @state == REJECTED
block.call(@reason)
elsif @state == PENDING
@error_callbacks << block
end
end
self
end
def self.resolve(value = nil)
new { value }
end
def self.reject(reason = nil)
raise Rejected, reason
end
def self.all(promises, raise_on_error: true)
return Promise.resolve([]) if promises.empty?
Settled.new(promises, raise_on_error: raise_on_error).run!
end
def self.all_settled(promises)
return Promise.resolve([]) if promises.empty?
Settled.new(promises, type: :all_settled).run!
end
private
def execute_callbacks
callbacks = @callbacks.dup
@callbacks.clear
callbacks.each { |callback| callback.call(@value) }
end
def execute_error_callbacks
callbacks = @error_callbacks.dup
@error_callbacks.clear
callbacks.each { |callback| callback.call(@reason) }
end
class Result
include Enumerable
attr_reader :value
def initialize(value)
@value = value
end
def deconstruct
[@value, @value.size]
end
def deconstruct_keys(_keys)
{ value: @value, count: @value.size }
end
def and_then(&block)
block.call(@value)
self
end
def on_catch(&block)
block.call(@value)
self
end
def each(&block)
block.call(@value)
self
end
end
class Batch
attr_reader :promises
def initialize(promises)
@promises = promises.keep_if { |promise| promise.is_a?(Promise) }
@queue = Thread::Queue.new
@remaining = promises.size
end
def run!(&block)
return if @promises.empty?
@promises.each_with_index do |promise, index|
Thread.new do
promise.then do |value|
@queue << [:success, index, value]
end&.rescue do |reason|
@queue << [:error, index, reason]
end
rescue StandardError => e
@queue << [:error, index, e]
end
end
while @remaining.positive?
block.call(@queue.pop)
@remaining -= 1
end
end
end
class Settled
def initialize(promises, type: :all, raise_on_error: true)
@results = Array.new(promises.size)
@batch = Batch.new(promises)
@type = type
@raise_on_error = raise_on_error
end
def run!
return Result.new([]) if @batch.promises.empty?
@batch.run! do |type, index, value|
case type
when :success
@results[index] = success_row(value)
when :error
raise Rejected, value if @raise_on_error && @type == :all
@results[index] = rejected_row(value)
end
end
Result.new(@results)
end
def success_row(value)
return value if @type == :all
{ status: Promise::FULFILLED, value: value }
end
def rejected_row(reason)
return reason if @type == :all
{ status: Promise::REJECTED, reason: reason }
end
end
end
class Purge
include Sidekiq::Worker
attr_reader :user
def perform(user_id)
@user = User.deleted.find_by(id: user_id)
return if user.blank?
user.update_columns(status: 'deleted_processing')
redis_task = RedisTask.new
tasks = [
Promise.new { RedisTask.new.perform(1) }, # using Promise.new
redis_task.perform(1), # usign async def
redis_task.perform(2),
redis_task.perform(3),
redis_task.perform(4),
redis_task.perform(5),
redis_task.perform(6),
redis_task.perform(7),
redis_task.perform(8),
redis_task.perform(9),
redis_task.perform(10)
]
p "--> Preparing promises"
# wait all threads finished
result = Promise.all_settled(tasks)
# result = Promise.all(tasks)
# result = Promise.all(tasks, raise_on_error: false)
p "--> Promise all finished in 4s"
p result
end
end
# frozen_string_literal: true
class RedisTask
# See async keyword here!
async def perform(user_id)
user = User.with_deleted.find(user_id)
return if user.blank?
p "Redis waiting 4 seconds"
sleep 4
p "Redis done"
end
end
RedisTask.new.perform(1).await
await RedisTask.new.perform(1)
@thadeu
Copy link
Author

thadeu commented Apr 17, 2025

I've tought in the possible names given that names like promise, vow, pact it already used.

p1 = Ract { 1 }
p2 = Ract { 2 }
p3 = Ract { 3 }

Ract.configure do |config|
  # For every ract instances.
  config.raise_on_fail = false
end

Ract.take([p1, p2, p3])
Ract.all([p1, p2, p3])
Ract.race([p1, p2, p3])

p4 = ract { HTTParty.get("https://api.github.com") }
p5 = ract { HTTParty.get("https://bitbucket.org") }

Ract.take([p4, p5])
Ract.all([p4, p5])
Ract.race([p4, p5])

TaskGroup.take([p4, p5])
TG.take([p4, p5])

p6 = tg { HTTParty.get("https://bitbucket.org") }

Pakt.take([p1, p2, p3])
Pakt.all([p1, p2, p3])
Pakt.race([p1, p2, p3])

Rabt.take([p1, p2, p3])
Rabt.all([p1, p2, p3])
Rabt.race([p1, p2, p3])

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment