Created
February 3, 2017 22:51
-
-
Save mclark-newvistas/a9e03fe6c89e3b6c54f6ad29e87bb9bc to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var EventEmitter = require('events'); | |
var crypto = require('crypto'); | |
var r = require('rethinkdb'); | |
// source: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Guide/Regular_Expressions
// Returns `string` with every regular-expression metacharacter prefixed by a
// backslash, so the result matches the input literally when embedded in a RegExp.
function escapeRegExp(string) {
  var metaChars = /[.*+?^${}()|[\]\\]/g;
  return string.replace(metaChars, function (hit) {
    // put a single backslash in front of each matched metacharacter
    return '\\' + hit;
  });
}
// rewrite of deepstream.io-storage-rethinkdb | |
module.exports = function(conn, conf) { | |
// Character(s) that split a record name into "<table><separator><id>"; '/' unless conf.splitChar is set.
const separator = conf.splitChar || '/';
// Table used for record names that carry no "<table><separator>" prefix.
const defaultTable = conf.defaultTable;
// Captures a leading run of word characters that is followed by the separator.
const tableMatch = new RegExp(`^(\\w+)${escapeRegExp(separator)}`);
// Name of the primary-key field used for every table this connector creates.
const primary = 'ds_id';
// Object returned to the caller; it emits 'ready' / 'error' and carries set/get/delete.
const emitter = new EventEmitter();
// In-flight table creations: table => EventEmitter that fires 'done'.
const creating = new Map();
// Set of known table names; stays undefined until getTables() first resolves.
let tableCache;
// Split a record name into its storage coordinates.
//
// Returns [table, id] for ordinary keys. When the id is too long to be used as
// a primary key, returns [table, sha256HexOfId, originalId] so the caller can
// store the original id alongside the hash.
//
// Keys that do not start with "<word chars><separator>" go to defaultTable and
// the whole key is used as the id.
function splitKey(key) {
  var match = key.match(tableMatch);
  var table = match ? match[1] : defaultTable;
  // match[0] is the table name plus the separator, so slicing by its length
  // drops the whole prefix even when the separator is longer than one character.
  var id = match ? key.slice(match[0].length) : key;
  // rethink can't have a key > 127 bytes; measure encoded bytes (not UTF-16
  // units) so multi-byte ids are hashed as well, and store the id alongside
  if (Buffer.byteLength(id, 'utf8') > 127) {
    var digest = crypto.createHash('sha256').update(id).digest('hex');
    return [table, digest, id];
  }
  return [table, id];
}
// Turn a deepstream value into the document shape that gets stored.
// The input looks like { _v: 1, _d: { payload } }, where _d may be an array.
// The payload becomes the document itself (an array is wrapped as
// { __dsList: [...] }) and everything except _d is kept under `__ds`.
// The input is deep-copied first, so the caller's object is not modified.
function convertFromDeepstream(value) {
  const meta = JSON.parse(JSON.stringify(value)); // clone
  const payload = meta._d;
  delete meta._d;
  // arrays cannot carry the __ds field directly, so they get a wrapper object
  const doc = Array.isArray(payload) ? { __dsList: payload } : payload;
  doc.__ds = meta;
  return doc;
}
// Turn a stored document back into a deepstream value.
// The `__ds` field becomes the returned object and the remaining fields become
// its `_d` payload. A document whose `__dsList` is an array is a wrapped list
// (see convertFromDeepstream), so that array becomes `_d`.
// The input is deep-copied first, so the caller's object is not modified.
// Throws a TypeError if the document has no `__ds` field.
function convertToDeepstream(value) {
  var doc = JSON.parse(JSON.stringify(value)); // clone
  var record = doc.__ds;
  delete doc.__ds;
  // Only arrays are ever wrapped on the way in, so only unwrap arrays here;
  // an object record with a non-array `__dsList` field keeps all its fields.
  record._d = Array.isArray(doc.__dsList) ? doc.__dsList : doc;
  return record;
}
// Fetch the list of tables over `conn` and replace tableCache with a Set of
// their names. Returns the promise of that refresh; it resolves with no value.
function getTables() {
  var listing = r.tableList().run(conn);
  return listing.then(function (names) {
    tableCache = new Set(names);
  });
}
// Create `table` (primary key `primary`) and call `callback` when finished:
// with no argument on success, or with the error on failure.
//
// Concurrent requests for the same table share a single tableCreate call:
// later callers just wait for the first caller's 'done' event.
// An "already exists" failure is treated as success after the table cache has
// been refreshed from the server.
function createTable(table, callback) {
  var pending = creating.get(table);
  if (pending)
    return pending.once('done', callback);
  var created = new EventEmitter();
  creating.set(table, created);
  created.once('done', () => creating.delete(table)); // cleanup
  created.once('done', callback);
  return r.tableCreate(table, {
    primaryKey: primary,
    // NOTE(review): the earlier comment here read "matching upstream, we use
    // the default, 'hard'", but the option passed is 'soft' - confirm intent.
    durability: 'soft',
  }).run(conn, err => {
    if (!err) {
      tableCache.add(table);
      return created.emit('done');
    }
    // Not every error carries `msg`; fall back to `message` so that reading
    // the text cannot throw and leave the waiting callers without a 'done'.
    var reason = String(err.msg || err.message || '');
    if (reason.indexOf("already exists") === -1)
      return created.emit('done', err);
    // exists already, refresh table cache
    return getTables().then(() => created.emit('done')).catch(err => created.emit('done', err));
  });
}
// Store `value` under `key`, replacing any document with the same id.
// If the target table is not in tableCache it is created first and the write
// is retried; a creation error goes to `callback`. When the id was hashed by
// splitKey, the original id is stored in the document's `__key` field.
// `callback` is handed straight to the driver's run().
function set(key, value, callback) {
  var parts = splitKey(key);
  var table = parts[0];
  var id = parts[1];
  var fullKey = parts[2];
  if (tableCache.has(table)) {
    var doc = convertFromDeepstream(value);
    doc[primary] = id;
    if (fullKey)
      doc.__key = fullKey;
    r.table(table).insert(doc, {
      returnChanges: false,
      conflict: 'replace',
    }).run(conn, callback);
    return;
  }
  // TODO: honestly, I'd rather not transparently create tables, for
  // all that we can probably lock it down with permissions somehow
  // if no table, make it - match upstream
  return createTable(table, function (err) {
    return err ? callback(err) : set(key, value, callback);
  });
}
// Load the record stored under `key`.
// Calls callback(null, null) when the table is not in tableCache or when no
// document exists for the id, callback(null, record) with the converted record
// when one is found, and callback(err) when the query or conversion fails.
function get(key, callback) {
  var [table, id] = splitKey(key);
  if (!tableCache.has(table)) // why no error? matching upstream
    return callback(null, null);
  r.table(table).get(id).run(conn).then(doc => {
    if (!doc)
      return doc;
    delete doc.__key; // in case is set
    delete doc[primary];
    return convertToDeepstream(doc);
  }).then(
    // Delivery is kept apart from the query/conversion step so that an
    // exception thrown by `callback` is never routed back into `callback`
    // as if it were a storage error (which would call it a second time).
    record => callback(null, record),
    err => callback(err)
  );
}
// Delete the document stored under `key`.
// When the table is not in tableCache, `callback` gets an Error (unlike get,
// which reports a missing table as "no record" - this matches upstream).
// Otherwise `callback` is handed to the driver's run().
function remove(key, callback) {
  var parts = splitKey(key);
  var table = parts[0];
  if (tableCache.has(table)) {
    r.table(table).get(parts[1]).delete().run(conn, callback);
    return;
  }
  return callback(new Error("Table " + table + " does not exist"));
}
emitter.set = set; | |
emitter.get = get; | |
emitter.delete = remove; | |
emitter.isReady = false; | |
// get tables; not ready till then | |
getTables().then(() => { | |
emitter.isReady = true; | |
emitter.emit('ready'); | |
}).catch(e => emitter.emit('error', e)); | |
return emitter; | |
}; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment