Last active
February 17, 2018 20:55
-
-
Save jaszhix/215306816e15b3cf78af6369700ab97b to your computer and use it in GitHub Desktop.
Experimental CJS subprocess IPC handling with libsoup
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// This populates our imports object with gi modules in the child process. | |
imports.gi.GIRepository.Repository.get_default(); | |
const Gio = imports.gi.Gio; | |
const Soup = imports.gi.Soup; | |
const Signals = imports.signals; | |
// Cinnamon's JS context does not have an ARGV global defined. | |
const isChildProcess = typeof ARGV !== 'undefined'; | |
const DEBUG = true; | |
/** | |
* Worker | |
* | |
* Signals: | |
* | |
* "worker-ready": When both IPC end points are connected, this signal will indicate when code and methods can be sent | |
* to the child process for evaluation. | |
* | |
* "worker-killed": Emitted before the child process is terminated. | |
* | |
* Example usage: | |
* | |
* Starting an instance: | |
* | |
* let worker = new Worker(null, this.path); | |
* | |
* worker.connect('worker-ready', Lang.bind(this, ()=>{ | |
* worker.send('code', 'print(Math.round(456709 / 92))').then((res)=>{ | |
* print(res); // 4964 | |
* }).catch((e, message)=>{ | |
* print('ERR!'); | |
* print(e, message); | |
* }); | |
* })); | |
* | |
* Importing a file: | |
* | |
* let _import = { | |
* path: metadata.path, | |
* name: 'MyClass', | |
* args: [], | |
* isConstructor: true | |
* }; | |
* | |
* worker.send('import', JSON.stringify(_import)).then((res)=>{ | |
* print(res) | |
* }).catch((e, message)=>{ | |
* print('ERR!') | |
* print(e, message) | |
* }); | |
* | |
* After importing, your specified file will be available in the child's context `this`. If isConstructor is | |
* passed, it will create a new instance and pass its context to it as the first argument. If you do this, you can | |
* then create a helper class that assumes it has access to the worker with `this`. | |
* | |
* worker.send('code', 'this.MyClass.specialFunction('+JsonSerializableArgument+')').then((res)=>{ | |
* print(res); // Returned result | |
* }).catch((e, message)=>{ | |
* print('ERR!'); | |
* print(e, message); | |
* }); | |
* | |
*/ | |
function Worker() { | |
this._init.apply(this, arguments); | |
} | |
Worker.prototype = { | |
/** | |
* _init | |
* | |
* @param {array} ports - private, only used by the main thread constructor, null should be passed. | |
* @param {string} path - the path containing worker.js | |
*/ | |
_init: function(ports, path) { | |
this.subprocess = null; | |
this.path = path; | |
this.subprocessFile = this.path + '/worker.js'; | |
this.parentServerPort = isChildProcess ? ports[0] : NaN; | |
this.childServerPort = isChildProcess ? ports[1] : NaN; | |
this.freePorts = []; | |
/*this.connect('worker-ready', ()=>{ | |
this.print('test kill...'); | |
this.killWorker(); | |
})*/ | |
if (!isChildProcess) { | |
this.findUnusedPorts(()=>{ | |
this.ipcServer(()=>{ | |
this.__init(); | |
}); | |
}); | |
} else { | |
this.print('Worker init', ARGV) | |
this.__init(); | |
} | |
}, | |
__init: function(){ | |
if (!isChildProcess && this.path) { | |
this.eval(this.subprocessFile, null); | |
} else { | |
// Set up the IPC server for the child process, then | |
this.ipcServer(()=>{ | |
this.send('code', 'this.print(\'Worker connected\')').then((res)=>{ | |
}).catch((err)=>{ | |
this.print(err); | |
}); | |
}); | |
} | |
}, | |
/** | |
* This will attempt to find an unused port on the machine, and prefers the highest ports Soup will work with. | |
* | |
* @param {function} cb | |
*/ | |
findUnusedPorts: function(cb){ | |
// Only run in the parent (main thread) | |
this.eval(null, ['netstat', '-lntu'], (usedPorts)=>{ | |
usedPorts = usedPorts.split('\n'); | |
let intUsedPorts = []; | |
for (let i = 0, len = usedPorts.length; i < len; i++) { | |
if (!usedPorts[i]) { | |
continue; | |
} | |
let port = usedPorts[i].trim().match(/([a-zA-Z0-9\-_\.]+):([0-9]{1,5})/gm); | |
if (port) { | |
let portParts = port[port.length - 1].split(':')[1]; | |
if (typeof portParts === 'string') { | |
port = parseInt(portParts); | |
intUsedPorts.push(port); | |
} | |
} | |
} | |
// 49151 is the highest port Soup will use. | |
let port = 49151; | |
while (port > 1024 && this.freePorts.length < 6) { | |
if (intUsedPorts.indexOf(port) === -1) { | |
this.freePorts.push(port) | |
} | |
--port; | |
} | |
this.parentServerPort = this.freePorts[0]; | |
this.childServerPort = this.freePorts[this.freePorts.length - 1]; | |
this.print('parentServerPort: ', this.parentServerPort) | |
this.print('childServerPort: ', this.childServerPort) | |
cb(); | |
}); | |
}, | |
/** | |
* Starts a new cjs child process, or executes a command. | |
* | |
* @param {string} file | |
* @param {array} proc | |
* @param {function} cb | |
*/ | |
eval: function(file, proc, cb){ | |
let exec; | |
if (proc) { | |
exec = proc; | |
} else { | |
exec = ['cjs', file, '--parentServerPort='+this.parentServerPort, '--childServerPort='+this.childServerPort]; | |
} | |
let subprocess = new Gio.Subprocess({ | |
argv: exec, | |
// We need STDIN so our child process has an ARGV context. | |
flags: Gio.SubprocessFlags.STDOUT_PIPE | Gio.SubprocessFlags.STDIN_PIPE, | |
}); | |
subprocess.init(null); | |
subprocess.communicate_utf8_async(null, null, (obj, res)=>{ | |
let [success, out] = obj.communicate_utf8_finish(res); | |
this.print('out: ', out !== null) | |
if (typeof cb === 'function') { | |
cb(!success || out); | |
} | |
}); | |
}, | |
/** | |
* Starts the Soup server for IPC communication. | |
* | |
* @param {function} cb | |
*/ | |
ipcServer: function(cb){ | |
let port = isChildProcess ? this.childServerPort : this.parentServerPort; | |
let handler = (server, msg, path, query, client)=>{ | |
msg.status_code = 200; | |
msg.response_headers.set_content_type('application/json', {}); | |
// Need to use GET requests because Soup would not populate the request_body for unknown reasons. | |
let request = JSON.parse(msg.request_body.data); | |
this.print('ipcServer request: ', request) | |
if (!request) { | |
return false; | |
} else if (request.code) { | |
let res = {code: eval(request.code)} | |
msg.set_response('application/json', Soup.MemoryUse.COPY, JSON.stringify(res)); | |
} else if (request.import) { | |
try { | |
request.import = JSON.parse(request.import); | |
imports.searchPath.unshift(request.import.path); | |
let _import = imports[request.import.name]; | |
if (request.import.isConstructor) { | |
let args = [this].concat(request.import.args); | |
this[request.import.name] = new _import(...args); | |
} else { | |
this[request.import.name] = _import; | |
} | |
msg.set_response('application/json', Soup.MemoryUse.COPY, '{success: true}'); | |
} catch (e) { | |
this.print('Error importing file...', e); | |
msg.response_body.append('{}'); | |
} | |
} else if (request.method) { | |
request.method = JSON.parse(request.method); | |
request.method.name = request.method.name.split('.'); | |
this.print(request.method) | |
let res = this[request.method.name[0]][request.method.name[1]](...request.method.args); | |
msg.set_response('application/json', Soup.MemoryUse.COPY, JSON.stringify(res)); | |
} | |
}; | |
this.server = new Soup.Server(); | |
try { | |
let success = this.server.listen_local(port, Soup.ServerListenOptions.IPV6_ONLY); | |
} catch (e) { | |
this.print('server.listen_local failed: ', e); | |
} | |
try { | |
let uris = this.server.get_uris(); | |
if (typeof uris[0] === 'undefined') { | |
if (isChildProcess) { | |
--this.childServerPort; | |
} else { | |
--this.parentServerPort; | |
} | |
this.ipcServer(cb); | |
return; | |
} | |
// Assign the port to our context just incase Soup decides to listen on a random port for no reason... | |
let port = uris[0].get_port(); | |
if (isChildProcess) { | |
this.childServerPort = port; | |
} else { | |
this.parentServerPort = port; | |
} | |
} catch (e) { | |
print(e) | |
} | |
// Send code to be evaluated back the main thread triggering a ready signal for the callee. | |
if (isChildProcess) { | |
this.send('code', 'this.emit(\'worker-ready\');'); | |
} | |
cb(); | |
this.server.add_handler('/', handler); | |
this.server.run(); | |
}, | |
/** | |
* Send code to the other Worker instance to evaluate, or an import. | |
* | |
* @param {string} type - options are "code" and "import". | |
* @param {string} content - for code, this should be JS code in a string, or an import. | |
*/ | |
send: function(type, content){ | |
let port = isChildProcess ? this.parentServerPort : this.childServerPort; | |
let url = 'http://ip6-localhost:'+port.toString()+'/' | |
this.print('send request port: ', port) | |
this.print('send request URL: ', url) | |
this.print('send request type: ', type) | |
this.print('send request content: ', content) | |
if (typeof content === 'object') { | |
content = JSON.stringify(content); | |
} | |
return new Promise((resolve, reject)=>{ | |
let httpSession = new Soup.SessionAsync(); | |
httpSession.user_agent = 'IPC/API'; | |
let request = Soup.Message.new('POST', url); | |
request.request_headers.set_content_type('application/json', {}); | |
let reqObject = {}; | |
reqObject[type] = content; | |
request.set_request('application/json', Soup.MemoryUse.COPY, JSON.stringify(reqObject)); | |
Soup.Session.prototype.add_feature.call(httpSession, new Soup.ProxyResolverDefault()); | |
Soup.Session.prototype.add_feature.call(httpSession, new Soup.ContentDecoder()); | |
httpSession.queue_message(request, (session, message)=>{ | |
this.print('status: ', message.status_code); | |
try { | |
let output = JSON.parse(message.response_body.data); | |
output = output.code ? output.code : output; | |
resolve(output); | |
} catch (e) { | |
resolve(message.response_body.data); | |
} | |
}); | |
}); | |
}, | |
/** | |
* Wrapper around print that allows identifying which worker the logging is coming from. | |
* | |
*/ | |
print: function(){ | |
if (!DEBUG) { | |
return false; | |
} | |
let msgAppend = isChildProcess ? 'Child: ' : 'Parent: '; | |
let args = [msgAppend]; | |
for (let i = 0, len = arguments.length; i < len; i++) { | |
args.push(JSON.stringify(arguments[i])); | |
} | |
print(...args); | |
}, | |
/** | |
* Finds the PID this worker's child CJS process is running on by comparing its port in a netstat query, and | |
* kills it. We need to query the PID this way since Gio.Subprocess doesn't return it. | |
* | |
* @param {function} cb | |
*/ | |
killWorker: function(cb){ | |
if (isChildProcess) { | |
return false; | |
} | |
this.emit('worker-killed'); | |
this.eval(null, ['netstat', '-ap'], (cjsPorts)=>{ | |
cjsPorts = cjsPorts.split('\n'); | |
for (let i = 0, len = cjsPorts.length; i < len; i++) { | |
if (!cjsPorts[i]) { | |
continue; | |
} | |
let port = cjsPorts[i].trim().match(/([a-zA-Z0-9\-_\.]+):([0-9]{1,5})/gm); | |
if (port) { | |
let portParts = port[port.length - 1].split(':')[1]; | |
if (typeof portParts === 'string') { | |
port = parseInt(portParts); | |
if ((port === this.parentServerPort || port === this.childServerPort) | |
&& cjsPorts[i].indexOf('/cjs') !== -1) { | |
let pid = cjsPorts[i].split('/cjs')[0].split('LISTEN ')[1]; | |
if (!isNaN(parseInt(pid))) { | |
this.eval(null, ['kill', '-KILL', pid]); | |
this.print('killed', pid) | |
} | |
} | |
} | |
} | |
} | |
if (typeof cb === 'function') { | |
cb(); | |
} | |
}); | |
}, | |
destroy: function(){ | |
this.killWorker(()=>{ | |
this.server.disconnect(); | |
}); | |
} | |
}; | |
Signals.addSignalMethods(Worker.prototype); | |
// This handles the invocation of our child process's Worker instance. | |
if (isChildProcess) { | |
print('Starting worker...') | |
print('ARGV: ', JSON.stringify(ARGV)) | |
let parentServerPort = parseInt(ARGV[0].split('--parentServerPort=')[1]); | |
let childServerPort = parseInt(ARGV[1].split('--childServerPort=')[1]); | |
print([parentServerPort, childServerPort]) | |
var worker = new Worker([parentServerPort, childServerPort]); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment