Skip to content

Instantly share code, notes, and snippets.

@freeart
Created June 7, 2017 17:56
Show Gist options
  • Save freeart/d26bd175c666de26d51d05e1fb8ec6df to your computer and use it in GitHub Desktop.
Save freeart/d26bd175c666de26d51d05e1fb8ec6df to your computer and use it in GitHub Desktop.
const async = require('async'),
vm = require("vm"),
fs = require("fs"),
moment = require('moment'),
changeCase = require('change-case');
class Contract {
constructor(cb, scope) {
this.modules;
this.ready = false;
this.core = scope;
this.contracts = {};
this.cache = {};
this.core.on("bind", (msg) => {
this.onBind(msg);
});
this.queue = async.queue((data, cb) => {
async.retry({times: 3, interval: 5000}, (cb) => {
this.modules.bus.send([
data.containerId,
data.contractId,
data.conditions,
data.timestamp
], "mapit2.conditions", cb)
}, (err) => {
if (err) {
this.modules.logger.error(data, err)
}
cb(err);
})
}, 10);
/*async.forever((next) => {
setTimeout(() => {
this.modules.logger.info("mapit2.conditions", this.queue.length())
this.modules.logger.info("storageUpdate", this.storageUpdate.length())
this.modules.logger.info("storageInsert", this.storageInsert.length())
next();
}, 5000);
})*/
this.storageUpdate = async.queue((data, cb) => {
let record = {
Uuid: new String(data.contractId),
ContainerId: data.containerId,
Status: new String(data.conditions)
};
if (data.tenant) {
record.Tenant = data.tenant
}
if (data.archive) {
record.Archive = data.archive
}
this.modules.sql.merge("Contracts", "Uuid", record, cb)
}, 10);
this.storageInsert = async.queue((data, cb) => {
let record = {
Uuid: new String(data.contractId),
ContainerId: data.containerId,
Status: new String(data.conditions)
};
if (data.tenant) {
record.Tenant = data.tenant
}
if (data.archive) {
record.Archive = data.archive
}
this.modules.sql.insert("Contracts", "Uuid", record, cb)
}, 10);
cb(null, this);
}
parse(payload) {
//["location", data.device.containerId, data.device.id, [data.event.point.timestamp, data.event.point.lat, data.event.point.long, data.event.point.prec, data.event.point.sensor, data.event.point.speed]]
//["session", payload.containerId, payload.deviceId, [payload.begin, payload.end, payload.distance]]
if (!Array.isArray(payload) || typeof payload[0] !== "string") {
return null;
}
switch (payload[0]) {
case "create":
if (typeof payload[1] === "string" &&
typeof payload[2] === "string" &&
typeof payload[3] === "number" &&
typeof payload[4] === "string"
) {
return {
action: payload[0],
containerId: payload[1],
contractId: payload[2],
tenant: payload[3],
script: payload[4],
argv: payload[5]
}
} else {
return null;
}
break;
case "destroy":
if (typeof payload[1] === "string" &&
typeof payload[2] === "string"
) {
return {
action: payload[0],
containerId: payload[1],
contractId: payload[2]
}
} else {
return null;
}
break;
case "debug":
if (typeof payload[1] === "string" &&
typeof payload[2] === "string"
) {
return {
action: payload[0],
containerId: payload[1],
contractId: payload[2]
}
} else {
return null;
}
break;
default:
if (typeof payload[1] === "string" && payload.length == 3) {
return {
action: payload[0],
containerId: payload[1],
argv: payload[2]
}
} else {
return null;
}
break;
}
}
__createContract(script, argv) {
const cache = this.cache[script] = this.cache[script] || {};
cache.code = cache.code || (new vm.Script(fs.readFileSync(`./contracts/${script}.js`)));
cache.context = cache.context || (new vm.createContext({geolib: require('geolib')}));
cache.App = cache.App || cache.code.runInContext(cache.context);
let app = new cache.App();
Object.defineProperty(app, "script", {
value: script,
writable: false,
enumerable: true
});
Object.defineProperty(app, "argv", {
value: argv || {},
writable: false,
enumerable: true
});
return app;
}
__listen() {
this.modules.bus.listen("mapit2.contracts", (err, msg) => {
if (err) {
if (err.toString() == "Error: Socket closed abruptly during opening handshake") {
this.modules.logger.error("__listen restart", err)
setTimeout(() => {
this.modules.logger.error("__listen restarted")
this.__listen();
}, 5000)
}
return this.modules.logger.error("__listen", err)
}
let payload;
let contractEvent;
try {
payload = JSON.parse(msg.payload);
contractEvent = this.parse(payload);
}
catch (e) {
msg.release();
return this.modules.logger.error(msg.payload, e.toString())
}
if (!contractEvent) {
this.modules.logger.info(msg.payload, "wrong signature")
return msg.release();
}
switch (contractEvent.action) {
case 'create':
this.contracts[contractEvent.containerId] = this.contracts[contractEvent.containerId] || {};
if (Object.keys(this.contracts[contractEvent.containerId]).length > 10) {
return msg.release();
}
let app = this.__createContract(contractEvent.script, contractEvent.argv);
app.main && app.main();
if (app.execute()) {
this.queue.push({
containerId: contractEvent.containerId,
contractId: contractEvent.contractId,
conditions: JSON.stringify(app),
timestamp: moment().utc().unix()
})
this.contracts[contractEvent.containerId][contractEvent.contractId] = app;
this.modules.logger.info("first verify ok", contractEvent.containerId, app);
} else {
this.contracts[contractEvent.containerId][contractEvent.contractId] = app;
this.modules.logger.info("first verify fail", contractEvent.containerId, app);
}
this.storageInsert.push({
tenant: contractEvent.tenant,
containerId: contractEvent.containerId,
contractId: contractEvent.contractId,
conditions: JSON.stringify(app)
});
msg.release();
break;
case 'debug':
if (!this.contracts[contractEvent.containerId]) {
return msg.release();
}
let info = this.contracts[contractEvent.containerId][contractEvent.contractId];
this.queue.push({
containerId: contractEvent.containerId,
contractId: contractEvent.contractId,
conditions: JSON.stringify(info),
timestamp: moment().utc().unix()
});
//this.modules.logger.info("info", contractEvent.containerId, JSON.parse(JSON.stringify(info)))
msg.release();
break;
case 'destroy': {
if (!this.contracts[contractEvent.containerId] || !this.contracts[contractEvent.containerId][contractEvent.contractId]) {
return msg.release();
}
this.storageUpdate.push({
containerId: contractEvent.containerId,
contractId: contractEvent.contractId,
conditions: JSON.stringify(this.contracts[contractEvent.containerId][contractEvent.contractId]),
archive: 1
});
delete this.contracts[contractEvent.containerId][contractEvent.contractId];
msg.release();
break;
}
default:
if (!this.contracts[contractEvent.containerId]) {
return msg.release();
}
let eventName = 'on' + changeCase.pascalCase(contractEvent.action);
let contracts = Object.keys(this.contracts[contractEvent.containerId]);
if (eventName == "onSession"){
this.modules.logger.info(eventName, contracts)
}
for (let i = 0; i < contracts.length; i++) {
let contractId = contracts[i];
let app = this.contracts[contractEvent.containerId][contractId];
if (app[eventName]) {
app[eventName](contractEvent.argv);
if (app.execute()) {
this.queue.push({
containerId: contractEvent.containerId,
contractId: contractId,
conditions: JSON.stringify(app),
timestamp: moment().utc().unix()
});
this.modules.logger.info("event verify ok", eventName, contractEvent.containerId, JSON.parse(JSON.stringify(app)))
} else {
this.modules.logger.info("event verify fail", eventName, contractEvent.containerId, JSON.parse(JSON.stringify(app)))
}
this.storageUpdate.push({
containerId: contractEvent.containerId,
contractId: contractId,
conditions: JSON.stringify(app)
});
}
}
msg.release();
break;
}
}, 4)
}
__load(cb) {
this.modules.sql.query(`
SELECT [Uuid] contractId, [Status] conditions, [ContainerId] containerId
FROM [dbo].[Contracts]
where [Archive] = 0
`, null, (err, rows) => {
if (err || !rows.dataset[0].length) {
return cb(err)
}
rows.dataset[0].forEach((row) => {
let conditions
try {
conditions = JSON.parse(row.conditions)
} catch (e) {
return this.modules.logger.info(row.conditions, e.toString())
}
let app = this.__createContract(conditions.script, conditions.argv);
delete conditions.script;
delete conditions.argv;
let fields = Object.keys(conditions);
for (let i = 0; i < fields.length; i++) {
app[fields[i]] = conditions[fields[i]];
}
this.contracts[row.containerId] = this.contracts[row.containerId] || {};
this.contracts[row.containerId][row.contractId] = app;
});
cb();
})
}
onBind(scope) {
this.modules = scope;
this.ready = true;
this.__load((err) => {
if (err) {
this.ready = false;
return this.modules.logger.error("loading", err)
}
this.__listen();
});
}
cleanup(cb) {
this.ready = false;
cb();
}
}
module.exports = Contract;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment