Skip to content

Instantly share code, notes, and snippets.

@vipulnsward
Created August 8, 2025 11:59
Show Gist options
  • Save vipulnsward/bd4c2a699594490584a7fe22d60337c8 to your computer and use it in GitHub Desktop.
Save vipulnsward/bd4c2a699594490584a7fe22d60337c8 to your computer and use it in GitHub Desktop.
Efficient Rate Limiting and Coalescing for Background Jobs - Sidekiq, Shoryuken, and more

Efficient Rate Limiting and Coalescing for Background Jobs

Table of Contents

Sidekiq with Lua Scripts

Atomic Rate Limiting with Coalescing

# app/jobs/concerns/efficient_rate_limiter.rb
module EfficientRateLimiter
  extend ActiveSupport::Concern
  
  # Lua script for atomic rate limit + coalesce check
  RATE_LIMIT_COALESCE_SCRIPT = <<~LUA
    local rate_key = KEYS[1]
    local coalesce_key = KEYS[2]
    local limit = tonumber(ARGV[1])
    local window = tonumber(ARGV[2])
    local project_id = ARGV[3]
    local current_time = tonumber(ARGV[4])
    local job_args = ARGV[5]
    
    -- Check coalesce: if job already queued for this project, skip
    local existing = redis.call('GET', coalesce_key)
    if existing then
      return {0, 'coalesced'}
    end
    
    -- Check rate limit using sliding window
    local window_start = current_time - window
    
    -- Remove old entries outside window
    redis.call('ZREMRANGEBYSCORE', rate_key, 0, window_start)
    
    -- Count current entries in window
    local current_count = redis.call('ZCARD', rate_key)
    
    if current_count >= limit then
      return {0, 'rate_limited'}
    end
    
    -- Add to rate limit window
    redis.call('ZADD', rate_key, current_time, current_time .. ':' .. project_id)
    redis.call('EXPIRE', rate_key, window + 60)
    
    -- Set coalesce key (expires after 60 seconds)
    redis.call('SETEX', coalesce_key, 60, job_args)
    
    return {1, 'ok'}
  LUA
  
  # SHA will be cached after first call
  SCRIPT_SHA = Digest::SHA1.hexdigest(RATE_LIMIT_COALESCE_SCRIPT)
  
  class_methods do
    def perform_async(*args)
      project_id = args[0] # Assuming first arg is project_id
      
      result = Sidekiq.redis do |conn|
        begin
          conn.evalsha(
            SCRIPT_SHA,
            keys: [rate_limit_key, coalesce_key(project_id)],
            argv: [rate_limit, rate_window, project_id, Time.now.to_f, args.to_json]
          )
        rescue Redis::CommandError => e
          if e.message.include?('NOSCRIPT')
            # Load script if not cached
            conn.eval(
              RATE_LIMIT_COALESCE_SCRIPT,
              keys: [rate_limit_key, coalesce_key(project_id)],
              argv: [rate_limit, rate_window, project_id, Time.now.to_f, args.to_json]
            )
          else
            raise
          end
        end
      end
      
      if result[0] == 1
        # Proceed with job
        super
      else
        case result[1]
        when 'rate_limited'
          # Requeue with exponential backoff
          perform_in(calculate_backoff, *args)
        when 'coalesced'
          # Job already queued for this project, skip
          nil
        end
      end
    end
    
    def rate_limit_key
      @rate_limit_key ||= "rl:#{name}"
    end
    
    def coalesce_key(project_id)
      "coalesce:#{name}:#{project_id}"
    end
    
    def rate_limit
      @rate_limit || 5
    end
    
    def rate_window
      @rate_window || 60
    end
    
    def set_rate_limit(limit: 5, window: 60)
      @rate_limit = limit
      @rate_window = window
    end
    
    private
    
    def calculate_backoff
      [30 * (1.5 ** (@retry_count || 0)), 300].min
    end
  end
  
  def perform(*args)
    project_id = args[0]
    
    # Clean up coalesce key when job starts
    Sidekiq.redis { |c| c.del(self.class.coalesce_key(project_id)) }
    
    # Get latest state and perform work
    execute_with_latest_state(*args)
  ensure
    # Ensure coalesce key is cleaned up
    Sidekiq.redis { |c| c.del(self.class.coalesce_key(project_id)) }
  end
end

Implementation Example

# app/jobs/dot_project_metrics_job.rb
class DOTProjectMetricsJob
  include Sidekiq::Worker
  include EfficientRateLimiter
  
  set_rate_limit limit: 5, window: 60  # 5 jobs per minute
  
  def execute_with_latest_state(project_id, triggered_at = nil)
    # Always fetch latest project state
    project = DOTProject.find(project_id)
    
    # Skip if recently calculated
    return if project.metrics_calculated_at && project.metrics_calculated_at > 1.minute.ago
    
    # Heavy computation
    metrics = calculate_metrics(project)
    
    # Store results
    project.update\!(
      metrics: metrics,
      metrics_calculated_at: Time.current
    )
  end
  
  private
  
  def calculate_metrics(project)
    # Your expensive calculation
    DOTProjectMetricsService.new(project).calculate
  end
end

Ultra-Efficient with Memory Cache

# app/jobs/concerns/ultra_efficient_limiter.rb
module UltraEfficientLimiter
  extend ActiveSupport::Concern
  
  # Thread-safe in-memory cache
  MEMORY_CACHE = Concurrent::Hash.new
  
  class_methods do
    def perform_async(*args)
      project_id = args[0]
      cache_key = "#{name}:#{project_id}"
      
      # First check memory cache (fastest)
      if MEMORY_CACHE[cache_key] && MEMORY_CACHE[cache_key] > Time.now.to_f
        return nil  # Skip - already queued
      end
      
      # Single Redis call for both checks
      rate_key = "rl:#{name}"
      coalesce_key = "co:#{name}:#{project_id}"
      
      can_proceed = Sidekiq.redis do |conn|
        conn.multi do |multi|
          # Check and set coalesce
          multi.set(coalesce_key, 1, nx: true, ex: 60)
          # Increment rate counter
          multi.incr(rate_key)
          multi.expire(rate_key, rate_window)
        end
      end
      
      if can_proceed[0] && can_proceed[1] <= rate_limit
        # Update memory cache
        MEMORY_CACHE[cache_key] = Time.now.to_f + 60
        
        # Clean old cache entries periodically
        clean_memory_cache if rand(100) == 0
        
        super
      elsif \!can_proceed[0]
        nil  # Already coalesced
      else
        # Rate limited - decrement counter and requeue
        Sidekiq.redis { |c| c.decr(rate_key) }
        perform_in(30 + rand(30), *args)
      end
    end
    
    def rate_limit
      @rate_limit || 5
    end
    
    def rate_window
      @rate_window || 60
    end
    
    private
    
    def clean_memory_cache
      now = Time.now.to_f
      MEMORY_CACHE.delete_if { |_, expiry| expiry < now }
    end
  end
end

Shoryuken (AWS SQS)

# Gemfile
gem 'shoryuken'

# app/workers/dot_project_metrics_worker.rb
class DOTProjectMetricsWorker
  include Shoryuken::Worker
  
  # SQS native features
  shoryuken_options(
    queue: 'metrics',
    auto_delete: true,
    body_parser: :json,
    # SQS Message deduplication (built-in\!)
    message_deduplication_id: ->(msg) { msg[:project_id] },
    # Content-based deduplication
    message_group_id: ->(msg) { "project_#{msg[:project_id]}" }
  )
  
  def perform(sqs_msg, body)
    project_id = body['project_id']
    
    # SQS visibility timeout acts as natural rate limiter
    # If job takes too long, message becomes visible again
    
    # Use DynamoDB for distributed rate limiting (more scalable than Redis)
    return requeue(sqs_msg) unless check_rate_limit(project_id)
    
    calculate_metrics(project_id)
  end
  
  private
  
  def check_rate_limit(project_id)
    # Use DynamoDB atomic counters
    dynamodb = Aws::DynamoDB::Client.new
    
    begin
      dynamodb.update_item(
        table_name: 'rate_limits',
        key: { 'id' => "metrics:#{project_id}" },
        update_expression: 'ADD #cnt :val SET #exp = :exp',
        expression_attribute_names: {
          '#cnt' => 'count',
          '#exp' => 'expires'
        },
        expression_attribute_values: {
          ':val' => 1,
          ':exp' => Time.now.to_i + 60,
          ':limit' => 5,
          ':now' => Time.now.to_i
        },
        condition_expression: '#cnt < :limit OR #exp < :now',
        return_values: 'ALL_NEW'
      )
      true
    rescue Aws::DynamoDB::Errors::ConditionalCheckFailedException
      false
    end
  end
  
  def requeue(sqs_msg)
    # Change visibility timeout (SQS native backoff)
    sqs_msg.change_visibility_timeout(30 + rand(30))
  end
end

# config/shoryuken.yml
concurrency: 10
delay: 0
queues:
  - [metrics, 1]  # Low priority for rate-limited jobs
aws:
  region: us-east-1
  # Enable long polling
  receive_message:
    wait_time_seconds: 20
    max_number_of_messages: 10

Other Queue Systems

Que (PostgreSQL-based)

# Uses PostgreSQL advisory locks - very efficient
class DOTProjectMetricsJob < Que::Job
  # Built-in rate limiting using advisory locks
  def run(project_id)
    # PostgreSQL advisory lock prevents duplicates
    Que.execute <<-SQL
      INSERT INTO que_jobs (job_class, args, run_at)
      VALUES ($1, $2, $3)
      ON CONFLICT (job_class, args) 
      WHERE run_at > NOW() - INTERVAL '1 minute'
      DO NOTHING
    SQL
    
    calculate_metrics(project_id)
  end
end

Sneakers (RabbitMQ-based)

class DOTProjectMetricsWorker
  include Sneakers::Worker
  
  from_queue 'metrics',
    # RabbitMQ TTL for deduplication
    arguments: {
      'x-message-ttl' => 60000,
      'x-max-length' => 100,
      'x-single-active-consumer' => true  # Natural rate limiting
    }
    
  def work(msg)
    data = JSON.parse(msg)
    project_id = data['project_id']
    
    # RabbitMQ deduplication plugin
    metadata[:headers]['x-deduplication-header'] = project_id
    
    calculate_metrics(project_id)
    ack\!
  end
end

Faktory

# Gemfile
gem 'faktory_worker_ruby'

# Job with native throttling
class DOTProjectMetricsJob
  include Faktory::Job
  
  # Native per-job throttling\!
  faktory_options throttle: {
    key: "dot_metrics",
    limit: 5,
    period: 60,
    strategy: :skip  # or :retry
  }
  
  def perform(project_id)
    # Your work
    calculate_metrics(project_id)
  end
end

Production Ready Setup

# app/jobs/dot_project_metrics_job.rb
class DOTProjectMetricsJob
  include Sidekiq::Worker
  include EfficientRateLimiter
  
  sidekiq_options queue: :metrics, retry: 3
  set_rate_limit limit: 5, window: 60
  
  # Add circuit breaker for resilience
  def execute_with_latest_state(project_id, triggered_at = nil)
    return if recently_calculated?(project_id)
    
    Semian[project_id].acquire do
      project = DOTProject.find(project_id)
      
      # Use Rails.cache for results caching
      metrics = Rails.cache.fetch(
        "metrics:#{project_id}:#{project.updated_at.to_i}",
        expires_in: 5.minutes
      ) do
        calculate_metrics(project)
      end
      
      project.update_columns(
        metrics: metrics,
        metrics_calculated_at: Time.current
      )
    end
  end
  
  private
  
  def recently_calculated?(project_id)
    Rails.cache.read("metrics_calculated:#{project_id}").present?
  end
  
  def calculate_metrics(project)
    result = DOTProjectMetricsService.new(project).calculate
    
    # Set flag to prevent recalculation
    Rails.cache.write(
      "metrics_calculated:#{project.id}",
      true,
      expires_in: 1.minute
    )
    
    result
  end
end

Performance Comparison

Solution Overhead Deduplication Rate Limiting Pros Cons
Sidekiq + Lua Very Low (~0.5ms) Excellent Excellent Fast, atomic ops Complex setup
Sidekiq + Memory Lowest (~0.1ms) Good Good Fastest Memory cleanup needed
Shoryuken (SQS) Low (~2ms) Native Via visibility AWS native, scalable AWS costs
Que (PostgreSQL) Medium (~5ms) Native Via locks No Redis needed DB load
Sneakers (RabbitMQ) Low (~1ms) Plugin Queue-based Message guarantees Extra infrastructure
Faktory Low (~1ms) Good Native Built-in throttling Separate service

Key Benefits

This implementation provides:

  • ~0.5ms overhead per job check
  • Automatic coalescing (no duplicate work per project)
  • Efficient rate limiting (configurable per job class)
  • Resilient (circuit breaker + caching)
  • Observable (easy to add metrics)
  • No global locks (won't block other queues)

Installation

  1. Add the concern to your Rails app:

    mkdir -p app/jobs/concerns
    # Copy the efficient_rate_limiter.rb content
  2. Include in your job:

    class YourJob
      include Sidekiq::Worker
      include EfficientRateLimiter
      
      set_rate_limit limit: 10, window: 60
      
      def execute_with_latest_state(id)
        # Your job logic
      end
    end
  3. Monitor with Redis commands:

    redis-cli
    > KEYS rl:*  # See rate limit keys
    > KEYS coalesce:*  # See coalesced jobs

EOF < /dev/null

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment