Last active
April 17, 2025 14:48
-
-
Save thadeu/144fb09835b2179141c32df830ab9407 to your computer and use it in GitHub Desktop.
Ruby Async Wrapper & Promise
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# frozen_string_literal: true | |
module AsyncWrapper | |
def self.included(base) | |
base.extend(ClassMethods) | |
end | |
def self.extended(base) | |
base.extend(ClassMethods) | |
end | |
module ClassMethods | |
def async(method_name) | |
begin | |
original_method = singleton_method(method_name) | |
define_singleton_method(method_name) do |*args, **kwargs, &block| | |
Promise.new { original_method.call(*args, **kwargs, &block) } | |
end | |
rescue NameError | |
original_method = instance_method(method_name) | |
define_method(method_name) do |*args, **kwargs, &block| | |
Promise.new { original_method.bind(self).call(*args, **kwargs, &block) } | |
end | |
end | |
end | |
end | |
def await(promise) | |
return promise unless promise.is_a?(Promise) | |
promise.await | |
end | |
end | |
class Object | |
include AsyncWrapper | |
end |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# frozen_string_literal: true | |
class Promise | |
class Rejected < StandardError; end | |
PENDING = :pending | |
FULFILLED = :fulfilled | |
REJECTED = :rejected | |
attr_reader :state, :value, :reason | |
def initialize(value = nil, &block) | |
@state = PENDING | |
@value = value | |
@reason = nil | |
@mutex = Mutex.new | |
@condition = ConditionVariable.new | |
@callbacks = [] | |
@error_callbacks = [] | |
if block_given? | |
Thread.new do | |
begin | |
result = block.call | |
resolve(result) | |
rescue StandardError => e | |
reject(e) | |
end | |
end | |
else | |
resolve(value) | |
end | |
end | |
def await | |
@mutex.synchronize do | |
@condition.wait(@mutex) while @state == PENDING | |
end | |
raise Rejected, @reason.to_s if @state == REJECTED | |
@value | |
end | |
def resolve(value = nil) | |
@mutex.synchronize do | |
return if @state != PENDING | |
@state = FULFILLED | |
@value = value | |
@condition&.broadcast | |
execute_callbacks | |
end | |
end | |
def reject(reason = nil) | |
@mutex.synchronize do | |
return if @state != PENDING | |
@state = REJECTED | |
@reason = reason | |
@condition&.broadcast | |
execute_error_callbacks | |
end | |
end | |
def then(&block) | |
@mutex.synchronize do | |
if @state == FULFILLED | |
block.call(@value) | |
elsif @state == PENDING | |
@callbacks << block | |
end | |
end | |
self | |
end | |
def rescue(&block) | |
@mutex.synchronize do | |
if @state == REJECTED | |
block.call(@reason) | |
elsif @state == PENDING | |
@error_callbacks << block | |
end | |
end | |
self | |
end | |
def self.resolve(value = nil) | |
new { value } | |
end | |
def self.reject(reason = nil) | |
raise Rejected, reason | |
end | |
def self.all(promises, raise_on_error: true) | |
return Promise.resolve([]) if promises.empty? | |
Settled.new(promises, raise_on_error: raise_on_error).run! | |
end | |
def self.all_settled(promises) | |
return Promise.resolve([]) if promises.empty? | |
Settled.new(promises, type: :all_settled).run! | |
end | |
private | |
def execute_callbacks | |
callbacks = @callbacks.dup | |
@callbacks.clear | |
callbacks.each { |callback| callback.call(@value) } | |
end | |
def execute_error_callbacks | |
callbacks = @error_callbacks.dup | |
@error_callbacks.clear | |
callbacks.each { |callback| callback.call(@reason) } | |
end | |
class Result | |
include Enumerable | |
attr_reader :value | |
def initialize(value) | |
@value = value | |
end | |
def deconstruct | |
[@value, @value.size] | |
end | |
def deconstruct_keys(_keys) | |
{ value: @value, count: @value.size } | |
end | |
def and_then(&block) | |
block.call(@value) | |
self | |
end | |
def on_catch(&block) | |
block.call(@value) | |
self | |
end | |
def each(&block) | |
block.call(@value) | |
self | |
end | |
end | |
class Batch | |
attr_reader :promises | |
def initialize(promises) | |
@promises = promises.keep_if { |promise| promise.is_a?(Promise) } | |
@queue = Thread::Queue.new | |
@remaining = promises.size | |
end | |
def run!(&block) | |
return if @promises.empty? | |
@promises.each_with_index do |promise, index| | |
Thread.new do | |
promise.then do |value| | |
@queue << [:success, index, value] | |
end&.rescue do |reason| | |
@queue << [:error, index, reason] | |
end | |
rescue StandardError => e | |
@queue << [:error, index, e] | |
end | |
end | |
while @remaining.positive? | |
block.call(@queue.pop) | |
@remaining -= 1 | |
end | |
end | |
end | |
class Settled | |
def initialize(promises, type: :all, raise_on_error: true) | |
@results = Array.new(promises.size) | |
@batch = Batch.new(promises) | |
@type = type | |
@raise_on_error = raise_on_error | |
end | |
def run! | |
return Result.new([]) if @batch.promises.empty? | |
@batch.run! do |type, index, value| | |
case type | |
when :success | |
@results[index] = success_row(value) | |
when :error | |
raise Rejected, value if @raise_on_error && @type == :all | |
@results[index] = rejected_row(value) | |
end | |
end | |
Result.new(@results) | |
end | |
def success_row(value) | |
return value if @type == :all | |
{ status: Promise::FULFILLED, value: value } | |
end | |
def rejected_row(reason) | |
return reason if @type == :all | |
{ status: Promise::REJECTED, reason: reason } | |
end | |
end | |
end |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Purge | |
include Sidekiq::Worker | |
attr_reader :user | |
def perform(user_id) | |
@user = User.deleted.find_by(id: user_id) | |
return if user.blank? | |
user.update_columns(status: 'deleted_processing') | |
redis_task = RedisTask.new | |
tasks = [ | |
Promise.new { RedisTask.new.perform(1) }, # using Promise.new | |
redis_task.perform(1), # usign async def | |
redis_task.perform(2), | |
redis_task.perform(3), | |
redis_task.perform(4), | |
redis_task.perform(5), | |
redis_task.perform(6), | |
redis_task.perform(7), | |
redis_task.perform(8), | |
redis_task.perform(9), | |
redis_task.perform(10) | |
] | |
p "--> Preparing promises" | |
# wait all threads finished | |
result = Promise.all_settled(tasks) | |
# result = Promise.all(tasks) | |
# result = Promise.all(tasks, raise_on_error: false) | |
p "--> Promise all finished in 4s" | |
p result | |
end | |
end |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# frozen_string_literal: true | |
class RedisTask | |
# See async keyword here! | |
async def perform(user_id) | |
user = User.with_deleted.find(user_id) | |
return if user.blank? | |
p "Redis waiting 4 seconds" | |
sleep 4 | |
p "Redis done" | |
end | |
end | |
RedisTask.new.perform(1).await | |
await RedisTask.new.perform(1) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I've tought in the possible names given that names like promise, vow, pact it already used.