Created April 15, 2011 14:03
mrzor/921749
A connection pool for EventMachine & mysql2 with batch-query support (allows async transactions)
require 'mysql2'
require 'mysql2/em'

class MysqlConnectionPool
  def initialize(conf)
    @pool_size = conf[:size] || 10
    # Shared queue of pending requests; every connection's worker pops from it.
    @query_queue = EM::Queue.new
    # Maps each connection to its local queue of statements still to run.
    @connections = {}
    start_queue conf
  end

  def queue_worker_loop
    proc{ |connection|
      worker_state = @connections[connection]
      if (worker_state.size > 0)
        # process existing queries in worker_state, oldest first
        # (shift rather than pop, so a batch runs in the order it was given)
        stmt = worker_state.shift
        # :sql may be a Proc that builds the statement from the connection
        sql = stmt[:sql].is_a?(Proc) ? stmt[:sql].call(connection) : stmt[:sql]
        connection.query(sql, stmt[:opts]).callback do |result|
          stmt[:callback].call result if stmt[:callback]
          queue_worker_loop.call connection
        end
      else
        # add new queries in worker_state
        @query_queue.pop do |elem|
          if elem[:sql].is_a?(Array)
            # a batch: every statement goes to this same connection
            elem[:sql].each do |stmt|
              # push, not <<: a braceless hash does not parse after <<
              worker_state.push :sql => stmt, :opts => elem[:opts], :callback => elem[:callback]
            end
          else
            worker_state << elem
          end
          queue_worker_loop.call connection
        end
      end
    }
  end

  def start_queue(conf)
    @pool_size.times do
      connection = Mysql2::EM::Client.new(conf)
      @connections[connection] = []
      queue_worker_loop.call connection
    end
  end

  # sql is a String, a Proc, or an Array of them (a batch for one connection)
  def query(sql, opts={}, &block)
    @query_queue.push :sql => sql, :opts => opts, :callback => block
  end
end
Author
l. 32 (the line inside the `elem[:sql].each` loop): the originally posted `worker_state <<` has to be `worker_state.push` to get this working, IIRC.
Just for information, this piece of code allows the statements of a MySQL transaction to be sent asynchronously (all statements of a transaction need to run on the same connection).
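To make the "same connection" point concrete, here is a minimal, dependency-free sketch of the batch expansion step. It does not talk to MySQL: `expand_batch`, `local_queue` and the `accounts` table are hypothetical names used only for illustration, and the per-connection queue is modelled as a plain Array drained in FIFO order.

```ruby
# Hypothetical model of the batch expansion done by the worker loop:
# one queued element whose :sql is an Array becomes one entry per
# statement, all placed on a single connection's local queue.
def expand_batch(elem)
  statements = elem[:sql].is_a?(Array) ? elem[:sql] : [elem[:sql]]
  statements.map do |sql|
    { :sql => sql, :opts => elem[:opts], :callback => elem[:callback] }
  end
end

# A transaction expressed as a batch (table and columns are made up).
batch = {
  :sql => [
    "BEGIN",
    "UPDATE accounts SET balance = balance - 10 WHERE id = 1",
    "UPDATE accounts SET balance = balance + 10 WHERE id = 2",
    "COMMIT"
  ],
  :opts => {},
  :callback => nil
}

# Stand-in for the Array kept for one connection.
local_queue = expand_batch(batch)

# Drain oldest-first, the order a transaction needs.
executed = []
executed << local_queue.shift[:sql] until local_queue.empty?
```

With the pool itself, the equivalent call would pass the Array of statements to `query`; the block given to `query` is attached to every statement of the batch, so it is called once per statement rather than once per transaction.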
This implementation does not cover error handling, which you do need in order to retry properly, as the documentation points out (http://dev.mysql.com/doc/refman/5.0/en/innodb-deadlocks.html - "Always be prepared to re-issue a transaction if it fails due to deadlock. Deadlocks are not dangerous. Just try again.").
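As a rough idea of what such retrying could look like in callback style, here is a sketch that is not part of the gist. `retry_on_deadlock` is a hypothetical helper, and `attempt` stands for any callable that runs one transaction and then invokes exactly one of the two callables it receives. It assumes the error object responds to `error_number`, as `Mysql2::Error` does, and treats MySQL error 1213 (`ER_LOCK_DEADLOCK`) as retryable. `FakeError` is a stand-in so the example runs without a database.

```ruby
# MySQL error code for "Deadlock found when trying to get lock".
DEADLOCK_ERRNO = 1213

# Hypothetical helper: run `attempt`, and re-run it when it fails with a
# deadlock, up to max_tries attempts in total. Any other error, or a
# deadlock on the last allowed attempt, is handed to on_failure.
def retry_on_deadlock(attempt, max_tries, on_success, on_failure, tries = 1)
  handle_failure = lambda do |error|
    deadlock = error.respond_to?(:error_number) &&
               error.error_number == DEADLOCK_ERRNO
    if deadlock && tries < max_tries
      retry_on_deadlock(attempt, max_tries, on_success, on_failure, tries + 1)
    else
      on_failure.call(error)
    end
  end
  attempt.call(on_success, handle_failure)
end

# Stand-in for a database error, for demonstration only.
FakeError = Struct.new(:error_number)

# A fake transaction that deadlocks twice and then succeeds.
calls = 0
flaky = lambda do |ok, ko|
  calls += 1
  calls < 3 ? ko.call(FakeError.new(DEADLOCK_ERRNO)) : ok.call(:committed)
end

outcome = nil
failure = nil
retry_on_deadlock(flaky, 5,
                  lambda { |result| outcome = result },
                  lambda { |error| failure = error })
```

To use something like this with the pool, the worker loop would first have to report failures (for example through an `errback` on the deferrable returned by `connection.query`), which the gist does not do. The whole batch would also need to be re-queued, not only the statement that failed.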
untested code, pending confirmation by buddy