Skip to content

Instantly share code, notes, and snippets.

@mpraglowski
Created March 28, 2025 07:38
Show Gist options
  • Select an option

  • Save mpraglowski/5779da7cd3881e3210800df6fe905a05 to your computer and use it in GitHub Desktop.

Select an option

Save mpraglowski/5779da7cd3881e3210800df6fe905a05 to your computer and use it in GitHub Desktop.
Sample code for "Handling Concurrency with Database Locks and SKIP LOCKED" blog post
require 'bundler/inline'
gemfile true do
source 'https://rubygems.org'
gem 'activerecord'
gem 'mysql2'
gem 'ruby-progressbar'
gem 'irb'
end
require 'thread'
require 'benchmark'
require "ruby-progressbar"
require 'active_record'
require 'active_support'
SLEEP_FOR = 0.1
ActiveRecord::Base.establish_connection(adapter: 'mysql2', host: '127.0.0.1', database: 'inventory', pool: 100, init_command: 'SET SESSION innodb_lock_wait_timeout=1;')
puts ActiveRecord::Base.connection.execute("show variables like '%lock_wait%';").to_h.merge(isolation_level: ActiveSupport::IsolatedExecutionState.isolation_level).symbolize_keys
ActiveRecord::Base.logger = nil
ActiveRecord::Schema.define do
create_table :inventories, force: true do |t|
t.integer :product_id, null: false
t.integer :total, null: false
t.integer :available, null: false, default: 0
end
add_index :inventories, :product_id
create_table :inventory_items, force: true do |t|
t.integer :inventory_id, null: false
t.string :status, null: false, default: 'free'
end
add_index :inventory_items, [:inventory_id, :status]
end
OutOfStock = Class.new(StandardError)
class Inventory < ActiveRecord::Base
self.table_name = 'inventories'
end
class LockingInventory < Inventory
def reserve!(quantity)
with_lock do
sleep(SLEEP_FOR)
raise OutOfStock if self.available < quantity
self.available -= quantity
self.save!
end
end
def inspect
"For #{product_id}, available: #{available}, reserved: #{total-available}"
end
end
class NonLockingInventory < Inventory
has_many :items, class_name: 'InventoryItem', foreign_key: :inventory_id
def reserve!(quantity)
sleep(SLEEP_FOR)
items_to_take = self.items.where(status: 'free').lock('FOR UPDATE SKIP LOCKED').limit(quantity)
raise OutOfStock if items_to_take.length < quantity
items_to_take.update_all(status: 'reserved')
end
def inspect
stats = self.items.group(:status).count
"For #{product_id}, available: #{stats["free"] || 0}, reserved: #{stats["reserved"] || 0}"
end
end
class InventoryItem < ActiveRecord::Base
validates :status, inclusion: {in: %w[free reserved]}
end
def setup_inventory(limit)
puts "Setup inventories..."
InventoryItem.delete_all
Inventory.delete_all
stock = 100*limit
progress = ProgressBar.create(total: stock)
(1..stock).each do |i|
inventory = Inventory.create!(product_id: i, available: limit, total: limit)
InventoryItem.insert_all(limit.times.map { {inventory_id: inventory.id} })
progress.increment
end
puts "Done: Inventories created: #{Inventory.count} (and #{InventoryItem.count} items)"
end
def run(n_times, &block)
task_queue = Queue.new
n_times.times.each { |i| task_queue << i }
stats = Hash.new {|h,k| h[k] = 0}
workers = 100.times.map do
Thread.new do
until task_queue.empty?
task = task_queue.pop(true) rescue nil
block.call if task
stats[Thread.current.object_id] += 1
end
end
end
workers.each(&:join)
puts "Done #{stats.map{|k,v| v}.sum} requests using #{workers.count} workers, with ~#{stats.map{|k,v| v}.sum / workers.count} requests per worker"
end
def test(bm, inventory_class, limit, product_id = nil, items = 1)
requests = limit * 10
product_id ||= rand(1..100*limit)
inventory = inventory_class.find_by(product_id: product_id)
puts "Starting #{inventory_class}: #{requests} times trying to reserve product #{product_id}"
puts "Before #{inventory_class}: " + inventory.inspect
errors = Hash.new {|h,k| h[k] = 0}
bm.report("Using #{inventory_class}") do
run(requests) do
inventory.reserve!(items)
rescue => e
errors[e.class] += 1
end
end
puts "After #{inventory_class}: " + inventory.reload.inspect
puts errors.inspect
end
limit = (ARGV[0] || 100).to_i
setup_inventory(limit)
Benchmark.bm do |x|
test(x, LockingInventory, limit)
test(x, NonLockingInventory, limit)
end
@mpraglowski
Copy link
Copy Markdown
Author

mpraglowski commented Mar 28, 2025

The sleep is used here to "simulate" some work done in real systems and to make sure threads are switched.

Run: ruby inventory.rb 100

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment