Last active
February 17, 2021 12:55
-
-
Save miry/569a56ce0700455f03e581fad36b1f0e to your computer and use it in GitHub Desktop.
Gracefull sidekiq worker killer for kubenernetes clusters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'fileutils' | |
require 'open3' | |
# Usage: | |
# | |
# Sidekiq.configure_server do |config| | |
# config.server_middleware do |chain| | |
# chain.add MemoryKillerMiddleware if ENV['SIDEKIQ_MEMORY_KILLER_MAX_RSS'] | |
# end | |
# end | |
# | |
class MemoryKillerMiddleware | |
# Ported from: https://github.com/gitlabhq/gitlabhq/blob/95f630daeb5eec330080095786f7ac6702ebcc3f/lib/gitlab/sidekiq_middleware/memory_killer.rb | |
# Default the RSS limit to 0, meaning the MemoryKiller is disabled | |
MAX_RSS = (ENV['SIDEKIQ_MEMORY_KILLER_MAX_RSS'] || 0).to_s.to_i | |
# Give Sidekiq 15 minutes of grace time after exceeding the RSS limit | |
GRACE_TIME = (ENV['SIDEKIQ_MEMORY_KILLER_GRACE_TIME'] || 15 * 60).to_s.to_i | |
# Wait 30 seconds for running jobs to finish during graceful shutdown | |
SHUTDOWN_WAIT = (ENV['SIDEKIQ_MEMORY_KILLER_SHUTDOWN_WAIT'] || 30).to_s.to_i | |
SHUTDOWN_SIGNAL = (ENV['SIDEKIQ_MEMORY_KILLER_SHUTDOWN_SIGNAL'] || 'SIGKILL').to_s | |
# Create a mutex used to ensure there will be only one thread waiting to | |
# shut Sidekiq down | |
MUTEX = Mutex.new | |
def call(worker, job, _queue) | |
yield | |
current_rss = get_rss | |
Sidekiq.logger.debug({middleware: :memory_killer, current_rss: current_rss, max_rss: MAX_RSS}) | |
return unless MAX_RSS > 0 && current_rss > MAX_RSS | |
Thread.new do | |
# Return if another thread is already waiting to shut Sidekiq down | |
return unless MUTEX.try_lock | |
Sidekiq.logger.warn "current RSS #{current_rss} exceeds maximum RSS "\ | |
"#{MAX_RSS}" | |
Sidekiq.logger.warn "this thread will shut down PID #{Process.pid} - Worker #{worker.class} - JID-#{job['jid']}"\ | |
"in #{GRACE_TIME} seconds" | |
sleep(GRACE_TIME) | |
Sidekiq.logger.warn "sending SIGTERM to PID #{Process.pid} - Worker #{worker.class} - JID-#{job['jid']}" | |
Process.kill('SIGTERM', Process.pid) | |
Sidekiq.logger.warn "waiting #{SHUTDOWN_WAIT} seconds before sending "\ | |
"#{SHUTDOWN_SIGNAL} to PID #{Process.pid} - Worker #{worker.class} - JID-#{job['jid']}" | |
sleep(SHUTDOWN_WAIT) | |
Sidekiq.logger.warn "sending #{SHUTDOWN_SIGNAL} to PID #{Process.pid} - Worker #{worker.class} - JID-#{job['jid']}" | |
Sidekiq.logger.debug({message: 'going to kill it'}) | |
Process.kill(SHUTDOWN_SIGNAL, Process.pid) | |
end | |
end | |
private | |
def get_rss | |
output, status = popen(%W(ps -o rss= -p #{Process.pid})) | |
return 0 unless status.zero? | |
output.to_i | |
end | |
def popen(cmd, path = nil, vars = {}) | |
unless cmd.is_a?(Array) | |
raise "System commands must be given as an array of strings" | |
end | |
path ||= Dir.pwd | |
vars['PWD'] = path | |
options = { chdir: path } | |
FileUtils.mkdir_p(path) unless File.directory?(path) | |
cmd_output = "" | |
cmd_status = 0 | |
Open3.popen3(vars, *cmd, options) do |stdin, stdout, stderr, wait_thr| | |
yield(stdin) if block_given? | |
stdin.close | |
cmd_output << stdout.read | |
cmd_output << stderr.read | |
cmd_status = wait_thr.value.exitstatus | |
end | |
[cmd_output, cmd_status] | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment