Skip to content

Instantly share code, notes, and snippets.

@miry
Last active February 17, 2021 12:55
Show Gist options
  • Save miry/569a56ce0700455f03e581fad36b1f0e to your computer and use it in GitHub Desktop.
Save miry/569a56ce0700455f03e581fad36b1f0e to your computer and use it in GitHub Desktop.
Gracefull sidekiq worker killer for kubenernetes clusters
require 'fileutils'
require 'open3'
# Usage:
#
# Sidekiq.configure_server do |config|
# config.server_middleware do |chain|
# chain.add MemoryKillerMiddleware if ENV['SIDEKIQ_MEMORY_KILLER_MAX_RSS']
# end
# end
#
class MemoryKillerMiddleware
# Ported from: https://github.com/gitlabhq/gitlabhq/blob/95f630daeb5eec330080095786f7ac6702ebcc3f/lib/gitlab/sidekiq_middleware/memory_killer.rb
# Default the RSS limit to 0, meaning the MemoryKiller is disabled
MAX_RSS = (ENV['SIDEKIQ_MEMORY_KILLER_MAX_RSS'] || 0).to_s.to_i
# Give Sidekiq 15 minutes of grace time after exceeding the RSS limit
GRACE_TIME = (ENV['SIDEKIQ_MEMORY_KILLER_GRACE_TIME'] || 15 * 60).to_s.to_i
# Wait 30 seconds for running jobs to finish during graceful shutdown
SHUTDOWN_WAIT = (ENV['SIDEKIQ_MEMORY_KILLER_SHUTDOWN_WAIT'] || 30).to_s.to_i
SHUTDOWN_SIGNAL = (ENV['SIDEKIQ_MEMORY_KILLER_SHUTDOWN_SIGNAL'] || 'SIGKILL').to_s
# Create a mutex used to ensure there will be only one thread waiting to
# shut Sidekiq down
MUTEX = Mutex.new
def call(worker, job, _queue)
yield
current_rss = get_rss
Sidekiq.logger.debug({middleware: :memory_killer, current_rss: current_rss, max_rss: MAX_RSS})
return unless MAX_RSS > 0 && current_rss > MAX_RSS
Thread.new do
# Return if another thread is already waiting to shut Sidekiq down
return unless MUTEX.try_lock
Sidekiq.logger.warn "current RSS #{current_rss} exceeds maximum RSS "\
"#{MAX_RSS}"
Sidekiq.logger.warn "this thread will shut down PID #{Process.pid} - Worker #{worker.class} - JID-#{job['jid']}"\
"in #{GRACE_TIME} seconds"
sleep(GRACE_TIME)
Sidekiq.logger.warn "sending SIGTERM to PID #{Process.pid} - Worker #{worker.class} - JID-#{job['jid']}"
Process.kill('SIGTERM', Process.pid)
Sidekiq.logger.warn "waiting #{SHUTDOWN_WAIT} seconds before sending "\
"#{SHUTDOWN_SIGNAL} to PID #{Process.pid} - Worker #{worker.class} - JID-#{job['jid']}"
sleep(SHUTDOWN_WAIT)
Sidekiq.logger.warn "sending #{SHUTDOWN_SIGNAL} to PID #{Process.pid} - Worker #{worker.class} - JID-#{job['jid']}"
Sidekiq.logger.debug({message: 'going to kill it'})
Process.kill(SHUTDOWN_SIGNAL, Process.pid)
end
end
private
def get_rss
output, status = popen(%W(ps -o rss= -p #{Process.pid}))
return 0 unless status.zero?
output.to_i
end
def popen(cmd, path = nil, vars = {})
unless cmd.is_a?(Array)
raise "System commands must be given as an array of strings"
end
path ||= Dir.pwd
vars['PWD'] = path
options = { chdir: path }
FileUtils.mkdir_p(path) unless File.directory?(path)
cmd_output = ""
cmd_status = 0
Open3.popen3(vars, *cmd, options) do |stdin, stdout, stderr, wait_thr|
yield(stdin) if block_given?
stdin.close
cmd_output << stdout.read
cmd_output << stderr.read
cmd_status = wait_thr.value.exitstatus
end
[cmd_output, cmd_status]
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment