Last active
February 24, 2016 22:03
-
-
Save sax/1dcaacab2b0068f132a8 to your computer and use it in GitHub Desktop.
Multidb migration support
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
common: &common | |
adapter: postgresql | |
host: 127.0.0.1 | |
port: 5432 | |
username: ******* | |
password: ******* | |
encoding: unicode | |
pool: 20 | |
min_messages: WARNING | |
schema_search_path: public | |
multidb: | |
fallback: false | |
databases: | |
stuff: | |
schema_search_path: stuff | |
things: | |
schema_search_path: things | |
development: &dev | |
<<: *common | |
database: development | |
test: &test | |
<<: *common | |
database: test |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# /lib/ext/multidb.rb | |
# | |
module Multidb | |
class Balancer | |
def use(name, &block) | |
raise 'Multidb.use must have a block given' unless block_given? | |
result = nil | |
get(name) do |candidate| | |
candidate.connection do |connection| | |
previous_candidate = Thread.current[:multidb_connection_candidate] | |
Thread.current[:multidb_connection_candidate] = candidate | |
previous_connection = Thread.current[:multidb_connection] | |
Thread.current[:multidb_connection] = connection | |
previous_name = Thread.current[:multidb_connection_name] | |
Thread.current[:multidb_connection_name] =name | |
begin | |
result = yield | |
ensure | |
Thread.current[:multidb_connection_candidate] = previous_candidate | |
Thread.current[:multidb_connection] = previous_connection | |
Thread.current[:multidb_connection_name] = previous_name | |
end | |
result | |
end | |
end | |
result | |
end | |
def current_connection_name | |
Thread.current[:multidb_connection_name] | |
end | |
end | |
end | |
module Ext | |
module Multidb | |
module ModelExtensions | |
extend ActiveSupport::Concern | |
included do | |
class << self | |
alias_method_chain :connection_pool, :multidb | |
end | |
end | |
module ClassMethods | |
def connection_pool_with_multidb | |
return Thread.current[:multidb_connection_candidate].connection_pool if Thread.current[:multidb_connection_candidate] | |
connection_pool_without_multidb | |
end | |
end | |
end | |
end | |
end | |
ActiveRecord::Base.class_eval do | |
include Ext::Multidb::ModelExtensions | |
end | |
module Ext | |
module Multidb | |
module LogSubscriber | |
def self.included(base) | |
base.alias_method_chain :debug, :multidb | |
end | |
def debug_with_multidb(msg) | |
name = Multidb.balancer.current_connection_name | |
conn = name.nil? ? '[Shard: N/A]' : "[Shard: #{name}]" | |
debug_without_multidb(conn + msg) | |
end | |
end | |
end | |
end | |
ActiveRecord::LogSubscriber.send(:include, Ext::Multidb::LogSubscriber) | |
module Ext | |
module Migration | |
module InstanceOrClassMethods | |
def announce_with_shards(message) | |
announce_without_shards("#{message} - #{current_shard}") | |
end | |
def current_shard | |
"Shard: #{Multidb.balancer.current_connection_name}" | |
end | |
end | |
include InstanceOrClassMethods | |
def self.included(base) | |
base.extend(ClassMethods) | |
base.alias_method_chain :announce, :shards | |
base.class_attribute :use_shards, instance_reader: false, instance_writer: false | |
end | |
module ClassMethods | |
def shards | |
Multidb.balancer.instance_variable_get(:@candidates).keys | |
end | |
end | |
end | |
end | |
module Ext | |
module Migration | |
module Migrator | |
def self.included(base) | |
base.extend(ClassMethods) | |
base.class_eval do | |
class << self | |
alias_method_chain :migrate, :multidb | |
alias_method_chain :up, :multidb | |
alias_method_chain :down, :multidb | |
alias_method_chain :run, :multidb | |
end | |
end | |
base.alias_method_chain :run, :multidb | |
base.alias_method_chain :migrate, :multidb | |
base.alias_method_chain :migrations, :multidb | |
end | |
def run_with_multidb(&block) | |
run_without_multidb(&block) | |
rescue ActiveRecord::UnknownMigrationVersionError => e | |
raise unless migrations(true).detect { |m| m.version == e.version } | |
end | |
def migrate_with_multidb(&block) | |
migrate_without_multidb(&block) | |
rescue ActiveRecord::UnknownMigrationVersionError => e | |
raise unless migrations(true).detect { |m| m.version == e.version } | |
end | |
def migrations_with_multidb(shard_agnostic = false) | |
migrations = migrations_without_multidb | |
return migrations if Multidb.balancer.current_connection_name.nil? || shard_agnostic | |
migrations.select { |m| m.shards.include?(Multidb.balancer.current_connection_name) } | |
end | |
module ClassMethods | |
def candidates | |
Multidb.balancer.instance_variable_get(:@candidates).keys | |
end | |
def migrate_with_multidb(migrations_paths, target_version = nil, &block) | |
candidates.each do |name| | |
Multidb.use(name) do | |
migrate_without_multidb(migrations_paths, target_version, &block) | |
end | |
end | |
end | |
def up_with_multidb(migrations_paths, target_version = nil, &block) | |
candidates.each do |name| | |
Multidb.use(name) do | |
up_without_multidb(migrations_paths, target_version, &block) | |
end | |
end | |
end | |
def down_with_multidb(migrations_paths, target_version = nil, &block) | |
candidates.each do |name| | |
Multidb.use(name) do | |
down_without_multidb(migrations_paths, target_version, &block) | |
end | |
end | |
end | |
def run_with_multidb(direction, migrations_paths, target_version) | |
candidates.each do |name| | |
Multidb.use(name) do | |
run_without_multidb(direction, migrations_paths, target_version) | |
end | |
end | |
end | |
end | |
end | |
module MigrationProxy | |
def shards | |
migration.class.shards | |
end | |
end | |
module UnknownMigrationVersionError | |
def self.included(base) | |
base.alias_method_chain :initialize, :multidb | |
base.send(:attr_accessor, :version) | |
end | |
def initialize_with_multidb(version) | |
@version = version | |
initialize_without_multidb(version) | |
end | |
end | |
end | |
end | |
ActiveRecord::Migration.send(:include, Ext::Migration) | |
ActiveRecord::Migrator.send(:include, Ext::Migration::Migrator) | |
ActiveRecord::MigrationProxy.send(:include, Ext::Migration::MigrationProxy) | |
ActiveRecord::UnknownMigrationVersionError.send(:include, Ext::Migration::UnknownMigrationVersionError) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment