Last active
August 29, 2015 14:07
-
-
Save esamattis/2907e43e162970d08568 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| "use strict"; | |
| var Promise = require("bluebird"); | |
| var debug = require("debug")("app:utils/GridSQL"); | |
| var through2 = require("through2"); | |
/**
 * MongoDB's GridFS inspired data storage for Knex. The given file is split
 * into multiple binary chunks, one row per chunk, in order to enable proper
 * file streaming.
 *
 * GridSQL works like a key value store. All files must be written with a
 * unique file id.
 *
 * Assumes a table like
 *
 *     knex.schema.createTable("chunks", function(table) {
 *         table.string("fileId").notNullable();
 *         table.binary("chunk").notNullable();
 *         table.integer("sequence").defaultTo(0).notNullable();
 *     });
 *
 *
 * @namespace utils
 * @class GridSQL
 * @constructor
 * @param {Object} options
 * @param {Object} options.knex Knex instance
 * @param {Number} [options.chunkSize=(1024*255)] Default chunk size used by
 *      write(); it is passed to readable.read() as the requested size
 * @param {String} [options.tableName=chunks] Table name
 * @throws {Error} "Missing options.knex" when no Knex instance is given
 */
function GridSQL(options) {
    // Support calling the constructor without `new`.
    if (!(this instanceof GridSQL)) return new GridSQL(options);

    // A missing options object falls through to the knex check below.
    var opts = options || {};

    this.knex = opts.knex;
    this.tableName = opts.tableName || "chunks";
    this.chunkSize = opts.chunkSize || (1024 * 255);

    if (!this.knex) throw new Error("Missing options.knex");
}
/**
 * Read file as a node.js readable stream.
 *
 * Rows are selected by file id, ordered by their sequence number, and the
 * binary `chunk` column of each row is emitted on the returned stream.
 *
 * Errors emitted by the underlying Knex row stream are forwarded to the
 * returned stream. A row without a Buffer `chunk` fails the stream with an
 * "Invalid GridSQL row" error carrying the offending row in `err.row`.
 *
 * This method has a little bug https://github.com/tgriesser/knex/issues/484
 *
 * @method read
 * @param {String} fileId
 * @return {stream.Readable}
 */
GridSQL.prototype.read = function(fileId) {
    debug("reading %s", fileId);

    var rowStream = this.knex.select("chunk")
        .from(this.tableName)
        .where({ fileId: fileId })
        .orderBy("sequence", "asc")
        .stream({highWaterMark: 5});

    // Unwraps each database row into its raw chunk Buffer.
    var chunkStream = through2.obj(function (row, enc, cb) {
        if (!row || !Buffer.isBuffer(row.chunk)) {
            var err = new Error("Invalid GridSQL row");
            err.row = row;
            return cb(err);
        }

        debug("Reading chunk %s byte chunk", row.chunk.length);
        this.push(row.chunk);
        cb();
    });

    // pipe() does not propagate errors from the source to the destination.
    // Without this a failed query would be an unhandled "error" event on
    // rowStream and the consumer of the returned stream would never see it.
    rowStream.on("error", function(err) {
        chunkStream.emit("error", err);
    });

    return rowStream.pipe(chunkStream);
};
/**
 * Write a node.js readable stream to the database under a unique file id.
 *
 * The stream is consumed in pieces of at most `chunkSize` and each piece is
 * inserted as its own row with an increasing `sequence` number (starting
 * from 1). All inserts run inside a single Knex transaction and only one
 * insert is in flight at a time.
 *
 * @method write
 * @param {String} fileId
 * @param {stream.Readable} readable
 * @param {Object} [options]
 * @param {Number} [options.chunkSize] Custom chunk size for this file
 * @return {Promise} Resolved with `{bytesWritten, chunkCount}` when the
 *      readable stream is fully saved to the database. Rejected when the
 *      stream emits an error or when any insert fails.
 */
GridSQL.prototype.write = function(fileId, readable, options) {
    var knex = this.knex;
    var tableName = this.tableName;
    var tableChunkSize = (options && options.chunkSize) || this.chunkSize;

    debug("Writing %s", fileId);

    if (!knex) throw new Error("Missing options.knex");

    var sequence = 0;
    var bytesWritten = 0;

    return knex.transaction(function(t) {
        return new Promise(function(resolve, reject) {
            // True while an insert is in flight. A plain flag is used so the
            // code does not depend on promise-library specific inspection
            // methods such as Bluebird's isPending().
            var inserting = false;
            var ended = false;
            var settled = false;

            // The "error" listener is intentionally left attached so that a
            // late stream error does not become an unhandled "error" event.
            function detach() {
                readable.removeListener("readable", pump);
                readable.removeListener("end", finish);
            }

            function fail(err) {
                if (settled) return;
                settled = true;
                detach();
                reject(err);
            }

            function finish() {
                ended = true;
                // Never resolve before the last insert has completed.
                if (inserting || settled) return;
                settled = true;
                detach();
                resolve();
            }

            function pump() {
                if (inserting || settled) return;

                var chunk = readable.read(tableChunkSize);
                if (chunk === null) return;

                sequence += 1;
                bytesWritten += chunk.length;

                debug(
                    "Got %s chunk of %s (%s) of bytes for %s",
                    sequence, chunk.length, bytesWritten, fileId
                );

                inserting = true;
                t.insert({
                    fileId: fileId,
                    chunk: chunk,
                    sequence: sequence
                })
                .into(tableName)
                .then(function() {
                    inserting = false;
                    debug("Chunk %s written for %s", sequence, fileId);
                    if (ended) return finish();
                    process.nextTick(pump);
                })
                // A failed insert must reject the whole write; otherwise the
                // stream is never drained, "end" never fires and the
                // transaction would hang forever.
                .then(null, fail);
            }

            readable.on("error", fail);
            readable.on("end", finish);
            readable.on("readable", pump);
            pump();
        });
    })
    .then(function() {
        return {
            bytesWritten: bytesWritten,
            chunkCount: sequence
        };
    });
};
| module.exports = GridSQL; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment