Last active
December 1, 2021 16:53
-
-
Save intech/e5ea80f2750809de12c53032188be2d4 to your computer and use it in GitHub Desktop.
Moleculer Socket.IO mixin with auto-alias
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module.exports = { | |
name: "test", | |
version: 1, | |
actions: { | |
test: { | |
ws: { | |
name: "test" | |
}, | |
async handler(ctx) { | |
const { user } = ctx.meta; | |
// on client emit event msg with payload { msg } | |
ctx.broadcast("io.msg", { msg: "event from server" }, { | |
meta: { rooms: [user] } | |
}.catch(err => this.logger.error(err)); | |
return user; | |
} | |
} | |
} | |
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const _ = require("lodash"); | |
const { Server } = require("socket.io"); | |
const { ServiceNotFoundError, MoleculerClientError } = require("moleculer").Errors; | |
const { UnAuthorizedError, ERR_INVALID_TOKEN, ERR_NO_TOKEN } = require("moleculer-web").Errors; | |
module.exports = { | |
name: "ws", | |
settings: { | |
$shutdownTimeout: 5000, | |
debounceTime: 500, | |
io: { | |
//socket.io options | |
options: { | |
path: "/ws", | |
pingInterval: 1000, | |
pingTimeout: 5000, | |
connectTimeout: 5000, | |
serveClient: false, | |
cookie: false | |
// transports: ["websocket"] | |
} | |
} | |
}, | |
events: { | |
async "$services.changed"() { | |
await this.regenerateEventsAuto(); | |
}, | |
"io.latency": { | |
ws: { | |
name: "latency" | |
}, | |
group: "io", | |
async handler() { | |
return Promise.resolve(); | |
} | |
}, | |
"io.join": { | |
ws: { | |
name: "join" | |
}, | |
group: "io", | |
async handler(ctx) { | |
const { params, meta } = ctx; | |
this.logger.debug("join:", { meta }, params); | |
this.io.to(meta.socket).socketsJoin(params); | |
} | |
}, | |
"io.leave": { | |
ws: { | |
name: "leave" | |
}, | |
group: "io", | |
async handler(ctx) { | |
const { params, meta } = ctx; | |
this.logger.debug("leave:", { meta }, params); | |
this.io.to(meta.socket).socketsLeave(params); | |
} | |
}, | |
"io.**": { | |
group: "io", | |
async handler(ctx) { | |
const { eventName, params, meta } = ctx; | |
if (meta.from === "client") return; | |
const event = eventName.substr(3); | |
this.logger.debug("[IO] emit: ", eventName, event, params, meta); | |
let namespace = this.io; | |
if (meta.namespace) namespace = namespace.of(meta.namespace); | |
// that the event data may be lost if the client is not ready to receive messages | |
if (meta.volate) namespace = namespace.volate; | |
// the event data will only be broadcast to every sockets but the sender | |
if (meta.broadcast) namespace = namespace.broadcast; | |
if (meta.rooms) { | |
for (let room of meta.rooms) { | |
const groups = room.split(":"); | |
if (groups.length) { | |
for (let i = 1; i <= groups.length; i++) { | |
const wildcard = groups.slice(0, i).join(":"); | |
this.logger.debug(`emit wildcard ${i} groups:`, wildcard); | |
namespace = namespace.to(wildcard); | |
} | |
} | |
} | |
} | |
if (params) { | |
namespace.emit(event, params); | |
} else { | |
namespace.emit(event); | |
} | |
} | |
} | |
}, | |
methods: { | |
/** | |
* Init Socket.IO | |
* @param opts | |
*/ | |
initSocketIO(opts = {}) { | |
opts = { | |
...(this.settings.io && this.settings.io.options ? this.settings.io.options : {}), | |
cors: this.settings.cors || "*" | |
}; | |
this.io = new Server(this.server, opts); | |
}, | |
/** | |
* Get meta data from socket | |
* @param socket | |
* @return {*&{$rooms: string[]}} | |
*/ | |
socketGetMeta(socket) { | |
let data = { | |
...socket.data, | |
$rooms: Object.keys(socket.rooms) | |
}; | |
this.logger.debug("getMeta", data); | |
return data; | |
}, | |
/** | |
* Set meta data in socket | |
* @param socket | |
* @param data | |
* @return {*|(*)} | |
*/ | |
socketSaveMeta(socket, data) { | |
socket.data = { | |
...socket.data, | |
...data | |
}; | |
this.logger.debug("setMeta", socket.data); | |
return socket.data; | |
}, | |
/** | |
* Formatter error | |
* @param err | |
* @param respond | |
* @return {*} | |
*/ | |
socketOnError(err, respond) { | |
const errObj = _.pick(err, ["name", "message", "code", "type", "data"]); | |
return respond(errObj); | |
}, | |
/** | |
* Get handler action by event name | |
* @param namespace | |
* @param eventName | |
* @param args | |
* @return {Promise<unknown>} | |
*/ | |
async getEventHandler(namespace, eventName, ...args) { | |
this.logger.debug("getEventHandler:", namespace, eventName, args); | |
// Check endpoint visibility | |
const handler = this.handlers.get(`${namespace}${eventName}`); | |
if (!handler) { | |
// Action can't be published | |
throw new ServiceNotFoundError({ eventName }); | |
} | |
this.logger.debug("getEventHandler action:", handler); | |
return handler; | |
}, | |
/** | |
* Registration all events with ws in registry | |
*/ | |
async registrationEvents() { | |
const events = this.broker.registry.events.list({ | |
onlyLocal: false, | |
onlyAvailable: true, | |
skipInternal: true, | |
withEndpoints: false | |
}); | |
for (const { name, group, event } of events) { | |
const fullName = [group, name].join("."); | |
if ("ws" in event) { | |
const { namespace = "/", name = fullName } = event.ws; | |
this.logger.debug("Add route from event:", namespace, name, event.ws); | |
// register if not exists namespace | |
if (!this.namespaces.has(namespace)) this.namespaces.add(namespace); | |
// register if not exists action in namespace | |
if (!this.handlers.has(`${namespace}${name}`)) { | |
this.handlers.set(`${namespace}${name}`, { | |
type: "event", | |
name: event.name | |
}); | |
} | |
} | |
} | |
}, | |
/** | |
* Registration all actions with ws in registry | |
*/ | |
async registrationActions() { | |
const actions = this.broker.registry.actions.list({ | |
onlyLocal: false, | |
onlyAvailable: true, | |
skipInternal: true, | |
withEndpoints: false | |
}); | |
for (const { name: fullName, action } of actions) { | |
if ("ws" in action) { | |
const { namespace = "/", name = fullName } = action.ws; | |
this.logger.debug("Add route:", fullName, namespace, name, action.ws); | |
// register if not exists namespace | |
if (!this.namespaces.has(namespace)) this.namespaces.add(namespace); | |
// register if not exists action in namespace | |
if (!this.handlers.has(`${namespace}${name}`)) { | |
this.handlers.set(`${namespace}${name}`, { | |
type: "action", | |
name: fullName | |
}); | |
} | |
} | |
} | |
} | |
}, | |
created() { | |
this.namespaces = new Set(); | |
this.handlers = new Map(); | |
}, | |
async started() { | |
if (!this.io) this.initSocketIO(); | |
this.io.engine.on("connection_error", err => { | |
console.log(err); | |
}); | |
const debounceTime = Number(this.settings.debounceTime) || 500; | |
this.regenerateEventsAuto = _.debounce( | |
() => Promise.all([this.registrationEvents(), this.registrationActions()]), | |
debounceTime | |
); | |
this.io.use(async (socket, next) => { | |
if ("auth" in socket.handshake && "token" in socket.handshake.auth) { | |
try { | |
const { auth, headers, url } = socket.handshake; | |
this.logger.debug(`Socket ${socket.id} auth check:`, { | |
token: auth.token, | |
path: `${headers["host"]}${url}` | |
}); | |
const uuid = await this.broker.call("v1.auth.check", { | |
token: auth.token, | |
path: `${headers["host"]}${url}` | |
}); | |
this.logger.debug({ uuid }); | |
socket.join(uuid); | |
this.socketSaveMeta(socket, { socket: socket.id, user: uuid }); | |
return next(); | |
} catch (e) { | |
this.logger.error(new UnAuthorizedError(ERR_INVALID_TOKEN, e)); | |
return next(new UnAuthorizedError(ERR_INVALID_TOKEN, e)); | |
} | |
} else { | |
// return next(); | |
this.logger.error(new UnAuthorizedError(ERR_NO_TOKEN)); | |
return next(new UnAuthorizedError(ERR_NO_TOKEN)); | |
} | |
}); | |
this.io.on("connection", socket => { | |
this.logger.debug("Client connected:", socket.id, socket.nsp.name); | |
socket.on("error", err => { | |
this.logger.error("[IO]:", err.toString()); | |
if (err && err.code === 401) socket.disconnect(); | |
}); | |
socket.onAny(async (eventName, ...args) => { | |
const opts = { | |
meta: { | |
// socket: socket.id, | |
...this.socketGetMeta(socket), | |
from: "client" | |
} | |
}; | |
let cb, | |
params = {}; | |
if (Array.isArray(args)) { | |
// TODO: add format validation: | |
// io.emit("event.name", { params }, callback(response) => {}) | |
// io.emit("event.name", callback(response) => {}) | |
// io.emit("event.name", { params }) | |
// io.emit("event.name") | |
this.logger.debug("args is array", args.length); | |
if (typeof args[args.length - 1] === "function") { | |
cb = args.pop(); | |
this.logger.debug("args with callback", cb); | |
} | |
params = args.shift() || {}; | |
this.logger.debug("with params", params); | |
} | |
try { | |
this.logger.debug(socket.nsp.name, eventName, ...args); | |
const { type, name } = await this.getEventHandler( | |
socket.nsp.name, | |
eventName, | |
...args | |
); | |
this.logger.debug({ type, name }); | |
switch (type) { | |
case "action": { | |
// io.emit("event.name", { params }, callback(response) => {}) | |
// io.emit("event.name", callback(response) => {}) | |
const endpoint = this.broker.findNextActionEndpoint(name, opts); | |
if (endpoint instanceof Error) { | |
if (cb) cb(endpoint); | |
this.logger.error(endpoint); | |
} | |
try { | |
this.logger.debug("Call action:", name, params, opts); | |
const result = await this.broker.call(name, params, opts); | |
if (cb) cb(result); | |
else | |
this.logger.warn( | |
`Event received '${eventName}' without callback function` | |
); | |
} catch (e) { | |
if (cb) cb(e); | |
this.logger.error(e); | |
} | |
break; | |
} | |
case "event": { | |
// io.emit("event.name", { params }) | |
// io.emit("event.name") | |
try { | |
this.logger.debug("Call event:", name, params, opts); | |
await this.broker.emit(name, params, opts); | |
} catch (e) { | |
if (cb) cb(e); | |
this.logger.error(e); | |
} | |
break; | |
} | |
default: { | |
const err = new MoleculerClientError( | |
"Unknown type event", | |
401, | |
"ERR_WS_TYPE_EVENT" | |
); | |
if (cb) cb(err); | |
this.logger.error(err); | |
} | |
} | |
} catch (e) { | |
this.logger.error(e); | |
if (cb) cb(e); | |
} | |
}); | |
}); | |
this.logger.info("Socket.io API Gateway started."); | |
}, | |
async stopped() { | |
this.logger.info("Socket.io API Gateway stopped."); | |
if (this.io) { | |
return this.io.disconnectSockets(true); | |
} | |
} | |
}; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment