Last active
October 17, 2023 03:48
-
-
Save yaauie/eb1720c7af0b1357c67787085ea6c607 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# encoding: utf-8 | |
require 'thread' | |
require 'monitor' | |
##
# The FairEnoughRouter is a generic "fair enough" router. When instantiated
# with a collection of objects, it can be used to select an arbitrary value,
# prioritising ones whose use has not recently raised an error and
# those that are currently less-concurrently used.
#
# Bookkeeping (selection, counters, error marks) runs inside the instance's
# monitor provided by MonitorMixin; the caller's block runs outside of it.
class FairEnoughRouter
  include MonitorMixin

  ##
  # Creates a new Router with the provided objects
  # that ignores errors older than the cooloff period
  #
  # @param objects [Enumerable<Object>] a list of objects to lease out
  # @param cooloff [Integer] The cooloff period in seconds in which downstreams with
  #                          recent errors are deprioritized (default: 60)
  # @raise [ArgumentError] if `objects` is nil or `objects.any?` is false,
  #                        or if `cooloff` is not an Integer
  def initialize(objects, cooloff: 60)
    # NOTE: `any?` tests truthiness, so a collection holding only nil/false
    # values is rejected just like an empty one.
    raise ArgumentError, 'objects must be a non-empty collection' unless objects&.any?
    raise ArgumentError, 'cooloff must be an Integer number of seconds' unless cooloff.is_a?(Integer)

    super() # sets up the monitor supplied by MonitorMixin

    @object_states = objects.map { |object| ObjectState.new(object) }
    @cooloff = cooloff
  end

  ##
  # Yields the block with a value, prioritizing values
  # whose use has not recently errored, that are currently
  # less concurrently-used.
  #
  # A StandardError raised by the block is recorded against the selected
  # value and then re-raised unchanged.
  #
  # @yieldparam value [Object]
  # @return [Object] whatever the block returns
  # @raise [LocalJumpError] if no block is given
  def select
    # Same exception a bare `yield` would produce, but raised before a lease
    # is taken so a caller mistake is not recorded as an error of the object.
    raise LocalJumpError, 'no block given (yield)' unless block_given?

    # Selection happens outside the begin/rescue/ensure below: if it fails
    # there is no lease to mark as errored or to release.
    selected = synchronize { pick_one.tap(&:increment) }

    begin
      yield(selected.value)
    rescue StandardError
      synchronize { selected.mark_error }
      raise
    ensure
      synchronize { selected.decrement }
    end
  end

  private

  ##
  # Picks the state with the smallest sort key. Callers hold the monitor.
  #
  # @return [ObjectState]
  def pick_one
    # Errors at or before this instant are outside the cooloff window.
    threshold = ObjectState.monotonic_now.floor - @cooloff

    @object_states.min_by do |object_state|
      [
        [object_state.last_error, threshold].max, # deprioritize recent errors
        object_state.concurrent,                  # deprioritize high concurrency
        object_state.last_start                   # deprioritize recent use
      ]
    end
  end

  ##
  # Per-object bookkeeping: the wrapped value, the number of leases currently
  # open, and when it was last leased / last errored.
  #
  # @api private
  class ObjectState
    # Timestamp meaning "has not happened yet"; sorts before any clock reading.
    NEVER = -Float::INFINITY

    ##
    # Current reading of the monotonic clock, in (fractional) seconds.
    # Only differences and ordering between readings are meaningful.
    #
    # @return [Float]
    def self.monotonic_now
      Process.clock_gettime(Process::CLOCK_MONOTONIC)
    end

    attr_reader :value
    attr_reader :concurrent
    attr_reader :last_start
    attr_reader :last_error

    # @param object [Object] the value handed to callers of the router
    def initialize(object)
      @value = object
      @concurrent = 0
      @last_error = NEVER
      @last_start = NEVER
    end

    # Records the start of a lease.
    def increment
      @concurrent += 1
      @last_start = self.class.monotonic_now
    end

    # Records the end of a lease.
    def decrement
      @concurrent -= 1
    end

    # Records that a lease just failed. Stored with whole-second granularity,
    # so errors landing in the same second compare as equally recent.
    def mark_error
      @last_error = self.class.monotonic_now.floor
    end
  end
  private_constant :ObjectState
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment