Created April 29, 2019 21:15
Proof-of-concept generic back-pressure provider: lets blocking back-pressure be controlled from outside the code that performs the actions.
# The BackPressureProvider allows back-pressure to be applied to non-blocking APIs when those APIs also
# provide hooks for identifying when they _should_ block.
class BackPressureProvider
  def initialize(desc, logger)
    @desc = desc
    @logger = logger
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @back_pressure_engaged = false
    @timeout_max = 10
  end

  # Engages back-pressure; threads using `BackPressureProvider#execute` will be blocked until back-pressure
  # is removed.
  #
  # @param reason [String]: the reason back-pressure is being applied, to be included in the log message.
  # @return [void]
  def engage_back_pressure(reason)
    @mutex.synchronize do
      @back_pressure_engaged = true
      @logger.warn("#{@desc} back-pressure engaged: #{reason}")
    end
  end

  # Removes back-pressure, waking any threads that were blocked while using `BackPressureProvider#execute`.
  #
  # @return [void]
  def remove_back_pressure
    @mutex.synchronize do
      @back_pressure_engaged = false
      @logger.info("#{@desc} back-pressure removed.")
      @cond.broadcast # wake up _all_ waiting threads
    end
  end

  # Executes the provided block, _after_ waiting out any back-pressure.
  #
  # @yieldreturn [Object] the value returned from the block is returned by this method
  # @return [Object]
  def execute
    if @back_pressure_engaged
      timeout = 1 # seconds; doubles after each timed-out wait, capped at @timeout_max
      start = Time.now
      thread_id = Thread.current.__id__
      loop do
        # instead of a blind sleep, wait for a notification with a timeout; this allows
        # us to _immediately_ begin sending events when we are unblocked.
        should_block = @mutex.synchronize do
          # re-check under the lock so that a removal which happened after the
          # unlocked check above does not leave us waiting for a missed broadcast.
          @cond.wait(@mutex, timeout) if @back_pressure_engaged
          @back_pressure_engaged
        end
        break unless should_block
        block_duration = Time.now - start
        @logger.warn("#{@desc} has been blocked with no movement for #{block_duration.round}s... (#{thread_id})")
        timeout = [@timeout_max, (timeout * 2)].min
      end
    end
    yield
  end
end
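
The snippet below is a rough usage sketch, not part of the original gist. It assumes a hypothetical producer thread that sends events through `execute` while another thread toggles back-pressure. The names `demo-output`, `delivered`, and `producer` are invented for illustration. A condensed stand-in for the class (without logging or timeout back-off) is defined only when the full `BackPressureProvider` has not been loaded, so the snippet can run on its own.

```ruby
require 'logger'

# Condensed stand-in so this snippet runs by itself; it is skipped when the
# full BackPressureProvider has already been loaded. It omits the logging and
# the doubling timeout of the real class.
unless defined?(BackPressureProvider)
  class BackPressureProvider
    def initialize(desc, logger)
      @desc = desc
      @logger = logger
      @mutex = Mutex.new
      @cond = ConditionVariable.new
      @back_pressure_engaged = false
    end

    def engage_back_pressure(_reason)
      @mutex.synchronize { @back_pressure_engaged = true }
    end

    def remove_back_pressure
      @mutex.synchronize do
        @back_pressure_engaged = false
        @cond.broadcast
      end
    end

    def execute
      @mutex.synchronize do
        # timed wait in a loop: wakes on broadcast, re-checks after each timeout
        @cond.wait(@mutex, 1) while @back_pressure_engaged
      end
      yield
    end
  end
end

# Hypothetical wiring: the producer only knows about #execute; the decision
# to block is made elsewhere (here, by the main thread).
logger = Logger.new($stdout)
provider = BackPressureProvider.new('demo-output', logger)
delivered = Queue.new

provider.engage_back_pressure('downstream queue is full')

producer = Thread.new do
  3.times { |i| provider.execute { delivered << i } }
end

sleep 0.2                       # give the producer time to park inside #execute
blocked_count = delivered.size  # nothing has been delivered while engaged

provider.remove_back_pressure   # the broadcast wakes the producer
producer.join

results = Array.new(delivered.size) { delivered.pop }
```

The producer never inspects the back-pressure state itself. Whatever component detects the overload calls `engage_back_pressure` and `remove_back_pressure`, which is the separation the description above is aiming for.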