Simplistic migration. A minimal database migration runner: Highland streams drive node-postgres to apply timestamped SQL files whose versions are not yet recorded in the schema_migrations table, then print a timing report.
drop table if exists schema_migrations;

create table schema_migrations (
  id serial primary key not null,
  version character varying(255) unique not null
);

alter table schema_migrations
  owner to postgres;
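The runner below treats as a migration any file in the working directory whose name matches /^\d{12}_.*\.sql$/: a twelve-digit timestamp, an underscore, a description, and the .sql extension. A hypothetical example, 201508291410_create_contacts.sql (only statements beginning with alter, create, drop or insert are executed):

create table contacts (
  id serial primary key not null,
  name character varying(255) not null
);

insert into contacts (name) values ('Alice'), ('Bob');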
_ = require 'highland'
fs = require 'fs'
pg = require 'pg'
QueryStream = require 'pg-query-stream'

CONNECTION_STRING = "postgres://postgres:postgres@localhost/contacts"

# Monadic form
class Unit
  constructor: (@filename, @sql="", @result, @time) ->
    [..., @version, @description] = /^(\d{12})_(.*)$/.exec(@filename)
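# e.g. (hypothetical) new Unit("201508291410_create_contacts.sql") parses to
# version "201508291410" and description "create_contacts.sql"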
# transformation helpers
unit = (filename) -> new Unit(filename)
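# `consumer` adapts a per-unit transform to Highland's consume(err, x, push, next)
# protocol: errors and the end-of-stream marker (_.nil) are forwarded as-is,
# anything else is handed to `fn`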
consumer = (fn) -> (err, unit, push, next) ->
  if err
    push(err)
    next()
  else if unit is _.nil
    push(null, unit)
  else
    fn(unit, push, next)

# filters
select_migrationFiles = (files, push, next) ->
  push(null, file) for file in files.sort() when /^\d{12}_.*\.sql$/.test(file)
  next()
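# receives a single [units, previous] pair: the collected candidate units and
# the versions already recorded in schema_migrations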
reject_previousMigrations = ([units, previous], push, next) ->
  push(null, unit) for unit in units when unit.version not in previous
  next()

# Monadic transformations
map_readFile = (unit, push, next) ->
  fs.readFile unit.filename, {encoding:'utf8'}, (err, data) ->
    if err
      push(err)
    else
      push(null, new Unit(unit.filename, data))
    next()
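# split a file's sql into statements and append an insert that records this
# migration's version in schema_migrations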
flatMap_splitStatements = (unit, push, next) ->
  stmts = (stmt for stmt in unit.sql.split(/;\s*/) when /^(alter|create|drop|insert)/.test(stmt))
  stmts.push("insert into schema_migrations (version) values ('#{unit.version}')")
  push(null, new Unit(unit.filename, stmt)) for stmt in stmts
  next()
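# execute one statement on a pooled connection, timing it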
map_executeStatements = (unit, push, next) ->
  pg.connect CONNECTION_STRING, (err, client, done) ->
    if err
      push(err)
      next()
    else
      time = Date.now()
      client.query unit.sql, (err, result) ->
        if err
          push(err)
        else
          push(null, new Unit(unit.filename, unit.sql, result, Date.now()-time))
        done()
        next()

# generators
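# wrap a streaming query in a Highland generator: on the first pull, substitute
# a stream of result rows (pg-query-stream emits one object per row)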
rowGenerator = (sql) -> (push, next) ->
  pg.connect CONNECTION_STRING, (err, client, done) ->
    if err
      push(err)
      push(null, _.nil)
    else
      stream = client.query(new QueryStream(sql, []))
      stream.on 'end', done
      next _(stream)

# taps
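# for each migrated file, print every statement with its execution time and
# affected row count, plus a per-file total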
printReport = (unitGroups) ->
  for own filename, units of unitGroups
    do (totalTime = 0) ->
      console.log "== #{units[0].description}"
      console.log "================================================="
      for unit in units
        console.log "-- #{unit.sql.replace(/\s+/g, ' ')}"
        console.log "   -> #{unit.time} ms"
        if unit.result.rowCount
          console.log "   -> #{unit.result.rowCount} row(s)"
        totalTime += unit.time
      console.log "== #{units[0].description} migrated (#{totalTime} ms)\n"
# streams
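# a stream that emits a single array of already-migrated versions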
previousMigrations =
  _ rowGenerator 'select version from schema_migrations order by version'
    .pluck 'version'
    .collect()

_.wrapCallback(fs.readdir)('.')                 # list contents of current directory
  .consume consumer select_migrationFiles       # select only migration files
  .map unit                                     # give them a monadic form
  .collect()                                    # collect all units into an array
  .concat previousMigrations                    # push the array of migrated versions onto the stream
  .collect()                                    # collect both arrays into a single array
  .consume consumer reject_previousMigrations   # filter out units that have already been migrated
  .consume consumer map_readFile                # read the file contents
  .consume consumer flatMap_splitStatements     # for each unit emit one unit per sql statement
  .consume consumer map_executeStatements       # execute all sql statements
  .group _.get 'filename'                       # group sql statements by their filename
  .doto printReport                             # print a nice report
  .apply -> pg.end()                            # close the connection pool
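To run it (assuming the script sits in the directory containing the migration files, and that coffee-script, highland, pg and pg-query-stream are installed), execute it with the coffee CLI; pending migrations are applied in version order and the report is printed.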