- Sidekiq with Lua Scripts (Lowest Overhead)
- Ultra-Efficient with Memory Cache
- Shoryuken (AWS SQS)
- Other Queue Systems Comparison
- Production-Ready Setup
## Sidekiq with Lua Scripts (Lowest Overhead)

# app/jobs/concerns/efficient_rate_limiter.rb
require 'digest'
module EfficientRateLimiter
extend ActiveSupport::Concern
# Lua script for atomic rate limit + coalesce check
RATE_LIMIT_COALESCE_SCRIPT = <<~LUA
local rate_key = KEYS[1]
local coalesce_key = KEYS[2]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local project_id = ARGV[3]
local current_time = tonumber(ARGV[4])
local job_args = ARGV[5]
-- Check coalesce: if job already queued for this project, skip
local existing = redis.call('GET', coalesce_key)
if existing then
return {0, 'coalesced'}
end
-- Check rate limit using sliding window
local window_start = current_time - window
-- Remove old entries outside window
redis.call('ZREMRANGEBYSCORE', rate_key, 0, window_start)
-- Count current entries in window
local current_count = redis.call('ZCARD', rate_key)
if current_count >= limit then
return {0, 'rate_limited'}
end
-- Add to rate limit window
redis.call('ZADD', rate_key, current_time, current_time .. ':' .. project_id)
redis.call('EXPIRE', rate_key, window + 60)
-- Set coalesce key (expires after 60 seconds)
redis.call('SETEX', coalesce_key, 60, job_args)
return {1, 'ok'}
LUA
  # Precompute the script's SHA; Redis caches the script server-side after
  # the first EVAL, so subsequent EVALSHA calls succeed
  SCRIPT_SHA = Digest::SHA1.hexdigest(RATE_LIMIT_COALESCE_SCRIPT)
class_methods do
def perform_async(*args)
project_id = args[0] # Assuming first arg is project_id
result = Sidekiq.redis do |conn|
begin
conn.evalsha(
SCRIPT_SHA,
keys: [rate_limit_key, coalesce_key(project_id)],
argv: [rate_limit, rate_window, project_id, Time.now.to_f, args.to_json]
)
rescue Redis::CommandError => e
if e.message.include?('NOSCRIPT')
# Load script if not cached
conn.eval(
RATE_LIMIT_COALESCE_SCRIPT,
keys: [rate_limit_key, coalesce_key(project_id)],
argv: [rate_limit, rate_window, project_id, Time.now.to_f, args.to_json]
)
else
raise
end
end
end
if result[0] == 1
# Proceed with job
super
else
case result[1]
when 'rate_limited'
# Requeue with exponential backoff
perform_in(calculate_backoff, *args)
when 'coalesced'
# Job already queued for this project, skip
nil
end
end
end
def rate_limit_key
@rate_limit_key ||= "rl:#{name}"
end
def coalesce_key(project_id)
"coalesce:#{name}:#{project_id}"
end
def rate_limit
@rate_limit || 5
end
def rate_window
@rate_window || 60
end
def set_rate_limit(limit: 5, window: 60)
@rate_limit = limit
@rate_window = window
end
private
    def calculate_backoff
      # Note: this runs at class level, so @retry_count is effectively
      # always nil and the delay is a flat 30s (capped at 5 minutes);
      # wire in a real retry count to get the exponential curve
      [30 * (1.5**(@retry_count || 0)), 300].min
    end
end
def perform(*args)
project_id = args[0]
    # Drop the coalesce key as soon as the job starts, so changes made
    # while this job is running can enqueue a fresh recalculation
    Sidekiq.redis { |c| c.del(self.class.coalesce_key(project_id)) }
# Get latest state and perform work
execute_with_latest_state(*args)
ensure
# Ensure coalesce key is cleaned up
Sidekiq.redis { |c| c.del(self.class.coalesce_key(project_id)) }
end
end
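To avoid the NOSCRIPT fallback on the first enqueue after a deploy or Redis restart, the script can be preloaded at boot. A minimal sketch (the initializer path is illustrative; `conn.script` assumes the redis-rb client bundled with Sidekiq 6, while Sidekiq 7's redis-client would use `conn.call('SCRIPT', 'LOAD', ...)`):

```ruby
# config/initializers/preload_rate_limit_script.rb
# SCRIPT LOAD caches the script server-side and returns its SHA, which
# matches the SCRIPT_SHA the concern precomputed
Sidekiq.redis do |conn|
  conn.script(:load, EfficientRateLimiter::RATE_LIMIT_COALESCE_SCRIPT)
end
```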
# app/jobs/dot_project_metrics_job.rb
class DOTProjectMetricsJob
include Sidekiq::Worker
include EfficientRateLimiter
set_rate_limit limit: 5, window: 60 # 5 jobs per minute
def execute_with_latest_state(project_id, triggered_at = nil)
# Always fetch latest project state
project = DOTProject.find(project_id)
# Skip if recently calculated
return if project.metrics_calculated_at && project.metrics_calculated_at > 1.minute.ago
# Heavy computation
metrics = calculate_metrics(project)
# Store results
    project.update!(
metrics: metrics,
metrics_calculated_at: Time.current
)
end
private
def calculate_metrics(project)
# Your expensive calculation
DOTProjectMetricsService.new(project).calculate
end
end
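Enqueueing stays a plain `perform_async` call; the concern decides whether the job actually reaches the queue. Expected return values, assuming the classes above:

```ruby
DOTProjectMetricsJob.perform_async(42) # => job ID string (queued)
DOTProjectMetricsJob.perform_async(42) # => nil (coalesced: the key from the first call is still live)
```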
## Ultra-Efficient with Memory Cache

# app/jobs/concerns/ultra_efficient_limiter.rb
require 'concurrent'
module UltraEfficientLimiter
extend ActiveSupport::Concern
  # Thread-safe in-memory cache; per-process only, so the Redis calls
  # below remain the cross-process source of truth
  MEMORY_CACHE = Concurrent::Hash.new
class_methods do
def perform_async(*args)
project_id = args[0]
cache_key = "#{name}:#{project_id}"
# First check memory cache (fastest)
if MEMORY_CACHE[cache_key] && MEMORY_CACHE[cache_key] > Time.now.to_f
return nil # Skip - already queued
end
# Single Redis call for both checks
rate_key = "rl:#{name}"
coalesce_key = "co:#{name}:#{project_id}"
can_proceed = Sidekiq.redis do |conn|
conn.multi do |multi|
# Check and set coalesce
multi.set(coalesce_key, 1, nx: true, ex: 60)
          # Increment rate counter (refreshing the TTL on every call
          # stretches the window slightly; acceptable for a soft limit)
          multi.incr(rate_key)
          multi.expire(rate_key, rate_window)
end
end
if can_proceed[0] && can_proceed[1] <= rate_limit
# Update memory cache
MEMORY_CACHE[cache_key] = Time.now.to_f + 60
# Clean old cache entries periodically
clean_memory_cache if rand(100) == 0
super
      elsif !can_proceed[0]
        # Already coalesced; undo the increment so skipped jobs
        # don't consume the rate budget
        Sidekiq.redis { |c| c.decr(rate_key) }
        nil
else
# Rate limited - decrement counter and requeue
Sidekiq.redis { |c| c.decr(rate_key) }
perform_in(30 + rand(30), *args)
end
end
def rate_limit
@rate_limit || 5
end
def rate_window
@rate_window || 60
end
private
def clean_memory_cache
now = Time.now.to_f
MEMORY_CACHE.delete_if { |_, expiry| expiry < now }
end
end
end
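Because `MEMORY_CACHE` is per-process, the fast path only helps repeated enqueues from the same process; Redis still arbitrates across processes. A rough way to sanity-check the ~0.1 ms claim on your own hardware (a hypothetical micro-benchmark; `SomeMemoryLimitedJob` stands in for any worker that includes `UltraEfficientLimiter`, and absolute numbers vary with Redis latency):

```ruby
require 'benchmark'

n = 1_000
elapsed = Benchmark.realtime do
  # After the first hit per project, most iterations short-circuit
  # on MEMORY_CACHE without touching Redis
  n.times { |i| SomeMemoryLimitedJob.perform_async(i % 10) }
end
puts format('%.3f ms per perform_async call', (elapsed / n) * 1000)
```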
## Shoryuken (AWS SQS)

# Gemfile
gem 'shoryuken'
# app/workers/dot_project_metrics_worker.rb
class DOTProjectMetricsWorker
include Shoryuken::Worker
  # SQS-native features. Deduplication is not a worker option: it comes
  # from enqueuing to a FIFO queue (name ending in .fifo) with
  # message_deduplication_id / message_group_id set at send time;
  # see the enqueue example after the config below.
  shoryuken_options(
    queue: 'metrics',
    auto_delete: true,
    body_parser: :json
  )
def perform(sqs_msg, body)
project_id = body['project_id']
# SQS visibility timeout acts as natural rate limiter
# If job takes too long, message becomes visible again
    # Use DynamoDB for distributed rate limiting (handy when the stack is
    # AWS-native and you don't want to run Redis)
return requeue(sqs_msg) unless check_rate_limit(project_id)
calculate_metrics(project_id)
end
private
  def check_rate_limit(project_id)
    # DynamoDB atomic counter, bucketed by minute: a fresh key per window
    # means an expired window starts from zero instead of carrying the
    # old count forward
    dynamodb = Aws::DynamoDB::Client.new
    window = Time.now.to_i / 60
    begin
      dynamodb.update_item(
        table_name: 'rate_limits',
        key: { 'id' => "metrics:#{project_id}:#{window}" },
        update_expression: 'ADD #cnt :val SET #ttl = :ttl',
        condition_expression: 'attribute_not_exists(#cnt) OR #cnt < :limit',
        expression_attribute_names: {
          '#cnt' => 'count',
          '#ttl' => 'expires_at'
        },
        expression_attribute_values: {
          ':val' => 1,
          ':limit' => 5,
          ':ttl' => Time.now.to_i + 120 # DynamoDB TTL cleans up old buckets
        },
        return_values: 'ALL_NEW'
      )
      true
    rescue Aws::DynamoDB::Errors::ConditionalCheckFailedException
      false
    end
  end
def requeue(sqs_msg)
    # Change the visibility timeout (SQS-native backoff with jitter)
    sqs_msg.change_visibility(visibility_timeout: 30 + rand(30))
end
end
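The `rate_limits` table referenced above has to exist before the worker runs. A one-time setup sketch using the AWS SDK (the billing mode and TTL attribute name are assumptions; the table name matches the worker):

```ruby
require 'aws-sdk-dynamodb'

dynamodb = Aws::DynamoDB::Client.new
dynamodb.create_table(
  table_name: 'rate_limits',
  attribute_definitions: [{ attribute_name: 'id', attribute_type: 'S' }],
  key_schema: [{ attribute_name: 'id', key_type: 'HASH' }],
  billing_mode: 'PAY_PER_REQUEST'
)
dynamodb.wait_until(:table_exists, table_name: 'rate_limits')
# Let DynamoDB expire old window buckets automatically
dynamodb.update_time_to_live(
  table_name: 'rate_limits',
  time_to_live_specification: { enabled: true, attribute_name: 'expires_at' }
)
```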
# config/shoryuken.yml
concurrency: 10
delay: 0
queues:
- [metrics, 1] # Low priority for rate-limited jobs
aws:
region: us-east-1
# Enable long polling
receive_message:
wait_time_seconds: 20
max_number_of_messages: 10
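Deduplication itself happens at enqueue time, and only on a FIFO queue. A sketch of the send side, assuming the queue is recreated as `metrics.fifo` (and the worker/config updated to match); Shoryuken passes these options through to SQS `send_message`:

```ruby
DOTProjectMetricsWorker.perform_async(
  { project_id: project.id },
  message_group_id: "project_#{project.id}",
  message_deduplication_id: project.id.to_s
)
```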
## Other Queue Systems Comparison

### Que (PostgreSQL)

# Que takes a PostgreSQL advisory lock per job row, so two workers never
# run the same job concurrently. Enqueue-time deduplication additionally
# needs a partial unique index on (job_class, args), which is not part of
# Que's default schema (migration sketch below).
class DOTProjectMetricsJob < Que::Job
  # Enqueue only if no identical job is already pending
  def self.enqueue_once(project_id)
    Que.execute <<-SQL, [name, [project_id].to_json]
      INSERT INTO que_jobs (job_class, args)
      VALUES ($1, $2)
      ON CONFLICT DO NOTHING
    SQL
  end

  def run(project_id)
    calculate_metrics(project_id)
  end
end
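The `ON CONFLICT` above needs a unique index to land on. A migration sketch, assuming a Que 1.x `que_jobs` schema and that migrations run through ActiveRecord (the partial `WHERE` keeps finished jobs from blocking re-enqueues):

```ruby
class AddDedupIndexToQueJobs < ActiveRecord::Migration[7.0]
  def change
    add_index :que_jobs, [:job_class, :args],
              unique: true,
              where: 'finished_at IS NULL',
              name: 'index_que_jobs_on_class_and_args_pending'
  end
end
```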
### Sneakers (RabbitMQ)

class DOTProjectMetricsWorker
include Sneakers::Worker
from_queue 'metrics',
# RabbitMQ TTL for deduplication
arguments: {
'x-message-ttl' => 60000,
'x-max-length' => 100,
'x-single-active-consumer' => true # Natural rate limiting
}
  def work(msg)
    data = JSON.parse(msg)
    project_id = data['project_id']
    # Deduplication happens at publish time: the RabbitMQ
    # message-deduplication plugin matches on the x-deduplication-header
    # set by the publisher (example below)
    calculate_metrics(project_id)
    ack!
  end
end
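The deduplication header must be set by the publisher; the rabbitmq-message-deduplication plugin inspects it when the message is published, so nothing needs to happen in the consumer. A publish-side sketch (the plugin being installed and configured on the queue is an assumption; headers pass through Sneakers to Bunny's `exchange.publish`):

```ruby
require 'sneakers'

Sneakers::Publisher.new.publish(
  { project_id: project.id }.to_json,
  to_queue: 'metrics',
  headers: { 'x-deduplication-header' => project.id.to_s }
)
```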
### Faktory

# Gemfile
gem 'faktory_worker_ruby'
# Job with native throttling
class DOTProjectMetricsJob
include Faktory::Job
  # Native per-job throttling (a Faktory Enterprise feature; check the
  # Faktory docs for the exact option shape in your version)
  faktory_options throttle: {
    key: "dot_metrics",
    limit: 5,
    period: 60,
    strategy: :skip # or :retry
  }
def perform(project_id)
# Your work
calculate_metrics(project_id)
end
end
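Enqueueing is unchanged. With `strategy: :skip`, a push beyond the configured budget should be dropped rather than queued (behavior to verify against your Faktory version's throttling docs):

```ruby
# The sixth push within the 60s window is dropped when strategy is :skip
DOTProjectMetricsJob.perform_async(project.id)
```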
## Production-Ready Setup

# app/jobs/dot_project_metrics_job.rb
class DOTProjectMetricsJob
include Sidekiq::Worker
include EfficientRateLimiter
sidekiq_options queue: :metrics, retry: 3
set_rate_limit limit: 5, window: 60
# Add circuit breaker for resilience
def execute_with_latest_state(project_id, triggered_at = nil)
return if recently_calculated?(project_id)
    # Semian[...] assumes a matching resource was registered beforehand
    # (see the registration sketch after this class)
    Semian[project_id].acquire do
project = DOTProject.find(project_id)
# Use Rails.cache for results caching
metrics = Rails.cache.fetch(
"metrics:#{project_id}:#{project.updated_at.to_i}",
expires_in: 5.minutes
) do
calculate_metrics(project)
end
project.update_columns(
metrics: metrics,
metrics_calculated_at: Time.current
)
end
end
private
def recently_calculated?(project_id)
Rails.cache.read("metrics_calculated:#{project_id}").present?
end
def calculate_metrics(project)
result = DOTProjectMetricsService.new(project).calculate
# Set flag to prevent recalculation
Rails.cache.write(
"metrics_calculated:#{project.id}",
true,
expires_in: 1.minute
)
result
end
end
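`Semian[project_id]` only resolves a resource that was registered beforehand. A registration sketch (the initializer path, thresholds, and eager per-project registration are all illustrative; registering lazily on first use is more practical for large project counts):

```ruby
# config/initializers/semian.rb
require 'semian'

DOTProject.pluck(:id).each do |project_id|
  Semian.register(
    project_id,            # must match the Semian[...] lookup key in the job
    bulkhead: false,       # circuit breaker only, no ticket limiting
    circuit_breaker: true,
    success_threshold: 2,  # successes in half-open state needed to close
    error_threshold: 3,    # errors that open the circuit...
    error_timeout: 10,     # ...within this many seconds
    exceptions: [Timeout::Error]
  )
end
```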
How the options compare (overhead figures are rough, environment-dependent estimates):

| Solution | Overhead | Deduplication | Rate Limiting | Pros | Cons |
|---|---|---|---|---|---|
| Sidekiq + Lua | Very low (~0.5 ms) | Excellent | Excellent | Fast, atomic ops | Complex setup |
| Sidekiq + memory cache | Lowest (~0.1 ms) | Good | Good | Fastest | Memory cleanup needed |
| Shoryuken (SQS) | Low (~2 ms) | Native (FIFO) | Via visibility timeout | AWS-native, scalable | AWS costs |
| Que (PostgreSQL) | Medium (~5 ms) | Native | Via advisory locks | No Redis needed | DB load |
| Sneakers (RabbitMQ) | Low (~1 ms) | Plugin | Queue-based | Message guarantees | Extra infrastructure |
| Faktory | Low (~1 ms) | Good | Native (Enterprise) | Built-in throttling | Separate service |
The production-ready setup above (Sidekiq + Lua limiter with circuit breaker and caching) provides:
- ✅ ~0.5ms overhead per job check
- ✅ Automatic coalescing (no duplicate work per project)
- ✅ Efficient rate limiting (configurable per job class)
- ✅ Resilient (circuit breaker + caching)
- ✅ Observable (easy to add metrics; see the sketch below)
- ✅ No global locks (won't block other queues)
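The "observable" point is easiest to act on inside the concern's `perform_async`, right after the Lua call returns. A sketch using the statsd-instrument gem (the helper name and metric names are assumptions):

```ruby
require 'statsd-instrument'

# Hypothetical class-level helper for EfficientRateLimiter: call it from
# perform_async with the [status, reason] array the Lua script returns
def record_limiter_outcome(result)
  case result[1]
  when 'rate_limited' then StatsD.increment("jobs.rate_limited.#{name}")
  when 'coalesced'    then StatsD.increment("jobs.coalesced.#{name}")
  else                     StatsD.increment("jobs.enqueued.#{name}")
  end
end
```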
To wire the Sidekiq + Lua approach into an app:

- Add the concern to your Rails app:

  ```bash
  mkdir -p app/jobs/concerns
  # then copy the efficient_rate_limiter.rb content into it
  ```

- Include it in your job:

  ```ruby
  class YourJob
    include Sidekiq::Worker
    include EfficientRateLimiter

    set_rate_limit limit: 10, window: 60

    def execute_with_latest_state(id)
      # Your job logic
    end
  end
  ```

- Monitor with Redis commands:

  ```
  redis-cli
  > KEYS rl:*        # see rate limit keys
  > KEYS coalesce:*  # see coalesced jobs
  ```
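`KEYS` is O(N) and blocks Redis while it runs, so on a production instance prefer the cursor-based scan:

```
redis-cli --scan --pattern 'rl:*'
redis-cli --scan --pattern 'coalesce:*'
```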