Skip to content

Instantly share code, notes, and snippets.

@fxg42
Last active August 29, 2015 14:10
Show Gist options
  • Save fxg42/c1cabc8cd8d88cef95ac to your computer and use it in GitHub Desktop.
Save fxg42/c1cabc8cd8d88cef95ac to your computer and use it in GitHub Desktop.
Migration simpliste.
-- Bookkeeping table for the migration runner: one row per migration version
-- that has already been applied.
-- WARNING: the table is dropped first, so running this script discards any
-- previously recorded migration history.
DROP TABLE IF EXISTS schema_migrations;

CREATE TABLE schema_migrations (
    id      SERIAL       NOT NULL PRIMARY KEY,
    version VARCHAR(255) NOT NULL UNIQUE
);

ALTER TABLE schema_migrations OWNER TO postgres;
_ = require 'highland'
fs = require 'fs'
pg = require 'pg'
QueryStream = require 'pg-query-stream'
# Connection URL passed to every pg.connect call in this script.
# NOTE(review): user, password, host and database are hard-coded here;
# confirm this is only ever meant for a local development database.
CONNECTION_STRING = "postgres://postgres:postgres@localhost/contacts"
# Monadic form
# Value object carried through the stream pipeline.
#
#   filename    - migration file name; must look like "<12 digits>_<rest>"
#   sql         - SQL text attached to the unit (defaults to "")
#   result      - query result, once the SQL has been executed
#   time        - execution time in milliseconds, once executed
#   version     - the 12-digit prefix of the file name (derived)
#   description - everything after the first underscore, file extension
#                 included (derived)
#
# Constructing a Unit from a file name that does not match the pattern throws
# a TypeError, because the regular expression yields no match to read from.
class Unit
  constructor: (filename, sql = "", result, time) ->
    @filename = filename
    @sql = sql
    @result = result
    @time = time
    parts = /^(\d{12})_(.*)$/.exec(@filename)
    @version = parts[parts.length - 2]
    @description = parts[parts.length - 1]
# transformation helpers
# Lift a bare file name into a Unit (sql/result/time are left unset).
unit = (filename) ->
  new Unit filename
# Adapt `fn(value, push, next)` into a handler suitable for Highland's
# `consume`:
#   - an upstream error is forwarded and the next value is requested;
#   - the end-of-stream marker (_.nil) is forwarded and nothing more is
#     requested;
#   - any other value is delegated to `fn`, which is then responsible for
#     calling `push` and `next` itself.
consumer = (fn) ->
  (err, item, push, next) ->
    switch
      when err
        push(err)
        next()
      when item is _.nil
        push(null, item)
      else
        fn(item, push, next)
# filters
# Given a directory listing, emit (in sorted order) only the entries named
# like a migration: twelve digits, an underscore, anything, then ".sql".
# Note: `files` is sorted in place.
select_migrationFiles = (files, push, next) ->
  migrationName = /^\d{12}_.*\.sql$/
  files.sort()
  for candidate in files
    push(null, candidate) if migrationName.test(candidate)
  next()
# Receives a pair [units, previous] where `units` is the array of candidate
# migration units and `previous` is the array of versions already recorded.
# Emits every unit whose version is not in `previous`.
#
# The loop variable is deliberately NOT called `unit`: CoffeeScript has no
# block-level shadowing, so a loop variable with that name would assign to
# (and overwrite) the module-level `unit` helper function.
reject_previousMigrations = ([units, previous], push, next) ->
  for candidate in units when candidate.version not in previous
    push(null, candidate)
  next()
# Monadic transformations
# Read the unit's file as UTF-8 and emit a new Unit carrying the file
# contents as its SQL. A read failure is emitted as a stream error instead.
# In both cases the next unit is requested afterwards.
map_readFile = (unit, push, next) ->
  readOptions = {encoding:'utf8'}
  fs.readFile unit.filename, readOptions, (err, contents) ->
    if err then push(err) else push(null, new Unit(unit.filename, contents))
    next()
# Split a unit's SQL into individual statements and emit one Unit per
# statement, followed by one extra Unit that records the migration's version
# in schema_migrations.
#
# Only statements starting with alter/create/drop/insert are kept. The match
# is case-insensitive and ignores surrounding whitespace, so "CREATE TABLE ..."
# or a statement preceded by a blank line is no longer silently skipped while
# the migration is still recorded as applied.
#
# Limitation: the split is a plain split on ";", so a semicolon inside a
# string literal will cut the statement in two.
flatMap_splitStatements = (unit, push, next) ->
  isMigrationStatement = /^(alter|create|drop|insert)/i
  candidates = (piece.trim() for piece in unit.sql.split(/;\s*/))
  stmts = (stmt for stmt in candidates when isMigrationStatement.test(stmt))
  # The version is quoted: schema_migrations.version is a character column,
  # and an unquoted numeric literal would lose leading zeros and then no
  # longer match the version parsed from the file name.
  stmts.push("insert into schema_migrations (version) values ('#{unit.version}')")
  push(null, new Unit(unit.filename, stmt)) for stmt in stmts
  next()
# Execute the unit's SQL on a pooled connection and emit a new Unit carrying
# the query result and the elapsed time in milliseconds.
#
# Errors (connection or query) are emitted as stream errors. In every case
# `next()` is called so the stream keeps flowing; previously a connection
# failure pushed the error but never requested the next value, which stalled
# the pipeline.
#
# NOTE(review): statements are executed independently, not in a transaction.
# After a failing statement the remaining statements of the same file,
# including the schema_migrations insert, are still attempted.
map_executeStatements = (unit, push, next) ->
  pg.connect CONNECTION_STRING, (err, client, done) ->
    if err
      push(err)
      return next()
    startedAt = Date.now()
    client.query unit.sql, (err, result) ->
      if err
        push(err)
      else
        push(null, new Unit(unit.filename, unit.sql, result, Date.now() - startedAt))
      # Release the client back to the pool before asking for more work.
      done()
      next()
# generators
# Build a Highland generator function that runs `sql` through a QueryStream
# and emits the resulting rows.
#
# On a connection failure the error is emitted and the stream is then ended
# explicitly with _.nil; previously the stream was left open forever, so any
# downstream `collect()` would never complete.
rowGenerator = (sql) -> (push, next) ->
  pg.connect CONNECTION_STRING, (err, client, done) ->
    if err
      push(err)
      push(null, _.nil)
    else
      stream = client.query(new QueryStream(sql, []))
      # Release the client back to the pool once all rows have been read.
      stream.on 'end', done
      # Hand over to the row stream: its values become this stream's values.
      next _(stream)
# taps
# Print a per-file report to the console.
#
#   unitGroups - object mapping a migration file name to the array of executed
#                Units for that file (each with sql, time and result set).
#
# For every file: a header, one entry per statement (whitespace-collapsed SQL,
# elapsed time, and row count when non-zero), then the total time.
#
# The loop variable is deliberately NOT called `unit`: CoffeeScript has no
# block-level shadowing, so a loop variable with that name would assign to
# (and overwrite) the module-level `unit` helper function.
printReport = (unitGroups) ->
  for own filename, units of unitGroups
    totalTime = 0
    console.log "== #{units[0].description}"
    console.log "================================================="
    for executed in units
      console.log "-- #{executed.sql.replace(/\s+/g, ' ')}"
      console.log " -> #{executed.time} ms"
      if executed.result.rowCount
        console.log " -> #{executed.result.rowCount} row(s)"
      totalTime += executed.time
    console.log "== #{units[0].description} migrated (#{totalTime}) ms\n"
  # Explicit return so the loops are not compiled into result-collecting
  # comprehensions.
  return
# streams
# Stream of a single value: the array of versions already recorded in
# schema_migrations (rows are reduced to their `version` column, then
# collected into one array).
previousMigrations =
_ rowGenerator 'select version from schema_migrations order by version'
.pluck 'version'
.collect()
# Main pipeline. Data shape at each stage:
#   directory listing -> migration file names -> Units -> [all Units]
#   -> [[all Units], [migrated versions]] -> pending Units
#   -> Units with file contents -> one Unit per SQL statement
#   -> executed Units -> {filename: [executed Units]}
# The second collect() is what pairs the two arrays so that
# reject_previousMigrations receives them as [units, previous].
# NOTE(review): no error handler is attached to this chain; confirm how
# stream errors pushed by the steps above are expected to surface.
_.wrapCallback(fs.readdir)('.') # list contents of current directory
.consume consumer select_migrationFiles # select only migration files
.map unit # give them a monadic form
.collect() # collect all units into an array
.concat previousMigrations # push the array of migrated versions onto the stream
.collect() # collect both arrays into a single array
.consume consumer reject_previousMigrations # filter out units that have already been migrated
.consume consumer map_readFile # read the file contents
.consume consumer flatMap_splitStatements # for each unit emit one unit per sql statement
.consume consumer map_executeStatements # execute all sql statements.
.group _.get 'filename' # group sql statements by their filename
.doto printReport # print a nice report
.apply -> pg.end() # close the connection pool
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment