Skip to content

Instantly share code, notes, and snippets.

@davalapar
Created February 14, 2019 06:32
Show Gist options
  • Save davalapar/7ad493989e0d72ab66a4d070397e9172 to your computer and use it in GitHub Desktop.
Save davalapar/7ad493989e0d72ab66a4d070397e9172 to your computer and use it in GitHub Desktop.
ds2
/**
 * In-memory lock manager that grants locks on whole sets of items at once.
 *
 * A request is granted only when every item in it is free; otherwise the
 * request is parked in a queue and re-examined each time `release()` runs.
 * Because a request never holds a partial set of items, two requests cannot
 * end up waiting on each other.
 */
class Dreadlocks {
  constructor() {
    // Keys are the currently locked items; the values carry no meaning.
    this.mapLock = new Map();
    // Parked requests, oldest first, each shaped `{ items, resolve }`.
    this.arrayQueue = [];
  }

  /**
   * Locks every item in `items`, waiting until all of them are free.
   *
   * @param {Array} items - values usable as `Map` keys.
   * @returns {Promise<void>} resolves once all items are held by the caller.
   */
  lock(items) {
    return new Promise((resolve) => {
      if (this.isFree(items)) {
        this.acquire(items);
        resolve();
        return;
      }
      this.arrayQueue.push({ items, resolve });
    });
  }

  /**
   * Unlocks `items` and grants every parked request that has become
   * satisfiable, in queue order.
   *
   * @param {Array} items - the items previously passed to `lock()`.
   * @returns {Promise<void>} resolves after the queue has been re-examined.
   */
  release(items) {
    return new Promise((resolve) => {
      items.forEach((item) => this.mapLock.delete(item));
      const queue = this.arrayQueue;
      let index = 0;
      while (index < queue.length) {
        const waiter = queue[index];
        if (this.isFree(waiter.items)) {
          this.acquire(waiter.items);
          waiter.resolve();
          // Removing the entry shifts the next waiter into `index`, so the
          // index must NOT advance here; advancing would skip that waiter.
          queue.splice(index, 1);
        } else {
          index += 1;
        }
      }
      resolve();
    });
  }

  /** True when none of `items` is currently locked. */
  isFree(items) {
    return items.every((item) => !this.mapLock.has(item));
  }

  /** Marks every one of `items` as locked. */
  acquire(items) {
    items.forEach((item) => this.mapLock.set(item, true));
  }

  /** Number of items currently locked. */
  get size() {
    return this.mapLock.size;
  }

  /** Number of requests still waiting in the queue. */
  get length() {
    return this.arrayQueue.length;
  }
}
export default Dreadlocks;
import { Datastore } from '@google-cloud/datastore';
import uuid from 'uuid-random';
import circular from 'circular-json';
import DEBUG from 'debug';
import msgpack5 from 'msgpack5';
import pako from 'pako';
import prettyBytes from 'pretty-bytes';
import _ from 'lodash';
import Dreadlock from './dreadlocks2';
import { toSHA256Hex } from '../../../Uint8ArrayTools';
import LRU from './lru';
const msgpack = msgpack5();
/**
 * Serializes a value with msgpack and compresses the bytes with pako's
 * deflate at level 9 / memLevel 9.
 */
const wrap = (something) => {
  const encoded = msgpack.encode(something);
  const deflateOptions = { level: 9, memLevel: 9 };
  return pako.deflate(encoded, deflateOptions);
};

/**
 * Reverses `wrap`: inflates the bytes with pako, then decodes them with msgpack.
 */
const unwrap = (something) => {
  const inflated = pako.inflate(something);
  return msgpack.decode(inflated);
};
/**
 * Factory that builds the `Transaction`, `Query` and `Entity` classes around
 * one `Datastore` client, one global LRU cache and one `Dreadlock` instance.
 *
 * @param {Object} datastoreOpts - passed unchanged to `new Datastore(...)`.
 * @param {Object} cacheOpts - cache sizing options.
 * @param {number} cacheOpts.globalMaxSize - size handed to the global `LRU`.
 * @param {number} cacheOpts.namespacedMaxSize - size handed to each
 *   per-namespace `LRU` created by `Query.runQuery`.
 * @returns {Object} `{ Transaction, Entity, Query, lruGlobal, KEY }`.
 * @throws {TypeError} when either argument is not an object or a size option
 *   fails the checks below.
 */
const DS2 = (datastoreOpts, cacheOpts) => {
// Accept only values that have Object.prototype in their prototype chain.
if (Object.prototype.isPrototypeOf.call(Object.prototype, datastoreOpts) === false) {
throw new TypeError('datastoreOpts must be an object');
}
if (Object.prototype.isPrototypeOf.call(Object.prototype, cacheOpts) === false) {
throw new TypeError('cacheOpts must be an object');
}
// NOTE(review): `NaN` passes this check, because `parseInt(NaN, 10) < 1` is false.
if (typeof cacheOpts.globalMaxSize !== 'number' || parseInt(cacheOpts.globalMaxSize, 10) < 1) {
throw new TypeError('globalMaxSize must be a number greater than 0');
}
if (typeof cacheOpts.namespacedMaxSize !== 'number' || parseInt(cacheOpts.namespacedMaxSize, 10) < 1) {
throw new TypeError('namespacedMaxSize must be a number greater than 0');
}
// Shared by every class defined inside this closure.
const DatastoreInstance = new Datastore(datastoreOpts);
const lruGlobal = new LRU(cacheOpts.globalMaxSize);
const debug = DEBUG('DS2');
// NOTE(review): DEV_MODE is not declared in the visible source; presumably a
// build-time or global constant — confirm.
debug.enabled = DEV_MODE;
// Lock manager used by Transaction.exec to serialize work on the same keys.
const Dread = new Dreadlock();
/**
 * Read-modify-write helper around a Datastore transaction.
 *
 * Usage: `new Transaction().keys({ alias: key }).exec(executorFn)`.
 * While `exec` runs, a lock derived from each key is held through `Dread`,
 * so two `exec` calls on the same key made through this instance do not
 * overlap.
 */
class Transaction {
  constructor() {
    this.transaction = DatastoreInstance.transaction();
    // Replaced by an array of `[alias, key]` pairs once `keys()` is called.
    this.keyPairs = {};
  }

  /**
   * Registers the keys to load, addressed by alias.
   *
   * @param {Object} keys - map of alias -> Datastore key.
   * @returns {Transaction} this instance, for chaining.
   */
  keys(keys) {
    this.keyPairs = Object.entries(keys);
    return this;
  }

  /**
   * Loads the registered keys, hands the entities to `executorFn`, then saves
   * and commits what the executor's promise resolves to.
   *
   * If anything fails after the transaction has started, the transaction is
   * rolled back and the original error is rethrown. The locks are always
   * released, whatever the outcome.
   *
   * @param {Function} executorFn - receives `{ alias: entity }` and must return
   *   a Promise resolving to a non-empty `{ alias: data }` object.
   * @returns {Promise<void>} resolves after a successful commit; rejects with
   *   the executor's error, a validation TypeError/Error, or the Datastore error.
   */
  async exec(executorFn) {
    const { transaction, keyPairs } = this;
    if (_.isEmpty(keyPairs) === true) {
      throw new TypeError('transaction missing keys, cannot proceed.');
    }
    // One lock id per key, hashed from namespace + kind + (name or id).
    const keySet = keyPairs.map(([, key]) => toSHA256Hex(
      [
        String(key.namespace),
        String(key.kind),
        String(key.name || key.id),
      ].join(''),
    ));
    await Dread.lock(keySet);
    let started = false;
    try {
      await transaction.run();
      started = true;
      const results = await Promise.all(keyPairs.map(([, key]) => transaction.get(key)));
      debug({ results });
      // Re-key the fetched entities by alias for the executor.
      const fetched = {};
      keyPairs.forEach(([alias], index) => {
        fetched[alias] = _.first(results[index]);
      });
      const pending = executorFn(fetched);
      if (!(pending instanceof Promise)) {
        throw new TypeError('Expecting promise from executor function.');
      }
      const entities = await pending;
      if (Object.prototype.isPrototypeOf.call(Object.prototype, entities) === false) {
        throw new TypeError('Expecting object from executor function promise.');
      }
      if (_.isEmpty(entities) === true) {
        throw new Error('Expecting non-empty object from executor function promise.');
      }
      transaction.save(keyPairs.map(([alias, key]) => ({
        key,
        data: entities[alias],
      })));
      await transaction.commit();
    } catch (error) {
      if (started === true) {
        try {
          await transaction.rollback();
        } catch (rollbackError) {
          // Keep the original failure; a rollback error must not replace it.
          debug({ rollbackError });
        }
      }
      throw error;
    } finally {
      await Dread.release(keySet);
    }
  }
}
/**
 * Chainable wrapper around a Datastore query with optional result caching.
 *
 * Every builder method replaces `metaQuery` with the value returned by the
 * underlying query object and returns `this` for chaining.
 */
class Query {
  /**
   * @param {String} kind - entity kind to query.
   * @param {String} [endCursor] - when non-empty, the query starts from it.
   */
  constructor(kind, endCursor) {
    let query = DatastoreInstance.createQuery(kind);
    if (_.isEmpty(endCursor) === false) {
      query = query.start(endCursor);
    }
    this.metaKind = kind;
    this.metaQuery = query;
    this.metaDescription = '';
    this.debug = DEBUG('@'.concat(kind));
    this.debug.enabled = DEV_MODE;
  }

  /** Attaches a free-text description that is logged when the query runs. */
  describe(description) {
    this.metaDescription = description;
    return this;
  }

  /** Orders by `col`, ascending. */
  ascend(col) {
    this.metaQuery = this.metaQuery.order(col);
    return this;
  }

  /** Orders by `col`, descending. */
  descend(col) {
    this.metaQuery = this.metaQuery.order(col, { descending: true });
    return this;
  }

  /** Forwards `val` to the underlying query's `offset`. */
  offset(val) {
    this.metaQuery = this.metaQuery.offset(val);
    return this;
  }

  /** Forwards `fields` to the underlying query's `select`. */
  select(fields) {
    this.metaQuery = this.metaQuery.select(fields);
    return this;
  }

  /** Forwards `col`, `operator`, `val` to the underlying query's `filter`. */
  filter(col, operator, val) {
    this.metaQuery = this.metaQuery.filter(col, operator, val);
    return this;
  }

  /** Forwards `limit` to the underlying query's `limit`. */
  limit(limit) {
    this.metaQuery = this.metaQuery.limit(limit);
    return this;
  }

  /**
   * Runs the query, optionally serving from and storing into the LRU cache.
   *
   * @param {Boolean|String} [cache] - `true` uses the global cache, a string
   *   uses the namespaced cache of that name, and anything else (including
   *   `false` and `undefined`) bypasses caching.
   * @returns {Promise<Object>} `{ entities, keys, endCursor, hash }`;
   *   `endCursor` is `null` when Datastore reports no more results.
   */
  runQuery(cache) {
    const description = this.metaDescription;
    // The cache key is a hash of the serialized query object.
    const hash = toSHA256Hex(circular.stringify(this.metaQuery));
    const shortHash = hash.slice(0, 8);
    const namespace = typeof cache === 'string' ? cache : undefined;
    // Only an explicit `true` enables the global cache.
    const useGlobalCache = cache === true;
    const isCached = namespace !== undefined || useGlobalCache;

    if (description !== undefined && description !== '') {
      this.debug(description);
    }
    this.debug(isCached ? '(cached query)' : '(non-cached query)');

    // Try to serve the result from the cache.
    if (namespace !== undefined && lruGlobal.has(namespace) === true) {
      const namespacedCache = lruGlobal.get(namespace);
      this.debug('fetching', shortHash, 'in', namespace);
      if (namespacedCache.has(hash) === true) {
        this.debug('', shortHash, 'found in', namespace);
        return Promise.resolve(unwrap(namespacedCache.get(hash)));
      }
    }
    if (useGlobalCache && lruGlobal.has(hash) === true) {
      this.debug('fetching', shortHash);
      return Promise.resolve(unwrap(lruGlobal.get(hash)));
    }

    return DatastoreInstance
      .runQuery(this.metaQuery)
      .then(([entities, info]) => {
        // Keys are collected separately from the entities' KEY property.
        const keys = entities.map((entity) => entity[DatastoreInstance.KEY]);
        const endCursor = info.moreResults !== DatastoreInstance.NO_MORE_RESULTS
          ? info.endCursor
          : null;
        const endResult = {
          entities, keys, endCursor, hash,
        };
        if (isCached) {
          const wrappedResult = wrap(endResult);
          this.debug('size', prettyBytes(wrappedResult.byteLength));
          if (namespace !== undefined) {
            let namespacedCache;
            if (lruGlobal.has(namespace) === true) {
              this.debug('existing namespace:', namespace);
              namespacedCache = lruGlobal.get(namespace);
            } else {
              this.debug('creating namespace:', namespace);
              namespacedCache = new LRU(cacheOpts.namespacedMaxSize);
              lruGlobal.set(namespace, namespacedCache);
            }
            this.debug('setting', shortHash, 'in', namespace);
            namespacedCache.set(hash, wrappedResult);
          } else {
            this.debug('setting', shortHash);
            lruGlobal.set(hash, wrappedResult);
          }
        }
        return endResult;
      });
  }
}
// https://gist.github.com/bugventure/f71337e3927c34132b9a#gistcomment-2238943
// const regexUUIDv4 = /^[A-F\d]{8}-[A-F\d]{4}-4[A-F\d]{3}-[89AB][A-F\d]{3}-[A-F\d]{12}$/i;
// const isValidUUIDv4 = parameter => regexUUIDv4.test(parameter);
/**
 * Handle on a single Datastore entity, addressed by `this.key`.
 *
 * The key is established by one of `fromKey`, `fromUUID` or `fromFilters`;
 * the remaining methods read or write the entity stored under that key.
 */
class Entity {
  /**
   * Entity.fromKey: sets the key synchronously.
   *
   * - `fromKey(key)`: uses an existing key object.
   * - `fromKey(kind, name)`: builds a key from two strings.
   * - `fromKey(kind, value, String|Number)`: converts `value` with the given
   *   constructor first; numeric ids go through this form with `Number`.
   *
   * @param {Object|String} param1 - key or kind
   * @param {String|Number} param2 - none or name-id
   * @returns {Entity} this instance, for chaining.
   * @throws {TypeError} on an unexpected argument count or argument type.
   */
  fromKey(...parameters) {
    const [first, second, caster] = parameters;
    switch (parameters.length) {
      case 1: {
        if (Object.prototype.isPrototypeOf.call(Object.prototype, first) === false) {
          throw new TypeError('Entity.fromKey(x): Expecting x as typeof \'object\'.');
        }
        this.key = first;
        this.kind = first.kind;
        return this;
      }
      case 2: {
        if (typeof first !== 'string') {
          throw new TypeError('Entity.fromKey(x,y): Expecting x as typeof \'string\'.');
        }
        if (typeof second !== 'string') {
          throw new TypeError('Entity.fromKey(x,y): Expecting y as typeof \'string\'');
        }
        const key = DatastoreInstance.key([first, second]);
        this.key = key;
        this.kind = key.kind;
        return this;
      }
      case 3: {
        if (typeof first !== 'string') {
          throw new TypeError('Entity.fromKey(x,y,z): Expecting x as typeof \'string\'.');
        }
        if (caster !== String && caster !== Number) {
          throw new TypeError('Entity.fromKey(x,y,z): Expecting z as \'String\' class or \'Number\' class');
        }
        const key = DatastoreInstance.key([first, caster(second)]);
        this.key = key;
        this.kind = key.kind;
        return this;
      }
      default:
        throw new TypeError('Entity.fromKey(): Unexpected parameter length');
    }
  }

  /**
   * Entity.fromUUID: assigns a key named by a freshly generated UUID.
   *
   * Outside the 'development' ENVIRONMENT the candidate key is checked inside
   * a transaction: when no entity exists under it, an empty entity is written
   * there; otherwise another UUID is tried.
   *
   * @param {String} kind
   * @returns {Promise}
   */
  fromUUID(kind) {
    if (typeof kind !== 'string' || kind === '') {
      return Promise.reject(
        new TypeError('Entity.fromUUID(kind): Expecting kind as non-empty \'string\'.'),
      );
    }
    if (Boolean(this.key) === true) {
      return Promise.reject(
        new TypeError('Entity.fromUUID(kind): Entity.key already exists.'),
      );
    }
    const generateRandomKey = () => {
      this.key = DatastoreInstance.key([kind, uuid()]);
      if (ENVIRONMENT === 'development') {
        return Promise.resolve();
      }
      // NOTE(review): any transaction failure, not only a key collision,
      // triggers another attempt, and the number of attempts is unbounded.
      return new Transaction()
        .keys({ temp: this.key })
        .exec(({ temp }) => (temp === undefined ? Promise.resolve({ temp: {} }) : Promise.reject())) // eslint-disable-line
        .catch(() => generateRandomKey());
    };
    return generateRandomKey();
  }

  /**
   * Entity.fromFilters: looks the key up with a one-result query.
   *
   * Always returns a function. Every problem (invalid kind, key already set,
   * malformed filter, no match) is reported by rejecting the promise that
   * function returns, and a malformed filter prevents the query from running.
   *
   * @param {String} kind
   * @returns {Function} `(...filters) => Promise`, where each filter is a
   *   3-item array `[column, operator, value]`.
   */
  fromFilters(kind) {
    let refusal;
    if (typeof kind !== 'string' || kind === '') {
      refusal = new TypeError('Entity.fromFilters(kind): Expecting kind as non-empty \'string\'.');
    } else if (Boolean(this.key) === true) {
      refusal = new TypeError('Entity.fromFilters(kind): Entity.key already exists.');
    }
    return (...filters) => {
      if (refusal !== undefined) {
        return Promise.reject(refusal);
      }
      // Validate every filter before touching the query.
      for (const filter of filters) {
        if (Array.isArray(filter) === false) {
          return Promise.reject(
            new TypeError('Entity.fromFilters(kind)(...filters): Non-array filter encountered.'),
          );
        }
        if (filter.length !== 3) {
          return Promise.reject(
            new TypeError('Entity.fromFilters(kind)(...filters): Filter must have 3 items.'),
          );
        }
      }
      const query = new Query(kind).limit(1);
      filters.forEach(([column, operator, value]) => {
        query.filter(column, operator, value);
      });
      return query.runQuery()
        .then(({ entities, keys }) => {
          if (_.isEmpty(entities) === true) {
            return Promise.reject(
              new TypeError('Entity.fromFilters(kind)(...filters): Entity not found.'),
            );
          }
          this.key = _.first(keys);
          return undefined;
        });
    };
  }

  /**
   * Entity.upsert: replaces the stored entity data with `upsertData`.
   * In DEV_MODE the write is only logged.
   *
   * @param {Object} upsertData
   * @returns {Promise}
   * @throws {TypeError} synchronously when `upsertData` is not an object.
   */
  upsert(upsertData) {
    if (Object.prototype.isPrototypeOf.call(Object.prototype, upsertData) === false) {
      throw new TypeError('Entity.upsert(upsertData): Expecting upsertData as typeof \'object\'.');
    }
    if (Boolean(this.key) === false) {
      return Promise.reject(
        new TypeError('Entity.upsert(upsertData): Entity.key missing.'),
      );
    }
    if (DEV_MODE === true) {
      debug('DEV_MODE UPSERT @ ', this.key.kind, this.key.name || this.key.id);
      debug(upsertData);
      return Promise.resolve();
    }
    return new Transaction()
      .keys({ temp: this.key })
      .exec(() => Promise.resolve({ temp: upsertData }));
  }

  /**
   * Entity.merge: shallow-merges `mergeData` over the stored entity.
   * In DEV_MODE the stored entity is written back unchanged.
   *
   * @param {Object} mergeData
   * @returns {Promise<Object>} resolves to the merged object.
   * @throws {TypeError} synchronously when `mergeData` is not an object.
   */
  merge(mergeData) {
    if (Object.prototype.isPrototypeOf.call(Object.prototype, mergeData) === false) {
      throw new TypeError('Entity.merge(mergeData): Expecting mergeData as typeof \'object\'.');
    }
    if (Boolean(this.key) === false) {
      return Promise.reject(
        new TypeError('Entity.merge(mergeData): Entity.key missing.'),
      );
    }
    const merged = {};
    return new Transaction()
      .keys({ temp: this.key })
      .exec(({ temp }) => {
        Object.assign(merged, temp, mergeData);
        if (DEV_MODE === true) {
          debug('DEV_MODE MERGE @ ', this.key.kind, this.key.name || this.key.id);
          debug(mergeData);
          return Promise.resolve({ temp });
        }
        return Promise.resolve({ temp: merged });
      })
      .then(() => merged);
  }

  /**
   * Entity.delete: deletes the entity. In DEV_MODE the delete is only logged.
   *
   * @returns {Promise}
   */
  delete() {
    if (this.key === undefined) {
      return Promise.reject(
        new TypeError('Entity.delete(): Entity.key missing.'),
      );
    }
    if (DEV_MODE === true) {
      debug('DEV_MODE DELETE @ ', this.key.kind, this.key.name || this.key.id);
      return Promise.resolve();
    }
    return DatastoreInstance.delete(this.key);
  }

  /**
   * Entity.reveal: opens a transaction, exposes the loaded entity as
   * `this.data` and keeps the transaction open until `this.conceal()` is
   * called.
   *
   * `conceal()` clears `this.data` and `this.conceal`, lets the transaction
   * write the loaded object back, and returns the transaction's promise so
   * that commit errors can be observed.
   *
   * @returns {Promise} resolves once `this.data` is available; rejects when
   *   the transaction fails before that point.
   */
  reveal() {
    if (this.key === undefined) {
      return Promise.reject(
        new TypeError('Entity.reveal(): Entity.key missing.'),
      );
    }
    return new Promise((resolve, reject) => {
      const settled = new Transaction()
        .keys({ temp: this.key })
        .exec(({ temp }) => {
          this.data = temp;
          // Stays pending, keeping the transaction open, until conceal() runs.
          const untilConcealed = new Promise((finish) => {
            this.conceal = () => {
              this.data = undefined;
              this.conceal = undefined;
              finish({ temp });
              return settled;
            };
          });
          resolve();
          return untilConcealed;
        });
      // A failure before the executor runs would otherwise leave reveal()
      // pending forever. After resolve() has run, reject() is a no-op.
      settled.catch((error) => {
        debug('Entity.reveal(): transaction failed', error);
        reject(error);
      });
    });
  }

  /**
   * Entity.fetch: reads the entity outside of any transaction.
   *
   * @returns {Promise} resolves to the first element of the `get` result.
   */
  async fetch() {
    if (this.key === undefined) {
      throw new TypeError('Entity.fetch(): Entity.key missing.');
    }
    const result = await DatastoreInstance.get(this.key);
    return _.first(result);
  }
}
// Everything returned here shares the DatastoreInstance, lruGlobal and Dread
// created at the top of this factory call.
return {
Transaction,
Entity,
Query,
// The global LRU cache instance that Query.runQuery reads and writes.
lruGlobal,
// The client's KEY property, as used by Query.runQuery to read entity keys.
KEY: DatastoreInstance.KEY,
};
};
export default DS2;
/**
 * Least-recently-used cache built on `Map` insertion order: the first key in
 * the map is always the least recently used one.
 */
class LRU {
  /**
   * @param {number} max - maximum number of entries to keep. Values that are
   *   not finite numbers, or that truncate to less than 1, fall back to 10.
   */
  constructor(max) {
    const limit = typeof max === 'number' && Number.isFinite(max) ? Math.trunc(max) : 0;
    this.max = limit > 0 ? limit : 10;
    this.cache = new Map();
  }

  /**
   * Returns the value stored under `key` and marks it most recently used.
   *
   * @returns {*} the stored value, or `undefined` when the key is absent.
   */
  get(key) {
    const { cache } = this;
    if (!cache.has(key)) {
      return undefined;
    }
    const value = cache.get(key);
    // Re-insert so the key moves to the most-recent end of the map.
    cache.delete(key);
    cache.set(key, value);
    return value;
  }

  /**
   * Stores `value` under `key` as the most recently used entry, evicting the
   * least recently used entry when the cache would exceed `max`.
   */
  set(key, value) {
    const { cache, max } = this;
    // Deleting first moves an existing key to the most-recent position.
    cache.delete(key);
    cache.set(key, value);
    // Evict only once the limit is exceeded, so `max` entries can be kept.
    if (cache.size > max) {
      const oldestKey = cache.keys().next().value;
      cache.delete(oldestKey);
    }
  }

  /** Reports whether `key` is present, without changing its recency. */
  has(key) {
    return this.cache.has(key);
  }

  /**
   * Removes `key`.
   *
   * @returns {boolean} true when an entry was removed.
   */
  delete(key) {
    return this.cache.delete(key);
  }

  /** Drops every entry. */
  clear() {
    this.cache = new Map();
  }
}
export default LRU;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment