Gist dmshvetsov/fd1ea8cfc96a908ca0e6fae298c0a7e3, created February 5, 2019 05:33
An example of how a queue and threads can be combined to build an efficient background worker.
# A small background worker: a fixed pool of threads that consume
# [action, payload] pairs from a bounded queue.
class Worker
  # Builds a worker with a bounded queue and starts its threads.
  def self.start(num_threads:, queue_size:)
    queue = SizedQueue.new(queue_size)
    worker = new(num_threads: num_threads, queue: queue)
    worker.spawn_threads
    worker
  end

  def initialize(num_threads:, queue:)
    @num_threads = num_threads
    @queue = queue
    @threads = []
  end

  attr_reader :num_threads

  def spawn_threads
    num_threads.times do
      threads << Thread.new do
        # Keep looping while the queue is open or still holds actions.
        while running? || actions?
          action_proc, action_payload = wait_for_action
          # pop returns nil once the queue is closed and empty.
          action_proc.call(action_payload) if action_proc
        end
      end
    end
  end

  # Blocks the caller while the queue is full (SizedQueue back-pressure).
  def enqueue(action, payload)
    queue.push([action, payload])
  end

  # Closes the queue and terminates the threads right away;
  # actions still waiting in the queue may never run.
  def stop
    queue.close
    threads.each(&:exit)
    threads.clear
    true
  end

  private

  attr_reader :queue, :threads

  def actions?
    !queue.empty?
  end

  def running?
    !queue.closed?
  end

  # Non-blocking pop: raises ThreadError when the queue is empty.
  def dequeue_action
    queue.pop(true)
  end

  # Blocking pop: waits for an action; returns nil once the queue
  # is closed and empty.
  def wait_for_action
    queue.pop(false)
  end
end
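
The worker loop above relies on two `SizedQueue` behaviours: `push` blocks while the queue is full, and a blocking `pop` returns `nil` once the queue is closed and empty. The sketch below isolates that mechanism with a single consumer thread. It is not part of the gist: the names `results`, `consumer`, `doubler`, and `collected` are made up for illustration, and it waits for the thread with `join` instead of calling `exit` as `stop` does, so every queued action gets a chance to run before shutdown.

```ruby
# Standalone sketch of the queue semantics the worker depends on.
# All names here are illustrative and do not come from the gist.
queue = SizedQueue.new(2)   # at most 2 pending actions
results = Queue.new         # thread-safe collector for the outputs

consumer = Thread.new do
  # Same loop shape as Worker#spawn_threads.
  while !queue.closed? || !queue.empty?
    action, payload = queue.pop   # blocks; nil once closed and empty
    results << action.call(payload) if action
  end
end

doubler = ->(n) { n * 2 }
5.times { |i| queue.push([doubler, i]) }  # blocks while 2 items are pending

queue.close     # no more pushes; wakes a consumer waiting in pop
consumer.join   # wait for the queue to drain instead of killing the thread

collected = Array.new(results.size) { results.pop }
```

With one consumer the actions run in FIFO order. With several threads, as in `Worker`, completion order is not guaranteed.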
Hi @iamdidev, the dequeue_action method is declared as private here, but I do not see it used anywhere inside this class. Could you please explain the purpose of this method?

Hi @SathiyarajS, good catch. This method is used in the "production" version of the worker; take a look: https://github.com/yabeda-rb/yabeda-datadog/blob/8c6e40f24038d4420be500c97032c38096f541e2/lib/yabeda/datadog/worker.rb
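
For context on what `dequeue_action` does when it is called: `queue.pop(true)` is a non-blocking pop, which raises `ThreadError` on an empty queue instead of waiting. The sketch below only illustrates that call. The helper name `try_dequeue` is hypothetical, and nothing here is meant to describe how the linked production worker actually uses the method.

```ruby
# Sketch of non-blocking pop semantics; try_dequeue is a made-up helper.
def try_dequeue(queue)
  queue.pop(true)      # non_block = true: never waits
rescue ThreadError
  nil                  # the queue was empty
end

queue = SizedQueue.new(1)
queue.push(:job)

first = try_dequeue(queue)    # the queued item
second = try_dequeue(queue)   # the queue is empty now, so nil
```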