Created
August 31, 2009 05:35
-
-
Save rbranson/178302 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/ruby | |
| # | |
| # This benchmark test implements a simple threaded message queue writer backed | |
| # by an SQLite3 database. It is modeled after a Twitter-like queue-per-destination | |
| # message system which has N number of actors, each with a queue. The idea | |
| # is to demonstrate a safe, disk-backed queueing system with a large number of | |
| # queues. All of the queues are stored in a separate table in the database file. | |
| # | |
| # In 2009, by Rick Branson, Released into Public Domain. | |
| # | |
| RANDOM_TEXT_BODY_SIZE = 100000 | |
| SOURCE_TEXT_MAX = 16 | |
| MSG_TEXT_MAX = 140 | |
| # go ahead and commit a write when the queue gets to... | |
| WRITE_QUEUE_BACKLOG = 250 | |
| # go ahead and commit when we haven't for N seconds... | |
| WRITE_QUEUE_SLEEP_TIME = 1 | |
| # this is the maximum number of messages that will be written to disk at a time | |
| WRITE_QUEUE_MSG_DISK_BACKLOG = 25000 | |
| require "rubygems" | |
| require "sqlite3" | |
| require "benchmark" | |
| require "thread" | |
| if ARGV.size < 3 | |
| puts "#{__FILE__} <database> <user count> <msg count>" | |
| exit | |
| end | |
| $dbfile, $tcount, $mcount = ARGV | |
| File.unlink($dbfile) rescue nil | |
| class BackgroundMessageDatabase | |
| def initialize(file) | |
| @db = SQLite3::Database.new(file) | |
| @db.execute("PRAGMA synchronous = FULL;") | |
| @db.execute("PRAGMA journal_mode = TRUNCATE;") | |
| @queue = Queue.new | |
| @backlog = 0 | |
| self.start | |
| end | |
| def write(dst, src, msg) | |
| @queue << [ dst, src, msg ] | |
| @backlog += 1 | |
| # we could check the Queue.size but that would hit a mutex | |
| # and be slow. this is kind of just for general purpose | |
| # prodding of the background thread anyway and @backlog is | |
| # not meant to be an accurate count of the number of messages | |
| # waiting on the background writer. | |
| if !@finishing and @backlog > WRITE_QUEUE_BACKLOG | |
| if @thread.status | |
| @thread.run | |
| end | |
| @backlog = 0 | |
| end | |
| end | |
| # WARNING: this is usually ran from inside another thread | |
| def work_queue | |
| if !@queue.empty? | |
| puts "background: queue has things." | |
| begin | |
| in_flight = 0 | |
| @db.transaction | |
| while !@queue.empty? | |
| dst, src, bod = @queue.pop | |
| insert = Proc.new { @db.execute("INSERT INTO #{dst} VALUES(?, ?)", src, bod) } | |
| begin | |
| insert.call | |
| rescue SQLite3::SQLException | |
| @db.execute("CREATE TABLE #{dst} (source varchar, msg varchar)") | |
| insert.call | |
| end | |
| in_flight += 1 | |
| if in_flight >= WRITE_QUEUE_MSG_DISK_BACKLOG | |
| puts "background: WRITE_QUEUE_MSG_DISK_BACKLOG reached, writing #{in_flight} messages..." | |
| @db.commit | |
| @db.transaction | |
| in_flight = 0 | |
| end | |
| end | |
| rescue Exception => e | |
| puts "#{e.inspect}" | |
| ensure | |
| @db.commit | |
| end | |
| end | |
| end | |
| def start | |
| @thread = Thread.new do | |
| loop do | |
| work_queue | |
| Thread.exit if @finishing | |
| puts "background: zZzZ..." | |
| sleep(WRITE_QUEUE_SLEEP_TIME) | |
| end | |
| end | |
| end | |
| def finish | |
| puts "writer: finishing..." | |
| @finishing = true | |
| if @thread.status | |
| if @thread.status == "sleeping" | |
| puts "writer: background thread is sleeping, terminating..." | |
| @thread.terminate | |
| else | |
| puts "writer: background thread is still writing, waiting..." | |
| @thread.run rescue nil # rescue us from a lovely thread edge condition | |
| @thread.join | |
| end | |
| end | |
| puts "writer: flushing remainder of queue..." | |
| work_queue | |
| end | |
| end | |
| bmdb = BackgroundMessageDatabase.new($dbfile) | |
| ########################################## | |
| puts "stress: generating #{RANDOM_TEXT_BODY_SIZE} byte body of text..." | |
| randsrc = [('a'..'z'),('A'..'Z')].map{|i| i.to_a}.flatten; | |
| randstring = (0...RANDOM_TEXT_BODY_SIZE).map{ randsrc[rand(randsrc.length)] }.join | |
| ########################################## | |
| puts "stress: building user list..." | |
| userlist = (1..$tcount.to_i).map { randstring[rand(RANDOM_TEXT_BODY_SIZE) % (RANDOM_TEXT_BODY_SIZE - SOURCE_TEXT_MAX), SOURCE_TEXT_MAX] } | |
| ######################################### | |
| ######################################### | |
| puts "stress: writing messages..." | |
| bm = Benchmark.measure do | |
| (0..$mcount.to_i).each do |n| | |
| dst = userlist[rand($tcount.to_i)] | |
| src = userlist[rand($tcount.to_i)] | |
| bod = randstring[rand(RANDOM_TEXT_BODY_SIZE) % (RANDOM_TEXT_BODY_SIZE - MSG_TEXT_MAX), 1 + rand(MSG_TEXT_MAX - 1)] | |
| bmdb.write(dst, src, bod) | |
| end | |
| puts "stress: done writing messages, waiting for write queue to catch up..." | |
| bmdb.finish | |
| end | |
| puts "performance: #{$mcount.to_f / bm.real} msg/sec" | |
| ######################################### |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment