Skip to content

Instantly share code, notes, and snippets.

@kares
Last active August 29, 2015 13:56
Show Gist options
  • Save kares/8939332 to your computer and use it in GitHub Desktop.
Save kares/8939332 to your computer and use it in GitHub Desktop.
# require 'active_record/connection_adapters/abstract_adapter'
# Adapter (connection) compatibility methods for AR 2.3 as required
# by the (AR 3.2) back-ported ConnectionPool implementation.
class ActiveRecord::ConnectionAdapters::AbstractAdapter
  include MonitorMixin

  # HACK: the monitor has to be set up explicitly. In 3.2 the adapter's
  # #initialize reaches MonitorMixin#initialize through super(), in 2.3 it
  # does not, so every freshly built adapter gets its monitor initialized
  # right after construction instead.
  def self.new(*args)
    super(*args).tap { |adapter| adapter.send(:mon_initialize) }
  end

  # +last_use+ is the time of the most recent successful #lease,
  # +in_use+ tells whether the adapter is currently leased.
  attr_reader :last_use, :in_use
  alias in_use? in_use

  # Tries to mark this adapter as taken. Returns a truthy value (the lease
  # time) when the lease was acquired and nil when the adapter was already
  # in use. The check-and-set runs under the adapter's monitor.
  def lease
    synchronize do
      next if in_use

      @in_use = true
      @last_use = Time.now
    end
  end

  # Marks this adapter as no longer in use.
  def expire
    @in_use = false
  end
end
require 'active_record/version'
# NOTE: no need to patch on 3.2 - delete this (these) file(s) then!
# The major/minor segments are compared as integers: comparing the raw
# version strings is lexicographic and would e.g. treat '10.0.0' as older
# than '3.2.0'.
if (ActiveRecord::VERSION::STRING.split('.').first(2).map { |part| part.to_i } <=> [3, 2]) >= 0
  warn "loading patched #{__FILE__} one should remove this on >= 3.2"
end
require 'thread'
require 'monitor'
require 'set'
module ActiveRecord
# Signals that no connection could be obtained from the pool before the
# connection acquisition timeout elapsed.
class ConnectionTimeoutError < ConnectionNotEstablished; end
module ConnectionAdapters
# Connection pool base class for managing Active Record database
# connections.
#
# == Introduction
#
# A connection pool synchronizes thread access to a limited number of
# database connections. The basic idea is that each thread checks out a
# database connection from the pool, uses that connection, and checks the
# connection back in. ConnectionPool is completely thread-safe, and will
# ensure that a connection cannot be used by two threads at the same time,
# as long as ConnectionPool's contract is correctly followed. It will also
# handle cases in which there are more threads than connections: if all
# connections have been checked out, and a thread tries to checkout a
# connection anyway, then ConnectionPool will wait until some other thread
# has checked in a connection.
#
# == Obtaining (checking out) a connection
#
# Connections can be obtained and used from a connection pool in several
# ways:
#
# 1. Simply use ActiveRecord::Base.connection as with Active Record 2.1 and
# earlier (pre-connection-pooling). Eventually, when you're done with
# the connection(s) and wish it to be returned to the pool, you call
# ActiveRecord::Base.clear_active_connections!. This will be the
# default behavior for Active Record when used in conjunction with
# Action Pack's request handling cycle.
# 2. Manually check out a connection from the pool with
# ActiveRecord::Base.connection_pool.checkout. You are responsible for
# returning this connection to the pool when finished by calling
# ActiveRecord::Base.connection_pool.checkin(connection).
# 3. Use ActiveRecord::Base.connection_pool.with_connection(&block), which
# obtains a connection, yields it as the sole argument to the block,
# and returns it to the pool after the block completes.
#
# Connections in the pool are actually AbstractAdapter objects (or objects
# compatible with AbstractAdapter's interface).
#
# == Options
#
# There are two connection-pooling-related options that you can add to
# your database connection configuration:
#
# * +pool+: number indicating size of connection pool (default 5)
# * +checkout_timeout+: number of seconds to block and wait for a
# connection before giving up and raising a timeout error
# (default 5 seconds). ('wait_timeout' supported for backwards
# compatibility, but conflicts with key used for different purpose
# by mysql2 adapter).
class ConnectionPool
  include MonitorMixin

  attr_accessor :automatic_reconnect
  attr_reader :spec, :connections

  # Creates a new ConnectionPool object. +spec+ is a ConnectionSpecification
  # object which describes database connection information (e.g. adapter,
  # host name, username, password, etc), as well as the maximum size for
  # this ConnectionPool.
  #
  # The default ConnectionPool maximum size is 5.
  def initialize(spec)
    super() # initializes the MonitorMixin state behind #synchronize

    @spec = spec

    # The cache of reserved connections mapped to threads
    @reserved_connections = {}

    @queue = new_cond

    # 'wait_timeout', the backward-compatible key, conflicts with spec key
    # used by mysql2 for something entirely different, checkout_timeout
    # preferred to avoid conflict and allow independent values.
    timeout = spec.config[:checkout_timeout] || spec.config[:wait_timeout] || 5
    # Configuration values may arrive as strings (e.g. parsed from a URL
    # query); coerce them so the numeric comparisons in #checkout work.
    @timeout = timeout.is_a?(Numeric) ? timeout : timeout.to_f

    # default max pool size to 5
    @size = (spec.config[:pool] && spec.config[:pool].to_i) || 5

    @connections = []
    @automatic_reconnect = true
  end

  # Retrieve the connection associated with the current thread, or call
  # #checkout to obtain one if necessary.
  #
  # #connection can be called any number of times; the connection is
  # held in a hash keyed by the thread id.
  def connection
    synchronize do
      @reserved_connections[current_connection_id] ||= checkout
    end
  end

  # Is there an open connection that is being used for the current thread?
  # Returns false when nothing is reserved for the current connection id.
  def active_connection?
    synchronize do
      @reserved_connections.fetch(current_connection_id) {
        return false
      }.in_use?
    end
  end

  # Signal that the thread is finished with the current connection.
  # #release_connection releases the connection-thread association
  # and returns the connection to the pool.
  def release_connection(with_id = current_connection_id)
    conn = synchronize { @reserved_connections.delete(with_id) }
    checkin conn if conn
  end

  # If a connection already exists yield it to the block. If no connection
  # exists checkout a connection, yield it to the block, and checkin the
  # connection when finished.
  def with_connection
    connection_id = current_connection_id
    fresh_connection = true unless active_connection?
    yield connection
  ensure
    release_connection(connection_id) if fresh_connection
  end

  # Returns true if a connection has already been opened.
  def connected?
    synchronize { @connections.any? }
  end

  # Disconnects all connections in the pool, and clears the pool.
  def disconnect!
    synchronize do
      @reserved_connections = {}
      @connections.each do |conn|
        checkin conn
        conn.disconnect!
      end
      @connections = []
    end
  end

  # Checks every connection back in, then disconnects and drops the ones
  # that report requires_reloading?.
  def clear_reloadable_connections!
    synchronize do
      @reserved_connections = {}
      @connections.each do |conn|
        checkin conn
        conn.disconnect! if conn.requires_reloading?
      end
      @connections.delete_if do |conn|
        conn.requires_reloading?
      end
    end
  end

  # Verify active connections and remove and disconnect connections
  # associated with stale threads.
  def verify_active_connections! #:nodoc:
    synchronize do
      clear_stale_cached_connections!
      @connections.each do |connection|
        connection.verify!
      end
    end
  end

  # Return any checked-out connections back to the pool by threads that
  # are no longer alive.
  def clear_stale_cached_connections!
    keys = @reserved_connections.keys - Thread.list.find_all { |t|
      t.alive?
    }.map { |thread| thread.object_id }
    keys.each do |key|
      conn = @reserved_connections[key]
      checkin conn
      @reserved_connections.delete(key)
    end
  end

  # Check-out a database connection from the pool, indicating that you want
  # to use it. You should call #checkin when you no longer need this.
  #
  # This is done by either returning an existing connection, or by creating
  # a new connection. If the maximum number of connections for this pool has
  # already been reached, but the pool is empty (i.e. they're all being used),
  # then this method will wait until a thread has checked in a connection.
  # The wait time is bounded however: if no connection can be checked out
  # within the timeout specified for this pool, then a ConnectionTimeoutError
  # exception will be raised.
  #
  # Returns: an AbstractAdapter object.
  #
  # Raises:
  # - ConnectionTimeoutError: no connection can be obtained from the pool
  #   within the timeout period.
  def checkout
    synchronize do
      waited_time = 0

      loop do
        # #lease returns a truthy value only for a connection that was free
        conn = @connections.find { |c| c.lease }

        if conn.nil? && @connections.size < @size
          conn = checkout_new_connection
          conn.lease
        end

        if conn
          checkout_and_verify conn
          return conn
        end

        if waited_time >= @timeout
          raise ConnectionTimeoutError, "could not obtain a database connection#{" within #{@timeout} seconds" if @timeout} (waited #{waited_time} seconds). The max pool size is currently #{@size}; consider increasing it."
        end

        # Sometimes our wait can end because a connection is available,
        # but another thread can snatch it up first. If timeout hasn't
        # passed but no connection is avail, looks like that happened --
        # loop and wait again, for the time remaining on our timeout.
        before_wait = Time.now
        @queue.wait([@timeout - waited_time, 0].max)
        waited_time += (Time.now - before_wait)

        # Will go away in Rails 4, when we don't clean up
        # after leaked connections automatically anymore. Right now, clean
        # up after we've returned from a 'wait' if it looks like it's
        # needed, then loop and try again.
        if active_connections.size >= @connections.size
          clear_stale_cached_connections!
        end
      end
    end
  end

  # Check-in a database connection back into the pool, indicating that you
  # no longer need this connection.
  #
  # +conn+: an AbstractAdapter object, which was obtained earlier by
  # calling +checkout+ on this pool.
  def checkin(conn)
    synchronize do
      conn.run_callbacks :checkin do
        conn.expire
        @queue.signal
      end

      release conn
    end
  end

  private

  # Drops the reservation (if any) that points at +conn+, looking at the
  # current connection id first and then at every other reservation.
  def release(conn)
    synchronize do
      thread_id =
        if @reserved_connections[current_connection_id] == conn
          current_connection_id
        else
          @reserved_connections.keys.find { |k|
            @reserved_connections[k] == conn
          }
        end

      @reserved_connections.delete thread_id if thread_id
    end
  end

  # Builds a new adapter by calling the spec's adapter method on
  # ActiveRecord::Base with the spec's config.
  def new_connection
    ActiveRecord::Base.send(spec.adapter_method, spec.config)
  end

  # Id the reservations are keyed by; defaults to (and memoizes) the
  # current thread's object_id.
  def current_connection_id
    ActiveRecord::Base.connection_id ||= Thread.current.object_id
  end

  # Creates a connection and adds it to the pool.
  #
  # Raises ConnectionNotEstablished when automatic_reconnect is disabled.
  def checkout_new_connection
    raise ConnectionNotEstablished unless @automatic_reconnect

    c = new_connection
    # 2.3 does not track its pool; guard on the writer that is actually called
    c.pool = self if c.respond_to?(:pool=)
    @connections << c
    c
  end

  # Runs the :checkout callbacks around verify! and returns +c+.
  #
  # If that fails the lease is given back before re-raising, otherwise the
  # connection would stay marked as in use without any owner and the pool
  # would permanently lose a slot.
  def checkout_and_verify(c)
    c.run_callbacks :checkout do
      c.verify!
    end
    c
  rescue
    synchronize do
      c.expire
      @queue.signal
    end
    raise
  end

  # Connections currently marked as in use.
  def active_connections
    @connections.find_all { |c| c.in_use? }
  end
end
# ConnectionHandler is a collection of ConnectionPool objects. It is used
# for keeping separate connection pools for Active Record models that connect
# to different databases.
#
# For example, suppose that you have 5 models, with the following hierarchy:
#
# |
# +-- Book
# | |
# | +-- ScaryBook
# | +-- GoodBook
# +-- Author
# +-- BankAccount
#
# Suppose that Book is to connect to a separate database (i.e. one other
# than the default database). Then Book, ScaryBook and GoodBook will all use
# the same connection pool. Likewise, Author and BankAccount will use the
# same connection pool. However, the connection pool used by Author/BankAccount
# is not the same as the one used by Book/ScaryBook/GoodBook.
#
# Normally there is only a single ConnectionHandler instance, accessible via
# ActiveRecord::Base.connection_handler. Active Record models use this to
# determine that connection pool that they should use.
class ConnectionHandler
  # Hash of pools keyed by the spec they were created from.
  attr_reader :connection_pools

  # +pools+ optionally seeds the spec => pool hash.
  def initialize(pools = {})
    @connection_pools = pools
    @class_to_pool = {}
  end

  # Registers the pool for +spec+ (creating it on first use) under the
  # class name +name+ and returns that pool.
  def establish_connection(name, spec)
    pool = (@connection_pools[spec] ||= ConnectionAdapters::ConnectionPool.new(spec))
    @class_to_pool[name] = pool
  end

  # Returns true if there are any active connections among the connection
  # pools that the ConnectionHandler is managing.
  def active_connections?
    connection_pools.values.any?(&:active_connection?)
  end

  # Returns any connections in use by the current thread back to the pool.
  def clear_active_connections!
    @connection_pools.each_value(&:release_connection)
  end

  # Asks every pool to clear its reloadable connections.
  def clear_reloadable_connections!
    @connection_pools.each_value(&:clear_reloadable_connections!)
  end

  # Disconnects every managed pool.
  def clear_all_connections!
    @connection_pools.each_value(&:disconnect!)
  end

  # Verify active connections.
  def verify_active_connections! #:nodoc:
    @connection_pools.each_value(&:verify_active_connections!)
  end

  # Locate the connection of the nearest super class that has a pool and
  # return that pool's connection for the current thread.
  #
  # Raises ConnectionNotEstablished when no pool (or no connection) is found.
  def retrieve_connection(klass) #:nodoc:
    pool = retrieve_connection_pool(klass)
    conn = pool && pool.connection
    raise ConnectionNotEstablished unless conn

    conn
  end

  # Returns true if a connection that's accessible to this class has
  # already been opened (nil when the class has no pool at all).
  def connected?(klass)
    pool = retrieve_connection_pool(klass)
    pool && pool.connected?
  end

  # Remove the connection for this class. This will close the active
  # connection and the defined connection (if they exist). The result
  # can be used as an argument for establish_connection, for easily
  # re-establishing the connection.
  def remove_connection(klass)
    pool = @class_to_pool.delete(klass.name)
    return nil unless pool

    @connection_pools.delete pool.spec
    pool.automatic_reconnect = false
    pool.disconnect!
    pool.spec.config
  end

  # Walks up the class hierarchy starting at +klass+ and returns the first
  # registered pool; gives up with nil once ActiveRecord::Base was checked.
  def retrieve_connection_pool(klass)
    loop do
      pool = @class_to_pool[klass.name]
      return pool if pool
      return nil if ActiveRecord::Base == klass

      klass = klass.superclass
    end
  end
end
# Rack middleware that hands the current thread's connections back to the
# pool once the response body has been closed (or the app raised).
class ConnectionManagement
  # Wraps a response body so that closing it also clears the active
  # connections. Everything else is forwarded to the wrapped body.
  class Proxy
    attr_reader :body, :testing

    # +body+ is the wrapped response body, +testing+ disables the
    # connection clean-up on #close.
    def initialize(body, testing = false)
      @body = body
      @testing = testing
    end

    def method_missing(method_sym, *arguments, &block)
      @body.send(method_sym, *arguments, &block)
    end

    def respond_to?(method_sym, include_private = false)
      super || @body.respond_to?(method_sym)
    end

    def each(&block)
      body.each(&block)
    end

    # Closes the wrapped body (when it supports it) and releases the
    # connections. The release happens in an ensure so that a body whose
    # #close raises does not leak the thread's connection.
    def close
      body.close if body.respond_to?(:close)
    ensure
      # Don't return connection (and perform implicit rollback) if
      # this request is a part of integration test
      ActiveRecord::Base.clear_active_connections! unless testing
    end
  end

  def initialize(app)
    @app = app
  end

  # Calls the wrapped app and returns its response with the body wrapped
  # in a Proxy. When the app raises, the connections are released right
  # away and the error is re-raised unchanged.
  def call(env)
    testing = env.key?('rack.test')

    status, headers, body = @app.call(env)

    [status, headers, Proxy.new(body, testing)]
  rescue Exception # clean up for non-StandardErrors too; always re-raised
    ActiveRecord::Base.clear_active_connections! unless testing
    raise
  end
end
end
end
require 'active_record/connection_adapters/abstract/adapter_compat'
require 'uri'
module ActiveRecord
class Base
# Value object pairing a symbolized connection config hash with the name
# of the adapter method used to open the connection.
class ConnectionSpecification #:nodoc:
  attr_reader :config, :adapter_method

  def initialize(config, adapter_method)
    @config, @adapter_method = config, adapter_method
  end

  ##
  # Builds a ConnectionSpecification from user input
  class Resolver # :nodoc:
    attr_reader :config, :klass, :configurations

    # +config+ is the user input (nil, Symbol, String or Hash),
    # +configurations+ the hash of named configurations to look names up in.
    def initialize(config, configurations)
      @config = config
      @configurations = configurations
    end

    # Resolves the user input into a ConnectionSpecification.
    #
    # * nil: uses the configuration named after Rails.env (raises
    #   AdapterNotSpecified when Rails.env is not defined)
    # * Symbol/String: a configuration name or a connection URL
    # * Hash: used directly
    #
    # Any other input falls through the case and yields nil.
    def spec
      case config
      when nil
        raise AdapterNotSpecified unless defined?(Rails.env)
        resolve_string_connection Rails.env
      when Symbol, String
        resolve_string_connection config.to_s
      when Hash
        resolve_hash_connection config
      end
    end

    private

    # Looks +spec+ up in the named configurations; names that are not
    # configured are treated as connection URLs.
    def resolve_string_connection(spec) # :nodoc:
      hash = configurations.fetch(spec) do |k|
        connection_url_to_hash(k)
      end

      raise(AdapterNotSpecified, "#{spec} database is not configured") unless hash

      resolve_hash_connection hash
    end

    # Requires the adapter named in +spec+ and wraps the symbolized config
    # together with the "<adapter>_connection" method name.
    #
    # Raises AdapterNotSpecified without an :adapter key and LoadError when
    # the adapter file cannot be required.
    def resolve_hash_connection(spec) # :nodoc:
      spec = spec.symbolize_keys

      raise(AdapterNotSpecified, "database configuration does not specify adapter") unless spec.key?(:adapter)

      begin
        require "active_record/connection_adapters/#{spec[:adapter]}_adapter"
      rescue LoadError => e
        raise LoadError, "Please install the #{spec[:adapter]} adapter: `gem install activerecord-#{spec[:adapter]}-adapter` (#{e.message})", e.backtrace
      end

      adapter_method = "#{spec[:adapter]}_connection"

      ConnectionSpecification.new(spec, adapter_method)
    end

    # Converts a connection URL such as
    # "postgres://user:pass@host:5432/db?pool=5" into a config hash.
    # Blank URL components are dropped, string components are unescaped and
    # query parameters are merged in as additional (string valued) options.
    def connection_url_to_hash(url) # :nodoc:
      config = URI.parse url
      adapter = config.scheme
      adapter = "postgresql" if adapter == "postgres"
      spec = { :adapter  => adapter,
               :username => config.user,
               :password => config.password,
               :port     => config.port,
               :database => config.path.sub(%r{^/},""),
               :host     => config.host }
      spec.reject! { |_, value| value.blank? }
      spec.keys.each do |key|
        value = spec[key]
        spec[key] = URI.unescape(value) if value.is_a?(String)
      end
      if config.query
        # Split each pair on the first "=" only so values may contain "=",
        # and skip the empty pairs produced by stray "&" separators.
        pairs = config.query.split("&").
          reject { |pair| pair.empty? }.
          map { |pair| pair.split("=", 2) }
        spec.merge!(Hash[pairs].symbolize_keys)
      end
      spec
    end
  end
end
##
# :singleton-method:
# The connection handler. Declared as a class attribute without an instance
# writer and initialized with a fresh ConnectionAdapters::ConnectionHandler.
class_attribute :connection_handler, :instance_writer => false
self.connection_handler = ConnectionAdapters::ConnectionHandler.new
# Instance-level shortcut to the connection currently associated with the
# record's class. Handy for "borrowing" the connection to do database work
# that isn't easily done without going straight to SQL.
def connection
  owner = self.class
  owner.connection
end
# Establishes the connection to the database. Accepts a hash as input where
# the <tt>:adapter</tt> key must be specified with the name of a database adapter (in lower-case)
# example for regular databases (MySQL, Postgresql, etc):
#
# ActiveRecord::Base.establish_connection(
# :adapter => "mysql",
# :host => "localhost",
# :username => "myuser",
# :password => "mypass",
# :database => "somedatabase"
# )
#
# Example for SQLite database:
#
# ActiveRecord::Base.establish_connection(
# :adapter => "sqlite",
# :database => "path/to/dbfile"
# )
#
# Also accepts keys as strings (for parsing from YAML for example):
#
# ActiveRecord::Base.establish_connection(
# "adapter" => "sqlite",
# "database" => "path/to/dbfile"
# )
#
# Or a URL:
#
# ActiveRecord::Base.establish_connection(
# "postgres://myuser:mypass@localhost/somedatabase"
# )
#
# The exceptions AdapterNotSpecified, AdapterNotFound and ArgumentError
# may be raised on an error.
# Resolves +spec+ (defaulting to ENV["DATABASE_URL"]) into a connection
# specification, drops the class's current connection and registers the new
# specification with the connection handler under the class name.
#
# Raises AdapterNotFound when the class does not respond to the resolved
# adapter method.
def self.establish_connection(spec = ENV["DATABASE_URL"])
  resolved = ConnectionSpecification::Resolver.new(spec, configurations).spec

  if respond_to?(resolved.adapter_method)
    remove_connection
    connection_handler.establish_connection name, resolved
  else
    raise AdapterNotFound, "database configuration specifies nonexistent #{resolved.config[:adapter]} adapter"
  end
end
class << self
  # Returns the connection currently associated with the class. This can
  # also be used to "borrow" the connection to do database work unrelated
  # to any of the specific Active Records.
  def connection
    retrieve_connection
  end

  # Reads the connection id stored for the current thread.
  def connection_id
    thread = Thread.current
    thread['ActiveRecord::Base.connection_id']
  end

  # Stores +connection_id+ for the current thread.
  def connection_id=(connection_id)
    thread = Thread.current
    thread['ActiveRecord::Base.connection_id'] = connection_id
  end

  # Returns the configuration of the associated connection as a hash:
  #
  #  ActiveRecord::Base.connection_config
  #  # => {:pool=>5, :timeout=>5000, :database=>"db/development.sqlite3", :adapter=>"sqlite3"}
  #
  # Please use only for reading.
  def connection_config
    connection_pool.spec.config
  end

  # Returns the pool the connection handler resolves for this class.
  #
  # Raises ConnectionNotEstablished when there is none.
  def connection_pool
    pool = connection_handler.retrieve_connection_pool(self)
    raise ConnectionNotEstablished unless pool

    pool
  end

  # Asks the connection handler for this class's connection.
  def retrieve_connection
    connection_handler.retrieve_connection(self)
  end

  # Returns true if Active Record is connected.
  def connected?
    connection_handler.connected?(self)
  end

  # Removes the connection registered for +klass+ (this class by default)
  # via the connection handler.
  def remove_connection(klass = self)
    connection_handler.remove_connection(klass)
  end

  # Returns the current thread's connections to their pools.
  def clear_active_connections!
    connection_handler.clear_active_connections!
  end

  # The remaining maintenance calls are forwarded to the handler as-is.
  delegate :clear_reloadable_connections!,
           :clear_all_connections!,
           :verify_active_connections!,
           :to => :connection_handler
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment