Skip to content

Instantly share code, notes, and snippets.

@JustinLove
Created September 28, 2012 01:17
Show Gist options
  • Save JustinLove/3797439 to your computer and use it in GitHub Desktop.
Save JustinLove/3797439 to your computer and use it in GitHub Desktop.
Pieces of a Sidekiq Heroku autoscaler
require 'heroku'
module Background
# Reads and sets the number of running Heroku processes of one type for one
# app, remembering the last known count for a few seconds so repeated calls
# do not each hit the Heroku API.
class HerokuScaler
  # Seconds a fetched or freshly-set count is trusted before asking again.
  CACHE_SECONDS = 5

  attr_reader :app
  attr_reader :type

  # @param type [String] process type to manage (e.g. 'worker')
  # @param user [String] Heroku user name, defaults to ENV['HEROKU_USER']
  # @param pass [String] Heroku password, defaults to ENV['HEROKU_PASS']
  # @param app [String] Heroku app name, defaults to ENV['HEROKU_APP']
  def initialize(
    type = 'worker',
    user = ENV['HEROKU_USER'],
    pass = ENV['HEROKU_PASS'],
    app = ENV['HEROKU_APP'])
    @client = Heroku::Client.new(user, pass)
    @type = type
    @app = app
    @workers = 0
    # Start with an already-expired cache so the first read queries Heroku.
    @known = Time.now - 1
  end

  # @return [Integer] number of processes of this type; served from the
  #   cache while it is fresh, otherwise counted from `client.ps(app)`
  def workers
    return @workers if known?

    know(client.ps(app).count { |ps| process_pattern =~ ps['process'] })
  end

  # Scales the process type to +n+ unless the cached count is fresh and
  # already equal to +n+.
  #
  # @param n [Integer] desired number of processes
  def workers=(n)
    if n != @workers || !known?
      p "Scaling #{type} to #{n}"
      client.ps_scale(app, :type => type, :qty => n)
      know n
    end
  end

  private

  attr_reader :client

  # Matches exactly "<type>.<digits>". Anchored and escaped so that a type
  # such as 'worker' does not also count 'myworker.1', and regexp
  # metacharacters in the type name are taken literally.
  # NOTE(review): assumes process names look like "worker.1" — confirm
  # against the Heroku API response.
  def process_pattern
    /\A#{Regexp.escape(type.to_s)}\.\d+\z/
  end

  # Records +n+ as the current count and restarts the cache window.
  def know(n)
    @known = Time.now + CACHE_SECONDS
    @workers = n
  end

  # True while the cached count is still considered current.
  def known?
    Time.now < @known
  end
end
end
require 'spec_helper'
require 'background/heroku_scaler'
# Exercises Background::HerokuScaler through its public `workers` reader and
# writer.
# NOTE(review): tagged :online => true and built with the default constructor
# arguments (the HEROKU_* environment variables) — presumably this talks to a
# real Heroku app; confirm before running.
describe Background::HerokuScaler, :online => true do
# `cut` presumably stands for "class under test".
let(:cut) {Background::HerokuScaler}
let(:client) {cut.new}
subject {client}
# A freshly built scaler is expected to report zero workers.
its(:workers) {should == 0}
describe 'scaled' do
# Scale up to one worker around the example and back down to zero afterwards.
around do |example|
client.workers = 1
example.yield
client.workers = 0
end
its(:workers) {should == 1}
end
end
require 'securerandom' # bug in Sidekiq as of 2.2.1
require 'time' # Time.parse, used when reading back the last-activity timestamp
require 'sidekiq'
module Background
module SidekiqAutoscaler
# Sidekiq client middleware: when a job is pushed to a queue that has a
# scaler registered, asks that scaler for one worker before continuing.
class Client
  # @param scalers [Hash] queue name => object responding to `workers=`
  def initialize(scalers)
    @scalers = scalers
  end

  # Requests one worker for +queue+ (if a scaler is registered for it) and
  # then yields to the rest of the middleware chain.
  #
  # @return the value of the given block
  def call(worker_class, item, queue)
    scaler = @scalers[queue]
    scaler.workers = 1 if scaler
    yield
  end
end
# Sidekiq server middleware: after each job it waits until either more work
# is queued or the process has been idle longer than the timeout, in which
# case it scales the worker count down to zero.
#
# NOTE(review): idleness is judged only from queue lengths and the shared
# last-activity timestamp; jobs still running on other threads are not
# consulted — confirm this is acceptable for jobs longer than the timeout.
class Server
  # Redis key holding the time of the most recent job start/finish.
  ACTIVITY_KEY = 'background_activity'

  # @param scaler [#workers=] object used to scale the process down
  # @param timeout [Numeric] idle seconds tolerated before scaling to zero
  # @param poll_interval [Numeric] seconds to sleep between checks while
  #   waiting for new work or for the idle timeout to pass
  def initialize(scaler, timeout, poll_interval = 0.5)
    @scaler = scaler
    @timeout = timeout
    @poll_interval = poll_interval
  end

  # Marks activity, runs the job, then (even if the job raised) marks
  # activity again and waits for new work or the idle timeout.
  #
  # @return the value of the given block
  def call(worker, msg, queue)
    working!
    yield
  ensure
    working!
    wait_for_task_or_scale
  end

  private

  # Names stored in the Redis set 'queues'.
  def registered_queues
    Sidekiq.redis { |conn| conn.smembers('queues') }
  end

  # True when the Redis list "queue:<name>" has no entries.
  def empty?(name)
    Sidekiq.redis { |conn| conn.llen("queue:#{name}") == 0 }
  end

  # True when at least one registered queue has entries.
  def pending_work?
    registered_queues.any? { |name| !empty?(name) }
  end

  # Polls until work is pending (then just returns) or the idle timeout has
  # passed (then scales to zero workers and returns).
  def wait_for_task_or_scale
    loop do
      return if pending_work?

      if idle?
        @scaler.workers = 0
        return
      end

      sleep(@poll_interval)
    end
  end

  # Records "now" as the time of the latest activity.
  def working!
    Sidekiq.redis { |conn| conn.set(ACTIVITY_KEY, Time.now) }
  end

  # Seconds since the recorded activity time; 0 when nothing is recorded.
  def idle_time
    stamp = Sidekiq.redis { |conn| conn.get(ACTIVITY_KEY) }
    return 0 unless stamp

    Time.now - Time.parse(stamp)
  end

  def idle?
    idle_time > @timeout
  end
end
end
end
require 'spec_helper'
require 'background/sidekiq_autoscaler'
# In-memory stand-in for a scaler used by the specs: it only remembers the
# worker count it was last given.
class Scaler
  attr_accessor :workers

  # @param n [Integer] initial worker count
  def initialize(n = 0)
    @workers = n
  end
end
# Specs for both middleware classes, driven with the in-memory Scaler stub so
# the resulting worker count can be read back directly.
describe Background::SidekiqAutoscaler do
# Each nested group supplies `workers`, the stub's starting count.
let(:scaler) do
Scaler.new(workers)
end
describe Background::SidekiqAutoscaler::Client do
# `cut` presumably stands for "class under test".
let(:cut) {Background::SidekiqAutoscaler::Client}
let(:sa) {cut.new('queue' => scaler)}
let(:workers) {0}
# Pushing to a queue with a registered scaler should request one worker.
describe 'scales' do
before {sa.call(Class, {}, 'queue') {}}
subject {scaler.workers}
it {should == 1}
end
# The middleware must hand back the value of the block it wraps.
describe 'yields' do
it {sa.call(Class, {}, 'queue') {:foo}.should == :foo}
end
end
describe Background::SidekiqAutoscaler::Server do
let(:cut) {Background::SidekiqAutoscaler::Server}
# Timeout of 0 seconds, starting from one worker.
# NOTE(review): Server goes through Sidekiq.redis, so these examples
# presumably need a reachable Redis — confirm in spec_helper.
let(:sa) {cut.new(scaler, 0)}
let(:workers) {1}
# After the job block runs, the stub is expected to be scaled down to zero.
describe 'scales' do
before{sa.call(Object.new, {}, 'queue') {}}
subject {scaler.workers}
it {should == 0}
end
# The middleware must hand back the value of the block it wraps.
describe 'yields' do
it {sa.call(Object.new, {}, 'queue') {:foo}.should == :foo}
end
end
end
require 'securerandom' # bug in Sidekiq as of 2.2.1
require 'sidekiq'
require 'background/sidekiq_autoscaler'
require 'background/heroku_scaler'
# Sidekiq setup: picks the Redis URL, builds one HerokuScaler per scaleable
# queue when HEROKU_APP is set, and installs the autoscaler middleware.
redis_url = ENV['REDIS_URL'] || ENV['REDISTOGO_URL']

# this is a little fancy due to handling multiple process types
# Queue name => scaler; stays nil when HEROKU_APP is not set. Queues listed
# (space separated) in ALWAYS are left out of the table.
scalers =
  if ENV['HEROKU_APP']
    always_on = (ENV['ALWAYS'] || '').split(' ')
    (%w[default import] - always_on).each_with_object({}) do |queue, table|
      table[queue] = Background::HerokuScaler.new(
        queue,
        ENV['HEROKU_USER'],
        ENV['HEROKU_PASS'],
        ENV['HEROKU_APP'])
    end
  end

Sidekiq.configure_client do |config|
  config.redis = { :size => 1, :url => redis_url }
  if scalers
    # Scale a queue's process up whenever a job is pushed to it.
    config.client_middleware do |chain|
      chain.add Background::SidekiqAutoscaler::Client, scalers
    end
  end
end

Sidekiq.configure_server do |config|
  config.redis = ConnectionPool.new(:timeout => 4, :size => 4) { Redis.connect(:url => redis_url) }
  config.server_middleware do |chain|
    # Only a process whose HEROKU_PROCESS names a scaleable queue scales itself down.
    own_scaler = scalers && ENV['HEROKU_PROCESS'] && scalers[ENV['HEROKU_PROCESS']]
    if own_scaler
      p "Setting up auto-scaledown"
      chain.add(Background::SidekiqAutoscaler::Server, own_scaler, 60)
    else
      p "Not scaleable"
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment