Rails Migration/Copy/Merge Tool
# migrate.rb Migration/Copying/Merging Tool
# By Steven Karas
# MIT License

require 'csv'
require 'logger'
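
# Migration drives a resumable copy of ActiveRecord data between databases:
# models are dumped to CSV under `path`, replayed into the target database,
# and old-id -> new-id mappings are recorded so foreign keys can be rewritten.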
class Migration
  def initialize(path, logger = Logger.new(nil))
    @path = path
    @mappings = {}
    @offsets = {}
    @logger = logger
  end

  attr_accessor :path, :mappings, :offsets
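
  # Offsets record how many rows of each class have already been migrated;
  # mappings record old-id -> new-id pairs. Both are persisted to files under
  # `path` so that an interrupted run can resume where it left off.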
  def load_offset(klass)
    offsets[klass] = 0
    return unless File.exist?(offset_file(klass))
    offsets[klass] = File.read(offset_file(klass)).to_i
  end

  def offset_file(klass)
    File.expand_path("offset_#{klass.name.downcase}", path)
  end

  def dump_offset(klass)
    return if [0, nil].include? offsets[klass]
    File.write(offset_file(klass), offsets[klass].to_s)
  end

  def offset(klass)
    return offsets[klass] if offsets[klass]
    load_offset(klass)
    offsets[klass]
  end

  def load_mapping(klass)
    mappings[klass] = {}
    return unless File.exist?(mapping_file(klass))
    CSV.foreach(mapping_file(klass), headers: true) do |row|
      mappings[klass][row["old"]] = row["new"]
    end
  end

  def mapping_file(klass)
    File.expand_path("mapping_#{klass.name.downcase}.csv", path)
  end

  def dump_mapping(klass)
    return if [nil, {}].include? mappings[klass]
    CSV.open(mapping_file(klass), "w", headers: ["old", "new"]) do |csv|
      csv << ["old", "new"]
      mappings[klass].each do |old_id, new_id|
        csv << [old_id, new_id]
      end
    end
  end

  def mapping(klass)
    return mappings[klass] if mappings[klass]
    load_mapping(klass)
    mappings[klass]
  end
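
  # Export every row of each class to <class>.csv under `path`, skipping
  # classes whose CSV dump already contains as many rows as the table.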
  def dump(klasses)
    klasses.each do |klass|
      outfile = File.expand_path("#{klass.name.downcase}.csv", path)
      next if rows_to_migrate(klass) == klass.count
      CSV.open(outfile, "w", headers: klass.column_names) do |csv|
        csv << klass.column_names
        klass.all.each do |model|
          # write values in column order; passing the attributes Hash
          # directly to CSV#<< is not portable across csv versions
          csv << model.attributes.values_at(*klass.column_names)
        end
      end
    end
  end

  def migration_status(klasses)
    klasses.each do |klass|
      puts "Migrated #{offset(klass)} out of #{rows_to_migrate(klass)} #{klass.name}"
    end
  end

  def rows_to_migrate(klass)
    CSV.read(File.expand_path("#{klass.name.downcase}.csv", path)).size - 1
  rescue
    0
  end

  # ppsql is a ruby script that parses a Heroku database url and opens psql
  # connected to that database
  def migration_command(db_url, script, outfile)
    "ppsql '#{db_url}' -A -F ',' -o #{outfile} -1 -f #{script}"
  end

  def create_old_id_field(table)
    "ALTER TABLE #{table} ADD COLUMN old_id integer"
  end

  def remove_old_id_field(table)
    "ALTER TABLE #{table} DROP COLUMN old_id"
  end
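
  # Emit a multi-row INSERT for `rows`. The source "id" column is written
  # into the temporary old_id column, and RETURNING old_id, id echoes the
  # old-to-new id pairs that become the mapping CSV.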
  def insert_statement(io, klass, rows)
    io << <<-SQL.gsub(/ +/, " ")
      INSERT INTO #{klass.table_name}
      ( #{rows.first.headers.map { |c| c == "id" ? "old_id" : c }.join(", ")} )
      VALUES
    SQL
    io << rows.map do |row|
      filtered_row = filter_row(row, klass)
      filtered_row = row.headers.map { |field| filtered_row[field] }.map { |f| klass.connection.quote(f) }
      "(#{filtered_row.join(", ")})"
    end.join(", \n")
    if rows.first.headers.include? "id"
      io << <<-SQL.gsub(/ +/, " ")
        RETURNING old_id, id
      SQL
    end
    io << ";"
  end
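
  # Build a three-step raw migration for one class: add old_id, bulk-insert
  # the dumped rows in slices, then drop old_id again. A bash driver script
  # ties the steps together and trims the psql output into the mapping CSV.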
  def migration_script_for(klass)
    db_url = "postgres://localhost:5432/postgres"
    File.write(File.expand_path("step_1_#{klass.name.downcase}.sql", path), create_old_id_field(klass.table_name))
    step_2_call = ""
    rows = CSV.read(File.expand_path("#{klass.name.downcase}.csv", path), headers: true)
    # I ran into memory issues when trying to execute INSERTs with more than 100K rows
    rows.each_slice(100000).each_with_index do |slice, index|
      script_file = File.expand_path("step_2_#{klass.name.downcase}#{index == 0 ? "" : "_#{index}"}.sql", path)
      step_2_call << "#{migration_command(db_url, script_file, mapping_file(klass))}\n"
      File.open(script_file, "w") do |file|
        insert_statement(file, klass, slice)
      end
    end
    File.write(File.expand_path("step_3_#{klass.name.downcase}.sql", path), remove_old_id_field(klass.table_name))
    File.write File.expand_path("migrate_#{klass.name.downcase}.bash", path), <<-BASH.gsub(/^\s+/, "")
      ppsql '#{db_url}' -f #{File.expand_path("step_1_#{klass.name.downcase}.sql", path)}
      #{step_2_call}
      ppsql '#{db_url}' -f #{File.expand_path("step_3_#{klass.name.downcase}.sql", path)}
      # this trims the result of step 2 into a proper CSV file
      ruby -e 'require "csv";d=CSV.read("#{mapping_file(klass)}");d[0]=%w{old new};d[-2..-1]=[];CSV.open("#{mapping_file(klass)}", "w"){|csv|d.each{|r|csv<<r}}'
    BASH
  end
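
  # NOTE: because a missing foreign-key mapping raises, parent classes must
  # be migrated before the classes that reference them.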
  def filter_row(row, klass)
    filtered_row = row.to_h
    # if there are any foreign keys, replace them now
    klass.reflections.select { |n, r| r.macro == :belongs_to }.each do |name, reflection|
      foreign_key = reflection.foreign_key
      old_id = filtered_row[foreign_key]
      next if old_id.nil? # leave legitimately NULL foreign keys alone
      filtered_row[foreign_key] = mapping(reflection.klass)[old_id]
      raise "missing mapping for key #{old_id} in #{reflection.klass.name}" if filtered_row[foreign_key].nil?
    end
    filtered_row
  end
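
  # Migrate every class in order, stopping at the first nonzero result from
  # migrate_class. Mappings and offsets are always flushed to disk so the
  # next invocation can pick up where this one stopped.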
  def migrate(klasses)
    klasses.each do |klass|
      next if rows_to_migrate(klass) == offset(klass)
      result = migrate_class(klass)
      return result unless result == 0
    end
    return 0
  ensure
    klasses.each { |klass| dump_mapping(klass) }
    klasses.each { |klass| dump_offset(klass) }
  end
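
  # Returns 0 when the class is fully migrated, 1 on error, and 2 after a
  # few slices as a deliberate checkpoint; callers re-run until they see 0.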
  def migrate_class(klass)
    source_file = File.expand_path("#{klass.name.downcase}.csv", path)
    return 0 unless File.exist?(source_file)
    migrated = 0
    CSV.open(source_file, headers: true).each.each_slice(10) do |rows|
      # skip slices that a previous run already committed
      if offset(klass) >= migrated + rows.size
        migrated += rows.size
        next
      end
      begin
        ActiveRecord::Base.transaction do
          rows.each do |row|
            model = klass.create(filter_row(row, klass).reject { |k, v| k == klass.primary_key })
            mapping(klass)[row[klass.primary_key]] = model.attributes[klass.primary_key]
          end
        end
      rescue StandardError => e
        @logger.warn "#{e.class}: #{e.message}"
        @logger.warn e.backtrace
        return 1
      end
      migrated += rows.size
      # We bail every few migration slices. This is done to work around
      # connectivity issues in the ActiveRecord PG connector
      if migrated - offset(klass) > 40
        return 2
      end
    end
    return 0
  ensure
    # migrated is nil when we bailed out before reading the source file
    if migrated
      @logger.info { "Committed #{migrated - offset(klass)} records. Current progress: #{migrated}" }
      offsets[klass] = migrated
    end
  end
end

require 'thunder'
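
# Thin command-line wrapper around Migration, built on the Thunder gem.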
class MigrationCLI
  def initialize
    @logger = Logger.new(STDOUT)
    @logger.level = Logger::WARN
  end

  include Thunder

  default_command :status

  def load_rails_env
    @logger.info "Loading rails environment"
    require './config/environment'
    @logger.info "Rails environment loaded"
    # every model inheriting directly from ActiveRecord::Base, minus the
    # auto-generated has_and_belongs_to_many join models
    @classes = ActiveRecord::Base.direct_descendants.reject { |c| c.name.starts_with? "HABTM_" }
  end

  desc :raw_migrate, "raw_migrate KLASS DIR"
  def raw_migrate(klass, dir = "tmp/dump")
    load_rails_env
    klass = Class.const_get(klass)
    Migration.new(dir).migration_script_for(klass)
  end

  desc :status, "status DIR"
  def status(dir = "tmp/dump")
    load_rails_env
    Migration.new(dir).migration_status(@classes)
  end

  desc :dump, "dump DIR"
  def dump(dir = "tmp/dump")
    load_rails_env
    Migration.new(dir).dump(@classes)
  end

  option :verbose, type: Boolean, desc: "print extra information"
  desc :migrate, "migrate DIR"
  def migrate(dir = "tmp/dump", options = {})
    @logger.level = Logger::INFO if options[:verbose]
    load_rails_env
    exit Migration.new(dir, @logger).migrate(@classes)
  end
end

if __FILE__ == $PROGRAM_NAME
  MigrationCLI.new.start
end
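
Usage

The script is meant to be run from the root of a Rails app. A plausible session, assuming Thunder dispatches the declared subcommands from ARGV and accepts --verbose for the declared option (both are assumptions; only the command names and defaults come from the code above):

  ruby migrate.rb dump                 # export every model to tmp/dump/<model>.csv
  ruby migrate.rb status               # report per-model progress (the default command)
  ruby migrate.rb migrate --verbose    # replay the CSVs into the current database
  ruby migrate.rb raw_migrate User     # write step_*.sql and migrate_user.bash for the raw path

migrate exits 0 when done, 1 on error, and 2 at each checkpoint, so it is intended to be invoked repeatedly until it returns 0.

The ppsql helper referenced by migration_command is not included in the gist. A minimal hypothetical sketch, based only on the comment in the code (parse a Heroku-style postgres:// URL, then open psql against it, forwarding the remaining flags):

  #!/usr/bin/env ruby
  # ppsql (hypothetical sketch): connect psql to a postgres:// URL,
  # forwarding any remaining command-line flags straight to psql.
  require 'uri'

  url = URI.parse(ARGV.shift)
  ENV["PGPASSWORD"] = url.password if url.password

  args = []
  args += ["-h", url.host] if url.host
  args += ["-p", url.port.to_s] if url.port
  args += ["-U", url.user] if url.user
  args.concat(ARGV) # e.g. -A -F ',' -o outfile -1 -f script.sql

  db = url.path.to_s.sub(%r{^/}, "")
  args << db unless db.empty? # database name comes last

  exec("psql", *args)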