Created
June 7, 2017 17:56
-
-
Save freeart/d26bd175c666de26d51d05e1fb8ec6df to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const async = require('async'), | |
vm = require("vm"), | |
fs = require("fs"), | |
moment = require('moment'), | |
changeCase = require('change-case'); | |
class Contract { | |
constructor(cb, scope) { | |
this.modules; | |
this.ready = false; | |
this.core = scope; | |
this.contracts = {}; | |
this.cache = {}; | |
this.core.on("bind", (msg) => { | |
this.onBind(msg); | |
}); | |
this.queue = async.queue((data, cb) => { | |
async.retry({times: 3, interval: 5000}, (cb) => { | |
this.modules.bus.send([ | |
data.containerId, | |
data.contractId, | |
data.conditions, | |
data.timestamp | |
], "mapit2.conditions", cb) | |
}, (err) => { | |
if (err) { | |
this.modules.logger.error(data, err) | |
} | |
cb(err); | |
}) | |
}, 10); | |
/*async.forever((next) => { | |
setTimeout(() => { | |
this.modules.logger.info("mapit2.conditions", this.queue.length()) | |
this.modules.logger.info("storageUpdate", this.storageUpdate.length()) | |
this.modules.logger.info("storageInsert", this.storageInsert.length()) | |
next(); | |
}, 5000); | |
})*/ | |
this.storageUpdate = async.queue((data, cb) => { | |
let record = { | |
Uuid: new String(data.contractId), | |
ContainerId: data.containerId, | |
Status: new String(data.conditions) | |
}; | |
if (data.tenant) { | |
record.Tenant = data.tenant | |
} | |
if (data.archive) { | |
record.Archive = data.archive | |
} | |
this.modules.sql.merge("Contracts", "Uuid", record, cb) | |
}, 10); | |
this.storageInsert = async.queue((data, cb) => { | |
let record = { | |
Uuid: new String(data.contractId), | |
ContainerId: data.containerId, | |
Status: new String(data.conditions) | |
}; | |
if (data.tenant) { | |
record.Tenant = data.tenant | |
} | |
if (data.archive) { | |
record.Archive = data.archive | |
} | |
this.modules.sql.insert("Contracts", "Uuid", record, cb) | |
}, 10); | |
cb(null, this); | |
} | |
parse(payload) { | |
//["location", data.device.containerId, data.device.id, [data.event.point.timestamp, data.event.point.lat, data.event.point.long, data.event.point.prec, data.event.point.sensor, data.event.point.speed]] | |
//["session", payload.containerId, payload.deviceId, [payload.begin, payload.end, payload.distance]] | |
if (!Array.isArray(payload) || typeof payload[0] !== "string") { | |
return null; | |
} | |
switch (payload[0]) { | |
case "create": | |
if (typeof payload[1] === "string" && | |
typeof payload[2] === "string" && | |
typeof payload[3] === "number" && | |
typeof payload[4] === "string" | |
) { | |
return { | |
action: payload[0], | |
containerId: payload[1], | |
contractId: payload[2], | |
tenant: payload[3], | |
script: payload[4], | |
argv: payload[5] | |
} | |
} else { | |
return null; | |
} | |
break; | |
case "destroy": | |
if (typeof payload[1] === "string" && | |
typeof payload[2] === "string" | |
) { | |
return { | |
action: payload[0], | |
containerId: payload[1], | |
contractId: payload[2] | |
} | |
} else { | |
return null; | |
} | |
break; | |
case "debug": | |
if (typeof payload[1] === "string" && | |
typeof payload[2] === "string" | |
) { | |
return { | |
action: payload[0], | |
containerId: payload[1], | |
contractId: payload[2] | |
} | |
} else { | |
return null; | |
} | |
break; | |
default: | |
if (typeof payload[1] === "string" && payload.length == 3) { | |
return { | |
action: payload[0], | |
containerId: payload[1], | |
argv: payload[2] | |
} | |
} else { | |
return null; | |
} | |
break; | |
} | |
} | |
__createContract(script, argv) { | |
const cache = this.cache[script] = this.cache[script] || {}; | |
cache.code = cache.code || (new vm.Script(fs.readFileSync(`./contracts/${script}.js`))); | |
cache.context = cache.context || (new vm.createContext({geolib: require('geolib')})); | |
cache.App = cache.App || cache.code.runInContext(cache.context); | |
let app = new cache.App(); | |
Object.defineProperty(app, "script", { | |
value: script, | |
writable: false, | |
enumerable: true | |
}); | |
Object.defineProperty(app, "argv", { | |
value: argv || {}, | |
writable: false, | |
enumerable: true | |
}); | |
return app; | |
} | |
__listen() { | |
this.modules.bus.listen("mapit2.contracts", (err, msg) => { | |
if (err) { | |
if (err.toString() == "Error: Socket closed abruptly during opening handshake") { | |
this.modules.logger.error("__listen restart", err) | |
setTimeout(() => { | |
this.modules.logger.error("__listen restarted") | |
this.__listen(); | |
}, 5000) | |
} | |
return this.modules.logger.error("__listen", err) | |
} | |
let payload; | |
let contractEvent; | |
try { | |
payload = JSON.parse(msg.payload); | |
contractEvent = this.parse(payload); | |
} | |
catch (e) { | |
msg.release(); | |
return this.modules.logger.error(msg.payload, e.toString()) | |
} | |
if (!contractEvent) { | |
this.modules.logger.info(msg.payload, "wrong signature") | |
return msg.release(); | |
} | |
switch (contractEvent.action) { | |
case 'create': | |
this.contracts[contractEvent.containerId] = this.contracts[contractEvent.containerId] || {}; | |
if (Object.keys(this.contracts[contractEvent.containerId]).length > 10) { | |
return msg.release(); | |
} | |
let app = this.__createContract(contractEvent.script, contractEvent.argv); | |
app.main && app.main(); | |
if (app.execute()) { | |
this.queue.push({ | |
containerId: contractEvent.containerId, | |
contractId: contractEvent.contractId, | |
conditions: JSON.stringify(app), | |
timestamp: moment().utc().unix() | |
}) | |
this.contracts[contractEvent.containerId][contractEvent.contractId] = app; | |
this.modules.logger.info("first verify ok", contractEvent.containerId, app); | |
} else { | |
this.contracts[contractEvent.containerId][contractEvent.contractId] = app; | |
this.modules.logger.info("first verify fail", contractEvent.containerId, app); | |
} | |
this.storageInsert.push({ | |
tenant: contractEvent.tenant, | |
containerId: contractEvent.containerId, | |
contractId: contractEvent.contractId, | |
conditions: JSON.stringify(app) | |
}); | |
msg.release(); | |
break; | |
case 'debug': | |
if (!this.contracts[contractEvent.containerId]) { | |
return msg.release(); | |
} | |
let info = this.contracts[contractEvent.containerId][contractEvent.contractId]; | |
this.queue.push({ | |
containerId: contractEvent.containerId, | |
contractId: contractEvent.contractId, | |
conditions: JSON.stringify(info), | |
timestamp: moment().utc().unix() | |
}); | |
//this.modules.logger.info("info", contractEvent.containerId, JSON.parse(JSON.stringify(info))) | |
msg.release(); | |
break; | |
case 'destroy': { | |
if (!this.contracts[contractEvent.containerId] || !this.contracts[contractEvent.containerId][contractEvent.contractId]) { | |
return msg.release(); | |
} | |
this.storageUpdate.push({ | |
containerId: contractEvent.containerId, | |
contractId: contractEvent.contractId, | |
conditions: JSON.stringify(this.contracts[contractEvent.containerId][contractEvent.contractId]), | |
archive: 1 | |
}); | |
delete this.contracts[contractEvent.containerId][contractEvent.contractId]; | |
msg.release(); | |
break; | |
} | |
default: | |
if (!this.contracts[contractEvent.containerId]) { | |
return msg.release(); | |
} | |
let eventName = 'on' + changeCase.pascalCase(contractEvent.action); | |
let contracts = Object.keys(this.contracts[contractEvent.containerId]); | |
if (eventName == "onSession"){ | |
this.modules.logger.info(eventName, contracts) | |
} | |
for (let i = 0; i < contracts.length; i++) { | |
let contractId = contracts[i]; | |
let app = this.contracts[contractEvent.containerId][contractId]; | |
if (app[eventName]) { | |
app[eventName](contractEvent.argv); | |
if (app.execute()) { | |
this.queue.push({ | |
containerId: contractEvent.containerId, | |
contractId: contractId, | |
conditions: JSON.stringify(app), | |
timestamp: moment().utc().unix() | |
}); | |
this.modules.logger.info("event verify ok", eventName, contractEvent.containerId, JSON.parse(JSON.stringify(app))) | |
} else { | |
this.modules.logger.info("event verify fail", eventName, contractEvent.containerId, JSON.parse(JSON.stringify(app))) | |
} | |
this.storageUpdate.push({ | |
containerId: contractEvent.containerId, | |
contractId: contractId, | |
conditions: JSON.stringify(app) | |
}); | |
} | |
} | |
msg.release(); | |
break; | |
} | |
}, 4) | |
} | |
__load(cb) { | |
this.modules.sql.query(` | |
SELECT [Uuid] contractId, [Status] conditions, [ContainerId] containerId | |
FROM [dbo].[Contracts] | |
where [Archive] = 0 | |
`, null, (err, rows) => { | |
if (err || !rows.dataset[0].length) { | |
return cb(err) | |
} | |
rows.dataset[0].forEach((row) => { | |
let conditions | |
try { | |
conditions = JSON.parse(row.conditions) | |
} catch (e) { | |
return this.modules.logger.info(row.conditions, e.toString()) | |
} | |
let app = this.__createContract(conditions.script, conditions.argv); | |
delete conditions.script; | |
delete conditions.argv; | |
let fields = Object.keys(conditions); | |
for (let i = 0; i < fields.length; i++) { | |
app[fields[i]] = conditions[fields[i]]; | |
} | |
this.contracts[row.containerId] = this.contracts[row.containerId] || {}; | |
this.contracts[row.containerId][row.contractId] = app; | |
}); | |
cb(); | |
}) | |
} | |
onBind(scope) { | |
this.modules = scope; | |
this.ready = true; | |
this.__load((err) => { | |
if (err) { | |
this.ready = false; | |
return this.modules.logger.error("loading", err) | |
} | |
this.__listen(); | |
}); | |
} | |
cleanup(cb) { | |
this.ready = false; | |
cb(); | |
} | |
} | |
module.exports = Contract; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment