Skip to content

Instantly share code, notes, and snippets.

@shelakel
Last active November 12, 2015 14:08
Show Gist options
  • Save shelakel/93bd11cd3cc520122e2f to your computer and use it in GitHub Desktop.
Save shelakel/93bd11cd3cc520122e2f to your computer and use it in GitHub Desktop.
Node.Js 4+ Service + Service Host abstract implementations
"use strict";
const EventEmitter = require("events");
//-----------------------------------------------
// Service
//-----------------------------------------------
// service state
const STATE = Symbol();
const STARTING = "starting";
const STARTED = "started";
const STOPPING = "stopping";
const STOPPED = "stopped";
// process.nextTick is not used in runInternal because
// recursively setting nextTick callbacks will block any I/O from happening,
// just like a while(true); loop.
// see: https://nodejs.org/api/process.html#process_process_nexttick_callback_arg
function runInternal(err) {
if (err) {
this.emit("error", err);
}
if (this.state === STOPPING) {
this[STATE] = STOPPED;
this.emit("stopped");
return;
}
setImmediate(this.run.bind(this, runInternal.bind(this)));
}
class Service extends EventEmitter {
constructor() {
super();
this[STATE] = STOPPED;
}
get state() {
return this[STATE];
}
start(cb) {
if (this.state === STARTING ||
this.state === STARTED) {
if (cb) {
process.nextTick(cb);
}
return;
}
if (cb) {
this.once("started", cb);
}
this[STATE] = STARTING;
process.nextTick(() => {
if (this.state !== STARTING) {
return;
}
this[STATE] = STARTED;
this.emit("started");
this.run(runInternal.bind(this));
});
}
run(cb) {
throw Error("run is not implemented");
}
stop(cb) {
if (this.state !== STARTED) {
if (cb) {
process.nextTick(cb);
}
return;
}
if (cb) {
this.once("stopped", cb);
}
// signal the service to stop on next iteration
this[STATE] = STOPPING;
}
}
//-----------------------------------------------
// Service Host
//-----------------------------------------------
const SERVICES = Symbol();
function flattenServices(services) {
if (!services) {
return [];
}
if (!Array.isArray(services)) {
return [services];
}
return Array.prototype.concat.apply([], services.map(flattenServices));
}
class ServiceHost extends EventEmitter {
constructor() {
super()
this[STATE] = STOPPED;
this[SERVICES] = flattenServices(Array.prototype.slice.call(arguments));
}
get state() {
return this[STATE];
}
add() {
const servicesToAdd = flattenServices(Array.prototype.slice.call(arguments));
if (servicesToAdd.length > 0) {
this[SERVICES].push.apply(servicesToAdd);
}
return this;
}
remove() {
const servicesToRemove = flattenServices(Array.prototype.slice.call(arguments));
if (servicesToRemove.length === 0 ||
this[SERVICES].length === 0) {
return this;
}
this[SERVICES] = this[SERVICES].filter(function(service) {
return servicesToRemove.indexOf(service) >= 0;
});
return this;
}
start(cb) {
if (this.state === STARTING ||
this.state === STARTED) {
if (cb) {
process.nextTick(cb);
}
return;
}
if (cb) {
this.once("started", cb);
}
this[STATE] = STARTING;
process.nextTick(() => {
if (this.state !== STARTING) {
return;
}
let count = 0;
const onStarted = () => {
count++;
if (count === this[SERVICES].length) {
this[STATE] = STARTED;
this.emit("started");
}
}
this[SERVICES].forEach(function(service) {
service.start(onStarted);
});
});
}
stop(cb) {
if (this.state !== STARTED) {
if (cb) {
process.nextTick(cb);
}
return;
}
if (cb) {
this.once("stopped", cb);
}
// signal the services to stop on next iteration
this[STATE] = STOPPING;
process.nextTick(() => {
let count = 0;
const onStopped = () => {
count++;
if (count === this[SERVICES].length) {
this[STATE] = STOPPED;
this.emit("stopped");
}
}
this[SERVICES].forEach(function(service) {
service.stop(onStopped);
});
});
}
}
module.exports = {
Service,
ServiceHost
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment