Last active
November 12, 2015 14:08
-
-
Save shelakel/93bd11cd3cc520122e2f to your computer and use it in GitHub Desktop.
Node.Js 4+ Service + Service Host abstract implementations
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"use strict";
const EventEmitter = require("events");
//-----------------------------------------------
// Service
//-----------------------------------------------
// Service state.
// STATE is the private symbol key under which Service and ServiceHost store
// their current lifecycle state; the strings below are the possible values.
// The symbol description only aids debugging output.
const STATE = Symbol("state");
const STARTING = "starting";
const STARTED = "started";
const STOPPING = "stopping";
const STOPPED = "stopped";
// process.nextTick is not used in runInternal because
// recursively setting nextTick callbacks will block any I/O from happening,
// just like a while(true); loop.
// see: https://nodejs.org/api/process.html#process_process_nexttick_callback_arg

/**
 * Continuation passed to `run` after every iteration; must be invoked with
 * `this` bound to the service instance.
 *
 * - Emits "error" when the iteration reported one.
 * - If a stop was requested (state is "stopping"), marks the service as
 *   stopped, emits "stopped" and ends the loop.
 * - Otherwise schedules the next `run` iteration with setImmediate.
 *
 * @param {*} [err] - error reported by the previous `run` iteration, if any.
 */
function runInternal(err) {
  if (err) {
    this.emit("error", err);
  }
  if (this.state === STOPPING) {
    this[STATE] = STOPPED;
    this.emit("stopped");
    return;
  }
  // Hand `run` a fresh continuation bound to the same instance.
  setImmediate(this.run.bind(this, runInternal.bind(this)));
}
/**
 * Abstract base class for a long-running service.
 *
 * Subclasses implement `run(cb)`, which performs one unit of work and then
 * invokes `cb(err)`. While the service is started, `run` is called again
 * after every completed iteration.
 *
 * Events: "started", "stopped" and "error" (an error passed to the `run`
 * callback).
 */
class Service extends EventEmitter {
  constructor() {
    super();
    this[STATE] = STOPPED;
  }

  /** Current lifecycle state: "starting", "started", "stopping" or "stopped". */
  get state() {
    return this[STATE];
  }

  /**
   * Starts the service asynchronously.
   *
   * @param {Function} [cb] - invoked once the service is started.
   */
  start(cb) {
    if (this.state === STARTED) {
      if (cb) {
        process.nextTick(cb);
      }
      return;
    }
    if (cb) {
      this.once("started", cb);
    }
    if (this.state === STARTING) {
      // A start is already in flight; the callback fires with it.
      return;
    }
    if (this.state === STOPPING) {
      // The run loop is still winding down. Overwriting the state here would
      // orphan pending "stopped" callbacks and launch a second run loop, so
      // restart only after the current loop has actually stopped.
      this.once("stopped", () => this.start());
      return;
    }
    this[STATE] = STARTING;
    process.nextTick(() => {
      if (this.state !== STARTING) {
        return;
      }
      this[STATE] = STARTED;
      this.emit("started");
      this.run(runInternal.bind(this));
    });
  }

  /**
   * Performs one iteration of work. Must be overridden by subclasses.
   *
   * @param {Function} cb - call as `cb(err)` when the iteration is done.
   * @throws {Error} always, in the base class.
   */
  run(cb) {
    throw new Error("run is not implemented");
  }

  /**
   * Requests the service to stop after the current `run` iteration.
   * If the service is not in the "started" state, nothing is stopped and
   * `cb` is simply invoked on the next tick.
   *
   * @param {Function} [cb] - invoked once the service is stopped.
   */
  stop(cb) {
    if (this.state !== STARTED) {
      if (cb) {
        process.nextTick(cb);
      }
      return;
    }
    if (cb) {
      this.once("stopped", cb);
    }
    // signal the service to stop on next iteration
    this[STATE] = STOPPING;
  }
}
//-----------------------------------------------
// Service Host
//-----------------------------------------------
// Private symbol key under which a ServiceHost stores its list of services.
// The symbol description only aids debugging output.
const SERVICES = Symbol("services");
/**
 * Flattens an arbitrarily nested list of services into one flat array,
 * preserving order and dropping falsy entries (null, undefined, ...).
 *
 * A single non-array value is wrapped in an array.
 *
 * @param {*} services - a service, a (nested) array of services, or nothing.
 * @returns {Array} a new flat array of services.
 */
function flattenServices(services) {
  const flat = [];
  // Walk with an accumulator instead of concat.apply(): apply() passes one
  // argument per element and can overflow the call stack on large lists.
  const collect = (item) => {
    if (!item) {
      return;
    }
    if (!Array.isArray(item)) {
      flat.push(item);
      return;
    }
    for (let i = 0; i < item.length; i++) {
      collect(item[i]);
    }
  };
  collect(services);
  return flat;
}
/**
 * Starts and stops a group of services as one unit.
 *
 * A hosted service only needs `start(cb)` and `stop(cb)` methods that invoke
 * `cb` once the transition has completed.
 *
 * Events: "started" (all services started) and "stopped" (all stopped).
 */
class ServiceHost extends EventEmitter {
  /**
   * Accepts any number of services and/or (nested) arrays of services.
   */
  constructor() {
    super();
    this[STATE] = STOPPED;
    this[SERVICES] = flattenServices(Array.prototype.slice.call(arguments));
  }

  /** Current lifecycle state: "starting", "started", "stopping" or "stopped". */
  get state() {
    return this[STATE];
  }

  /**
   * Adds services (or nested arrays of services) to the host.
   * Does not start them.
   *
   * @returns {ServiceHost} this, for chaining.
   */
  add() {
    const servicesToAdd = flattenServices(Array.prototype.slice.call(arguments));
    if (servicesToAdd.length > 0) {
      this[SERVICES] = this[SERVICES].concat(servicesToAdd);
    }
    return this;
  }

  /**
   * Removes services (or nested arrays of services) from the host.
   * Does not stop them.
   *
   * @returns {ServiceHost} this, for chaining.
   */
  remove() {
    const servicesToRemove = flattenServices(Array.prototype.slice.call(arguments));
    if (servicesToRemove.length === 0 ||
      this[SERVICES].length === 0) {
      return this;
    }
    // Keep only the services that were NOT asked to be removed.
    this[SERVICES] = this[SERVICES].filter(function(service) {
      return servicesToRemove.indexOf(service) < 0;
    });
    return this;
  }

  /**
   * Starts all hosted services asynchronously.
   *
   * @param {Function} [cb] - invoked once every service has started.
   */
  start(cb) {
    if (this.state === STARTED) {
      if (cb) {
        process.nextTick(cb);
      }
      return;
    }
    if (cb) {
      this.once("started", cb);
    }
    if (this.state === STARTING) {
      // A start is already in flight; the callback fires when it completes.
      return;
    }
    if (this.state === STOPPING) {
      // Let the pending stop finish before starting the services again.
      this.once("stopped", () => this.start());
      return;
    }
    this[STATE] = STARTING;
    process.nextTick(() => {
      if (this.state !== STARTING) {
        return;
      }
      const finish = () => {
        this[STATE] = STARTED;
        this.emit("started");
      };
      // Snapshot the list so add()/remove() during startup cannot
      // desynchronise the completion count.
      const services = this[SERVICES].slice();
      let pending = services.length;
      if (pending === 0) {
        // Nothing to wait for; otherwise the host would stay "starting" forever.
        finish();
        return;
      }
      const onStarted = () => {
        pending--;
        if (pending === 0) {
          finish();
        }
      };
      services.forEach(function(service) {
        service.start(onStarted);
      });
    });
  }

  /**
   * Stops all hosted services asynchronously.
   * If the host is not in the "started" state, nothing is stopped and `cb`
   * is simply invoked on the next tick.
   *
   * @param {Function} [cb] - invoked once every service has stopped.
   */
  stop(cb) {
    if (this.state !== STARTED) {
      if (cb) {
        process.nextTick(cb);
      }
      return;
    }
    if (cb) {
      this.once("stopped", cb);
    }
    // signal the services to stop on next iteration
    this[STATE] = STOPPING;
    process.nextTick(() => {
      const finish = () => {
        this[STATE] = STOPPED;
        this.emit("stopped");
      };
      // Snapshot for the same reason as in start().
      const services = this[SERVICES].slice();
      let pending = services.length;
      if (pending === 0) {
        finish();
        return;
      }
      const onStopped = () => {
        pending--;
        if (pending === 0) {
          finish();
        }
      };
      services.forEach(function(service) {
        service.stop(onStopped);
      });
    });
  }
}
module.exports = { | |
Service, | |
ServiceHost | |
}; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment