Last active
May 14, 2019 20:35
-
-
Save smddzcy/acd58d9fc4ae0bb2102b2c5e36df6ea3 to your computer and use it in GitHub Desktop.
Mongoose-like ORM for Cosmos DB SQL API
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const _ = require('lodash'); | |
const Redis = require('ioredis'); | |
const sqlBuilder = require('mongo-sql'); // from https://github.com/smddzcy/mongo-sql#master | |
const uuid = require('uuid'); | |
const throat = require('throat'); | |
const DbError = require('../error/dbError'); | |
const cast = require('../cast'); | |
// NOTE(review): `CosmosClient` is never imported in the visible source, and
// the `redis` / `logger` objects used by Schema below are never created
// (only the `Redis` class is required). Presumably the original file had
// those lines and they were dropped together with the real endpoint/key
// values — confirm before running.
const client = new CosmosClient({ endpoint: '...', key: '...' });
const database = client.database('...');
// Expiry handed to `redis.setex` for every cached query result.
const cacheTtl = 3600;
// Fields merged into every schema before the user-defined ones. On write
// operations `Schema#map` passes these through untouched instead of casting
// them. (Presumably these are the system properties Cosmos DB attaches to
// each stored document — confirm against the SDK.)
const autoAddedFields = {
  id: String,
  _rid: String,
  _self: String,
  _etag: String,
  _attachments: String,
  _ts: Number,
};
/**
 * Mongoose-like model bound to a single Cosmos DB container, with query
 * results cached in Redis.
 *
 * A schema maps each field name either to a type constructor (`String`,
 * `Number`, `Boolean`, `Date`, `Array`, `Object`, `Schema.Types.*`) or to a
 * descriptor object such as `{ type, default, required, enum, trim, ... }`.
 */
class Schema {
  /**
   * @param {Object} [schema] - Field definitions, merged over `autoAddedFields`.
   * @param {Object} options
   * @param {boolean} [options.timestamps=true] - Add `createdAt` / `updatedAt` Date fields.
   * @param {string} [options.collection] - Container id; derived from `model` when omitted.
   * @param {string} [options.model] - Model name used for registration and cache keys.
   * @param {string} options.partitionKey - Partition key property name (no leading slash).
   * @throws {Error} When neither `model` nor `collection` is given, or `partitionKey` is missing.
   */
  constructor(schema, options) {
    this.timestamps = true;
    if (options) {
      ['timestamps', 'collection', 'model', 'partitionKey'].forEach((field) => {
        if (field in options) {
          this[field] = options[field];
        }
      });
    }

    this.virtuals = [];
    this.hooks = [];
    this.schema = { ...autoAddedFields, ...(schema || {}) };
    if (this.timestamps) {
      this.schema.createdAt = Date;
      this.schema.updatedAt = Date;
    }

    if (!this.collection) {
      if (!this.model) {
        throw new Error('Schema should either define a `model` or `collection`');
      }
      this.collection = `${_.camelCase(this.model)}s`;
    }
    if (!this.partitionKey) {
      throw new Error('Schema should define a `partitionKey`');
    }

    // Bind every method so it can be passed around as a bare callback.
    [
      'ready',
      'deleteCollection',
      'createCollection',
      'recreateCollection',
      'mapVirtuals',
      'virtual',
      'addHook',
      '_runHooks',
      'map',
      'create',
      '_getCacheKey',
      '_deleteCache',
      '_deleteCacheForType',
      '_getFromCacheOrSet',
      '_find',
      'find',
      'findOne',
      'findById',
      'findAll',
      '_updateItem',
      'findOneAndUpdate',
      'findOneOrCreate',
      'update',
      'updateOne',
      'updateById',
      'remove',
      'removeById',
    ].forEach((method) => {
      this[method] = this[method].bind(this);
    });

    // Start container creation once and keep the promise so `ready()` can
    // surface a failure instead of polling forever.
    this._initialization = this.createCollection();
    // createCollection() already logs the error; mark the promise as handled
    // so a schema nobody awaits does not raise an unhandled rejection.
    this._initialization.catch(() => {});

    // Register the model so other schemas can reference it via `ref`.
    if (this.model) {
      Schema._registeredModels[this.model] = this;
    }
  }

  /**
   * Resolves with `true` once the container handle is available.
   * Rejects with the `DbError` from the initial `createCollection()` call if
   * that call failed and no container has been set since.
   * @returns {Promise<boolean>}
   */
  ready() {
    if (this._container) {
      return Promise.resolve(true);
    }
    return this._initialization.then(() => true);
  }

  /** Deletes the underlying container. */
  deleteCollection() {
    return this._container.delete();
  }

  /**
   * Creates the container if it does not exist and stores its handle.
   * @returns {Promise<void>}
   * @throws {DbError} (as a rejection) when the container cannot be created.
   */
  createCollection() {
    return database.containers.createIfNotExists({
      id: this.collection,
      partitionKey: { paths: [`/${this.partitionKey}`], kind: 'Hash' },
    }).then(({ container }) => {
      this._container = container;
    }).catch((err) => {
      console.error(err);
      throw new DbError(`Couldn't create container ${this.collection}: ${err}`);
    });
  }

  /** Drops the container and creates it again. */
  async recreateCollection() {
    await this.deleteCollection();
    await this.createCollection();
  }

  /**
   * Casts `val` according to the schema type constructor `type`.
   * Unknown types, `Object`, `Mixed` and `Reference` return the value as is.
   * @param {Function} type - Type constructor from the schema.
   * @param {*} val - Value to cast.
   * @param {string} [path='unknown'] - Field name, forwarded to the string/boolean casters.
   */
  static castVal(type, val, path = 'unknown') {
    if (type === Number) {
      return cast.number(val);
    }
    if (type === String) {
      return cast.string(val, path);
    }
    if (type === Boolean) {
      return cast.boolean(val, path);
    }
    if (type === Date) {
      return cast.date(val);
    }
    if (type === Array) {
      return _.toArray(val);
    }
    if (type === Object || type === Schema.Types.Mixed || type === Schema.Types.Reference) {
      return val;
    }
    return val;
  }

  /** Computes every registered virtual and stores it on `obj` (mutates `obj`). */
  mapVirtuals(obj) {
    this.virtuals.forEach((virtual) => {
      obj[virtual.field] = virtual.getter.call(obj);
    });
  }

  /**
   * Starts the definition of a virtual field.
   * @param {string} field - Name of the virtual.
   * @returns {{get: Function}} Object whose `get(getter)` registers the getter;
   *   the getter is later called with the mapped document as `this`.
   */
  virtual(field) {
    return {
      get: (getter) => this.virtuals.push({ field, getter }),
    };
  }

  /** Registers `hook` to run for the given hook `type` (e.g. 'beforeCreate'). */
  addHook(type, hook) {
    this.hooks.push({ type, hook });
  }

  /** Runs all hooks of `type` concurrently with `object`, then returns `object`. */
  async _runHooks(type, object) {
    await Promise.all(this.hooks
      .filter((hook) => hook.type === type)
      .map((hook) => hook.hook(object)));
    return object;
  }

  /**
   * Projects a raw object (or array of objects) onto the schema: applies
   * defaults, casts values and, for writes, validates them.
   *
   * @param {Object|Object[]} obj - Document(s) to map; falsy input is returned unchanged.
   * @param {boolean} [isRead=true] - `true` also computes virtuals; `false`
   *   runs enum / string / number validation and passes auto-added fields through.
   * @returns {Object|Object[]} New object(s) containing only schema fields.
   * @throws {DbError} On a missing required value or a failed validation.
   */
  map(obj, isRead = true) {
    if (!obj) return obj;
    if (Array.isArray(obj)) {
      return obj.map((el) => this.map(el, isRead));
    }

    const ret = {};
    for (const x of Object.keys(this.schema)) {
      let val = obj[x];

      if (!isRead && x in autoAddedFields) {
        // On writes auto-added fields are propagated untouched when present.
        if (val !== undefined) {
          ret[x] = val;
        }
        continue;
      }

      const field = this.schema[x];
      if (val == null) {
        if (typeof field === 'object') {
          if ('default' in field) {
            val = typeof field.default === 'function' ? field.default() : field.default;
          } else if (field.required) {
            throw new DbError(`Value '${val}' in path '${x}' is required`);
          }
        }
        if (val == null) {
          // don't leave values undefined, it's problematic
          ret[x] = null;
          continue;
        }
        // A default was produced: fall through so it is cast and validated
        // like any supplied value.
      }

      if (typeof field === 'function') {
        val = Schema.castVal(field, val, x);
      } else {
        val = Schema.castVal(field.type, val, x);
        if (!isRead) {
          // Validation only applies to writes; reads just cast.
          if ('enum' in field && !field.enum.includes(val)) {
            throw new DbError(`Value '${val}' is not a valid enum value, possible values are: ${field.enum}`);
          }
          if (field.type === String) {
            if (field.trim) {
              val = val.trim();
            }
            if (field.uppercase) {
              val = val.toLocaleUpperCase();
            }
            if ('minlength' in field && val.length < field.minlength) {
              throw new DbError(`Value '${val}' in path '${x}' is too short, minlength: ${field.minlength}`);
            }
            if ('maxlength' in field && val.length > field.maxlength) {
              throw new DbError(`Value '${val}' in path '${x}' is too long, maxlength: ${field.maxlength}`);
            }
          }
          if (field.type === Number) {
            if ('min' in field && val < field.min) {
              throw new DbError(`Value '${val}' in path '${x}' is too small, min: ${field.min}`);
            }
            if ('max' in field && val > field.max) {
              throw new DbError(`Value '${val}' in path '${x}' is too big, max: ${field.max}`);
            }
          }
        }
      }
      ret[x] = val;
    }

    if (isRead) {
      // map the virtual fields if it's a read operation
      this.mapVirtuals(ret);
    }
    return ret;
  }

  /**
   * Validates and stores a new document.
   *
   * @param {Object} model - Document to create; it is copied, not mutated.
   * @param {Object} [options]
   * @param {boolean} [options.deleteCache=true] - Drop cached query results first.
   * @param {boolean} [options.runHooks=true] - Run 'beforeCreate' hooks.
   * @returns {Promise<Object>} The stored document, mapped for reading.
   */
  async create(model, options = ({ deleteCache: true, runHooks: true })) {
    // Resolve each flag separately so a partial options object keeps the
    // default for the flag it does not mention.
    const { deleteCache = true, runHooks = true } = options || {};

    const doc = { ...model };
    if (this.timestamps) {
      const now = new Date();
      if (!('createdAt' in doc)) {
        doc.createdAt = now;
      }
      if (!('updatedAt' in doc)) {
        doc.updatedAt = now;
      }
    }

    // map defaults etc
    const mappedModel = this.map(doc, false);
    if (runHooks) {
      await this._runHooks('beforeCreate', mappedModel);
    }
    if (deleteCache) {
      this._deleteCacheForType('sql');
    }
    if (!mappedModel.id) {
      // see: https://github.com/Azure/azure-cosmos-js/issues/241
      mappedModel.id = uuid.v4();
    }
    const { body } = await this._container.items.create(mappedModel);
    return this.map(body, true);
  }

  /**
   * Builds the Redis key `<model or collection>:<type>:<label>`. The
   * collection name is the fallback so schemas declared without a `model`
   * do not share one key space.
   */
  _getCacheKey(type, label) {
    return `${this.model || this.collection}:${type}:${label}`;
  }

  /** Best-effort removal of one cache entry; failures are only logged. */
  _deleteCache(type, label) {
    redis.del(this._getCacheKey(type, label)).catch((err) => {
      logger.error(`Redis - Error while deleting cache for "id" "${label}": ${err}`);
    });
  }

  /** Best-effort removal of every cache entry of `type`; failures are only logged. */
  _deleteCacheForType(type) {
    redis.keys(this._getCacheKey(type, '*'))
      // Wait for each deletion so a failing `del` reaches the catch below.
      .then((keys) => Promise.all(keys.map((key) => redis.del(key))))
      .catch((err) => {
        logger.error(`Redis - Error while deleting cache for all keys under "${type}": ${err}`);
      });
  }

  /**
   * Returns the cached value for (`type`, `label`), or calls `setter`, caches
   * its JSON-serialised result for `cacheTtl` and returns it.
   * A cache hit is returned mapped for reading; a miss returns the setter's
   * raw result. Redis read/write failures are logged and treated as a miss.
   */
  async _getFromCacheOrSet(type, label, setter) {
    const cacheKey = this._getCacheKey(type, label);

    let cacheResult = null;
    try {
      cacheResult = await redis.get(cacheKey);
    } catch (err) {
      // The cache is an optimisation: fall back to the setter when Redis fails.
      logger.error(`Redis - Error while reading cache for "${type}" "${label}": ${err}`);
    }
    if (cacheResult) {
      logger.info(`Redis - Key: ${cacheKey}, Cache hit: ${cacheResult.slice(0, 10)}...`);
      return this.map(JSON.parse(cacheResult), true);
    }

    logger.info(`Redis - Key: ${cacheKey}, Cache miss`);
    const result = await setter();
    redis.setex(cacheKey, cacheTtl, JSON.stringify(result)).catch((err) => {
      logger.error(`Redis - Error while setting cache for "${type}" "${label}": ${err}`);
    });
    return result;
  }

  /**
   * Runs a query built by `sqlBuilder` and optionally populates references.
   *
   * @param {Object} query - Condition object passed as the builder's `where`.
   * @param {Object} [options] - Extra builder options; `populate` (string or
   *   string[]) names fields whose ids are replaced by the referenced documents.
   * @returns {Promise<Object[]>} Mapped documents.
   * @throws {DbError} When a populated field is unknown, has no `ref`, or
   *   references an unregistered model.
   */
  async _find(query, options = {}) {
    const sql = sqlBuilder.sql({
      type: 'select',
      table: this.collection,
      where: query,
      ...options,
    });
    const sqlStr = sql.toString();
    const params = sql.values.map((value, idx) => ({ name: `@p${idx + 1}`, value }));
    // Inline the parameter values so different arguments get different cache entries.
    const cacheLabel = sqlStr.replace(/@(p\d*)/g, (...args) => `'${params.find(v => v.name === args[0]).value}'`);

    // get the docs from cache or db
    const rawDocs = await this._getFromCacheOrSet('sql', cacheLabel, async () => {
      const querySpec = {
        query: sqlStr,
        parameters: params,
      };
      const { result: items } = await this._container.items.query(querySpec, {
        enableCrossPartitionQuery: true,
      }).toArray();
      return items;
    });
    const mappedDocs = this.map(rawDocs);

    // make the populations, if any requested
    const populate = [].concat(options.populate || []);
    if (populate.length > 0) {
      await Promise.all(populate.map(async (field) => {
        if (!this.schema[field]) {
          throw new DbError(`${field} does not exist in the schema`);
        }
        if (!this.schema[field].ref) {
          throw new DbError(`${field} does not have a 'ref' field`);
        }
        const model = Schema._registeredModels[this.schema[field].ref];
        if (!model) {
          throw new DbError(`${this.schema[field].ref} is not a registered model`);
        }

        const ids = _.uniq(_.compact(mappedDocs.map((doc) => doc[field])));
        // At most 20 lookups in flight; a failed lookup yields null.
        const docs = await Promise.all(
          ids.map(throat(20, (id) => model.findById(id).catch(() => null))),
        );
        const idToDoc = ids.reduce((xs, x, idx) => {
          xs[x] = model.map(docs[idx]);
          return xs;
        }, {});
        mappedDocs.forEach((mappedDoc) => {
          if (mappedDoc[field] && idToDoc[mappedDoc[field]]) {
            mappedDoc[field] = idToDoc[mappedDoc[field]];
          } else {
            mappedDoc[field] = null;
          }
        });
      }));
    }
    return mappedDocs;
  }

  /** Finds all documents matching `query`; an empty query with no options reads everything. */
  find(query, options) {
    const isEmpty = (value) => !value || Object.keys(value).length === 0;
    if (isEmpty(query) && isEmpty(options)) {
      // minor optimization
      return this.findAll();
    }
    return this._find(query, options || {});
  }

  /**
   * Returns the first document matching `query`, or `undefined` when none does.
   */
  async findOne(query, options) {
    const opts = options || {};
    let items = [];
    try {
      items = await this._find(query, { ...opts, limit: 1 });
    } catch (ignored) {
      // The limited query failed; retry the same query without `limit`.
      // (Original note: "single-partition query failed, try cross-partition".)
      items = await this._find(query, opts);
    }
    return items[0];
  }

  /** Returns the document with the given `id`, or `undefined`. */
  findById(id) {
    return this.findOne({ id });
  }

  /** Reads every document in the container (cached under a fixed label). */
  async findAll() {
    const items = await this._getFromCacheOrSet('sql', `select * from ${this.collection}`, async () => {
      const { result } = await this._container.items.readAll().toArray();
      return result;
    });
    return this.map(items);
  }

  /**
   * Applies `fields` (lodash paths -> values) to `item`, validates the result
   * and replaces the stored document.
   * @throws {DbError} (as a rejection) when `item` is missing.
   */
  async _updateItem(item, fields) {
    if (!item) {
      throw new DbError('No item found to update');
    }
    this._deleteCacheForType('sql');
    const newItem = { ...item };
    Object.entries(fields).forEach(([key, val]) => {
      _.set(newItem, key, val);
    });
    newItem.updatedAt = new Date();
    const model = this.map(newItem, false);
    const { body } = await this._container.item(item.id, item[this.partitionKey]).replace(model);
    return this.map(body, true);
  }

  /**
   * Updates the first document matching `query`.
   * @param {Object} query
   * @param {Object} fields - Paths and values to set.
   * @param {Object} [opts]
   * @param {boolean} [opts.upsert] - Create `{ ...query, ...fields }` when nothing matches.
   * @throws {DbError} (as a rejection) when nothing matches and `upsert` is not set.
   */
  async findOneAndUpdate(query, fields, opts) {
    const item = await this.findOne(query);
    if (item) {
      return this._updateItem(item, fields);
    }
    if (opts && opts.upsert) {
      return this.create({ ...query, ...fields });
    }
    throw new DbError(`No items found for query: ${JSON.stringify(query)}`);
  }

  /** Returns the first document matching `doc`, creating `doc` when none exists. */
  async findOneOrCreate(doc) {
    const one = await this.findOne(doc);
    return one || this.create(doc);
  }

  /** Applies `fields` to every document matching `query`. */
  async update(query, fields) {
    const items = await this.find(query);
    return Promise.all(items.map((item) => this._updateItem(item, fields)));
  }

  /** Applies `fields` to the first document matching `query`; rejects with DbError if none. */
  async updateOne(query, fields) {
    const item = await this.findOne(query);
    return this._updateItem(item, fields);
  }

  /** Applies `fields` to the document with `id`; rejects with DbError if it does not exist. */
  async updateById(id, fields) {
    const item = await this.findById(id);
    return this._updateItem(item, fields);
  }

  /** Deletes every document matching `query`. */
  async remove(query) {
    const items = await this.find(query);
    if (items.length === 0) {
      return [];
    }
    // The documents are already loaded, so delete them directly instead of
    // re-querying each one by id.
    this._deleteCacheForType('sql');
    return Promise.all(
      items.map((item) => this._container.item(item.id, item[this.partitionKey]).delete()),
    );
  }

  /** Deletes the document with `id`; rejects with DbError if it does not exist. */
  async removeById(id) {
    const item = await this.findById(id);
    if (!item) {
      throw new DbError(`No item found with id: ${id}`);
    }
    this._deleteCacheForType('sql');
    return this._container.item(item.id, item[this.partitionKey]).delete();
  }
}

// Registry of schemas by model name, used to resolve `ref` during population.
Schema._registeredModels = {};

// Marker types that `castVal` passes through without casting.
Schema.Types = {
  Mixed: (i) => i,
  Reference: (i) => i,
};
module.exports = Schema; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const CastError = require('../error/castError'); | |
module.exports = (value, path) => { | |
if (value == null) { | |
return value; | |
} | |
if (module.exports.convertToTrue.has(value)) { | |
return true; | |
} | |
if (module.exports.convertToFalse.has(value)) { | |
return false; | |
} | |
throw new CastError('boolean', value, path); | |
}; | |
module.exports.convertToTrue = new Set([true, 'true', 'True', 1, '1', 'yes', 'Yes']); | |
module.exports.convertToFalse = new Set([false, 'false', 'False', 0, '0', 'no', 'No']); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const assert = require('assert'); | |
module.exports = (value) => { | |
if (value == null || value === '') { | |
return null; | |
} | |
if (value instanceof Date) { | |
assert.ok(!isNaN(value.valueOf())); | |
return value; | |
} | |
let date; | |
assert.ok(typeof value !== 'boolean'); | |
if (value instanceof Number || typeof value === 'number') { | |
date = new Date(value); | |
} else if (typeof value === 'string' && !isNaN(Number(value)) && (Number(value) >= 275761 || Number(value) < -271820)) { | |
// string representation of milliseconds take this path | |
date = new Date(Number(value)); | |
} else if (typeof value.valueOf === 'function') { | |
// support for moment.js. This is also the path strings will take because | |
// strings have a `valueOf()` | |
date = new Date(value.valueOf()); | |
} else { | |
// fallback | |
date = new Date(value); | |
} | |
if (!isNaN(date.valueOf())) { | |
return date; | |
} | |
return value; | |
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const date = require('./date'); | |
const boolean = require('./boolean'); | |
const number = require('./number'); | |
const string = require('./string'); | |
module.exports = { | |
date, | |
boolean, | |
number, | |
string, | |
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const assert = require('assert'); | |
module.exports = (val) => { | |
if (isNaN(val)) { | |
return null; | |
} | |
if (val == null) { | |
return val; | |
} | |
if (val === '') { | |
return null; | |
} | |
if (typeof val === 'string' || typeof val === 'boolean') { | |
val = Number(val); | |
} | |
if (isNaN(val)) { | |
return null; | |
} | |
if (val instanceof Number) { | |
return val; | |
} | |
if (typeof val === 'number') { | |
return val; | |
} | |
if (!Array.isArray(val) && typeof val.valueOf === 'function') { | |
return Number(val.valueOf()); | |
} | |
// eslint-disable-next-line eqeqeq | |
if (val.toString && !Array.isArray(val) && val.toString() == Number(val)) { | |
// eslint-disable-next-line no-new-wrappers | |
return new Number(val); | |
} | |
assert.ok(false); | |
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const CastError = require('../error/castError'); | |
module.exports = function (value, path) { | |
// If null or undefined | |
if (value == null) { | |
return value; | |
} | |
// handle documents being passed | |
if (value.id && typeof value.id === 'string') { | |
return value.id; | |
} | |
// Re: gh-647 and gh-3030, we're ok with casting using `toString()` | |
// **unless** its the default Object.toString, because "[object Object]" | |
// doesn't really qualify as useful data | |
if (value.toString | |
&& value.toString !== Object.prototype.toString | |
&& !Array.isArray(value)) { | |
return value.toString(); | |
} | |
throw new CastError('string', value, path); | |
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const util = require('util'); | |
const DbError = require('./dbError'); | |
/**
 * Error raised when a value cannot be cast to the type its schema path
 * declares. Inherits from DbError.
 *
 * @param {string} type - Target type name (stored as `kind`).
 * @param {*} value - The offending value.
 * @param {string} path - Schema path of the value.
 * @param {*} [reason] - Optional underlying cause, stored as `reason`.
 */
function CastError(type, value, path, reason) {
  // Render the value double-quoted: swap the single quotes util.inspect puts
  // around strings, and wrap everything else.
  let stringValue = util.inspect(value);
  stringValue = stringValue.replace(/^'/, '"').replace(/'$/, '"');
  if (stringValue.charAt(0) !== '"') {
    stringValue = `"${stringValue}"`;
  }

  DbError.call(this, `Cast to ${type} failed for value ${stringValue} at path "${path}"`);
  // Set the name before capturing so the stack header reads "CastError: ...".
  this.name = 'CastError';
  if (Error.captureStackTrace) {
    Error.captureStackTrace(this);
  } else {
    this.stack = new Error().stack;
  }
  this.stringValue = stringValue;
  this.kind = type;
  this.value = value;
  this.path = path;
  this.reason = reason;
}

CastError.prototype = Object.create(DbError.prototype);
// Point `constructor` back at CastError (it previously referenced DbError).
CastError.prototype.constructor = CastError;

/**
 * Attaches the model and appends its `modelName` to the error message.
 * @param {{modelName: string}} model
 */
CastError.prototype.setModel = function (model) {
  this.model = model;
  this.message = `Cast to ${this.kind} failed for value ${this.stringValue} at path "${this.path}" for model "${model.modelName}"`;
};
module.exports = CastError; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Base error type for the database layer.
 *
 * @param {string} msg - Human-readable description, stored as `message`.
 */
function DbError(msg) {
  Error.call(this);
  // Assign message and name before capturing so the stack header shows them.
  this.message = msg;
  this.name = 'DbError';
  if (Error.captureStackTrace) {
    // Passing DbError hides this constructor's own frame from the trace.
    Error.captureStackTrace(this, DbError);
  } else {
    this.stack = new Error().stack;
  }
}

DbError.prototype = Object.create(Error.prototype);
// Point `constructor` back at DbError (it previously referenced Error).
DbError.prototype.constructor = DbError;
module.exports = DbError; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment