Skip to content

Instantly share code, notes, and snippets.

@rbranson
Created August 31, 2009 05:35
Show Gist options
  • Select an option

  • Save rbranson/178302 to your computer and use it in GitHub Desktop.

Select an option

Save rbranson/178302 to your computer and use it in GitHub Desktop.
#!/usr/bin/ruby
#
# This benchmark test implements a simple threaded message queue writer backed
# by an SQLite3 database. It is modeled after a Twitter-like queue-per-destination
# message system which has N number of actors, each with a queue. The idea
# is to demonstrate a safe, disk-backed queueing system with a large number of
# queues. All of the queues are stored in a separate table in the database file.
#
# In 2009, by Rick Branson, Released into Public Domain.
#
RANDOM_TEXT_BODY_SIZE = 100000
SOURCE_TEXT_MAX = 16
MSG_TEXT_MAX = 140
# go ahead and commit a write when the queue gets to...
WRITE_QUEUE_BACKLOG = 250
# go ahead and commit when we haven't for N seconds...
WRITE_QUEUE_SLEEP_TIME = 1
# this is the maximum number of messages that will be written to disk at a time
WRITE_QUEUE_MSG_DISK_BACKLOG = 25000
require "rubygems"
require "sqlite3"
require "benchmark"
require "thread"
if ARGV.size < 3
puts "#{__FILE__} <database> <user count> <msg count>"
exit
end
$dbfile, $tcount, $mcount = ARGV
File.unlink($dbfile) rescue nil
class BackgroundMessageDatabase
def initialize(file)
@db = SQLite3::Database.new(file)
@db.execute("PRAGMA synchronous = FULL;")
@db.execute("PRAGMA journal_mode = TRUNCATE;")
@queue = Queue.new
@backlog = 0
self.start
end
def write(dst, src, msg)
@queue << [ dst, src, msg ]
@backlog += 1
# we could check the Queue.size but that would hit a mutex
# and be slow. this is kind of just for general purpose
# prodding of the background thread anyway and @backlog is
# not meant to be an accurate count of the number of messages
# waiting on the background writer.
if !@finishing and @backlog > WRITE_QUEUE_BACKLOG
if @thread.status
@thread.run
end
@backlog = 0
end
end
# WARNING: this is usually ran from inside another thread
def work_queue
if !@queue.empty?
puts "background: queue has things."
begin
in_flight = 0
@db.transaction
while !@queue.empty?
dst, src, bod = @queue.pop
insert = Proc.new { @db.execute("INSERT INTO #{dst} VALUES(?, ?)", src, bod) }
begin
insert.call
rescue SQLite3::SQLException
@db.execute("CREATE TABLE #{dst} (source varchar, msg varchar)")
insert.call
end
in_flight += 1
if in_flight >= WRITE_QUEUE_MSG_DISK_BACKLOG
puts "background: WRITE_QUEUE_MSG_DISK_BACKLOG reached, writing #{in_flight} messages..."
@db.commit
@db.transaction
in_flight = 0
end
end
rescue Exception => e
puts "#{e.inspect}"
ensure
@db.commit
end
end
end
def start
@thread = Thread.new do
loop do
work_queue
Thread.exit if @finishing
puts "background: zZzZ..."
sleep(WRITE_QUEUE_SLEEP_TIME)
end
end
end
def finish
puts "writer: finishing..."
@finishing = true
if @thread.status
if @thread.status == "sleeping"
puts "writer: background thread is sleeping, terminating..."
@thread.terminate
else
puts "writer: background thread is still writing, waiting..."
@thread.run rescue nil # rescue us from a lovely thread edge condition
@thread.join
end
end
puts "writer: flushing remainder of queue..."
work_queue
end
end
bmdb = BackgroundMessageDatabase.new($dbfile)
##########################################
puts "stress: generating #{RANDOM_TEXT_BODY_SIZE} byte body of text..."
randsrc = [('a'..'z'),('A'..'Z')].map{|i| i.to_a}.flatten;
randstring = (0...RANDOM_TEXT_BODY_SIZE).map{ randsrc[rand(randsrc.length)] }.join
##########################################
puts "stress: building user list..."
userlist = (1..$tcount.to_i).map { randstring[rand(RANDOM_TEXT_BODY_SIZE) % (RANDOM_TEXT_BODY_SIZE - SOURCE_TEXT_MAX), SOURCE_TEXT_MAX] }
#########################################
#########################################
puts "stress: writing messages..."
bm = Benchmark.measure do
(0..$mcount.to_i).each do |n|
dst = userlist[rand($tcount.to_i)]
src = userlist[rand($tcount.to_i)]
bod = randstring[rand(RANDOM_TEXT_BODY_SIZE) % (RANDOM_TEXT_BODY_SIZE - MSG_TEXT_MAX), 1 + rand(MSG_TEXT_MAX - 1)]
bmdb.write(dst, src, bod)
end
puts "stress: done writing messages, waiting for write queue to catch up..."
bmdb.finish
end
puts "performance: #{$mcount.to_f / bm.real} msg/sec"
#########################################
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment