Skip to content

Instantly share code, notes, and snippets.

@intech
Created March 29, 2021 16:42
Show Gist options
  • Save intech/b43badd23e6959d05aa48186c77054c7 to your computer and use it in GitHub Desktop.
Save intech/b43badd23e6959d05aa48186c77054c7 to your computer and use it in GitHub Desktop.
Mixin for dynamic generates services and method gRPC from registry moleculer
"use strict";
const { MoleculerError } = require("moleculer").Errors;
const { Root, Namespace, Service, Method, Type, Field } = require("protobufjs");
const gRPC = require("grpc");
module.exports = {
name: "grpc",
settings: {
port: 50051,
namespace: "moleculer"
},
actions: {
sayHello: {
grpc: true,
response: {
message: "string"
},
params: {
name: "string"
},
async handler(ctx) {
this.logger.info({ params: ctx.params, meta: ctx.meta });
return { message: `Hello ${ctx.params.name}` };
}
},
currentUser: {
// grpc: true,
async handler(ctx) {
const accessToken = ctx.params["access-token"];
if (accessToken) {
if (accessToken === "12345") {
// valid credentials
return { id: 1, username: "john.doe", name: "John Doe" };
} else {
// invalid credentials
throw new MoleculerError("Unauthorized user", 401, null, { accessToken });
}
} else {
// anonymous user
return null;
}
}
}
},
events: {
"$services.changed"(ctx) {
// this.regenerateAutoAliases();
}
},
methods: {
/**
* Regenerate aliases automatically if service registry has been changed.
*/
regenerateAutoAliases() {
this.logger.info("♻ Generate protobuf...");
const root = new Root();
const namespace = new Namespace(this.settings.namespace);
root.add(namespace);
const services = this.broker.registry.getServiceList({
withActions: true,
grouping: true,
skipInternal: true,
onlyAvailable: true
});
// this.logger.info(services);
for (const service of services) {
this.logger.info(service.fullName);
const template = new Service(service.fullName);
const actions = {};
for (const { rawName, name, grpc, params, response } of Object.values(
service.actions
)) {
if (!grpc) continue;
this.logger.info({ rawName, name }, grpc, params, response);
const fullName = rawName[0].toUpperCase() + rawName.substring(1);
actions[rawName] = async (call, callback) => {
try {
const { request: params, metadata: meta } = call;
const response = await this.broker.call(name, params, { meta });
callback(null, response || {});
} catch (err) {
this.logger.error(err);
callback(this.createError(err.code, err.message, err.data), null);
}
};
// request
const req = new Type(`request${fullName}`);
let fieldId = 1;
for (const [field, options] of Object.entries(params)) {
if (typeof options === "string") {
req.add(new Field(field, fieldId++, options, "required"));
} else if (typeof options === "object") {
req.add(
new Field(
field,
fieldId++,
options.type,
"optional" in options ? "optional" : "required"
)
);
} else {
throw new Error("Field type error");
}
}
// response
const res = new Type(`response${fullName}`);
fieldId = 1;
for (const [field, options] of Object.entries(response)) {
if (typeof options === "string") {
res.add(new Field(field, fieldId++, options, "required"));
} else if (typeof options === "object") {
res.add(
new Field(
field,
fieldId++,
options.type,
"optional" in options ? "optional" : "required"
)
);
} else {
throw new Error("Field type error");
}
}
namespace.add(req).add(res);
const method = new Method(
rawName,
"rpc",
`request${fullName}`,
`response${fullName}`,
false,
false
);
template.add(method);
}
namespace.add(template);
try {
this.server.addService(
gRPC.loadObject(root)[this.settings.namespace][service.fullName].service,
actions
);
} catch (err) {
this.logger.error(err);
throw new Error(err);
}
}
},
/**
* Support create error response
* @param {number} errorCode
* @param {string} message
* @param {string | object} responseData
* @returns {{code: *, message: (string|*|string)}}
*/
createError(errorCode, message, responseData) {
let errorMessage = message
? message
: typeof this.errorCode[errorCode] !== "undefined"
? this.errorCode[errorCode].message
: "Unknown error";
const grpcCode =
typeof this.errorCode[errorCode] !== "undefined"
? this.errorCode[errorCode].grpcCode
: gRPC.status.UNKNOWN;
if (responseData) {
if (typeof responseData === "object") {
responseData = JSON.stringify(responseData);
}
errorMessage = errorMessage + `, data: ${responseData}`;
}
return { code: grpcCode, message: errorMessage };
}
},
created() {
this.server = new gRPC.Server();
this.errorCode = {
400: {
grpcCode: gRPC.status.INVALID_ARGUMENT,
message: "Bad Request"
},
401: {
grpcCode: gRPC.status.UNAUTHENTICATED,
message: "Unauthorized"
},
402: {
grpcCode: gRPC.status.INVALID_ARGUMENT,
message: "Payment Required"
},
403: {
grpcCode: gRPC.status.PERMISSION_DENIED,
message: "Forbidden"
},
404: {
grpcCode: gRPC.status.NOT_FOUND,
message: "Not Found"
},
405: {
grpcCode: gRPC.status.UNIMPLEMENTED,
message: "Method Not Allowed"
},
406: {
grpcCode: gRPC.status.CANCELLED,
message: "Not Acceptable"
},
407: {
grpcCode: gRPC.status.UNAUTHENTICATED,
message: "Proxy Authentication Required"
},
408: {
grpcCode: gRPC.status.CANCELLED,
message: "Request Timeout"
},
409: {
grpcCode: gRPC.status.ALREADY_EXISTS,
message: "Conflict"
},
410: {
grpcCode: gRPC.status.RESOURCE_EXHAUSTED,
message: "Gone"
},
411: {
grpcCode: gRPC.status.OUT_OF_RANGE,
message: "Length Required"
},
412: {
grpcCode: gRPC.status.FAILED_PRECONDITION,
message: "Precondition Failed"
},
413: {
grpcCode: gRPC.status.OUT_OF_RANGE,
message: "Request Entity Too Large"
},
414: {
grpcCode: gRPC.status.OUT_OF_RANGE,
message: "Request-URI Too Long"
},
415: {
grpcCode: gRPC.status.UNIMPLEMENTED,
message: "Unsupported Media Type"
},
416: {
grpcCode: gRPC.status.OUT_OF_RANGE,
message: "Requested Range Not Satisfiable"
},
417: {
grpcCode: gRPC.status.INVALID_ARGUMENT,
message: "Expectation Failed"
},
418: {
grpcCode: gRPC.status.UNKNOWN,
message: "I'm a teapot (RFC 2324)"
},
420: {
grpcCode: gRPC.status.UNIMPLEMENTED,
message: "Enhance Your Calm (Twitter)"
},
422: {
grpcCode: gRPC.status.UNKNOWN,
message: "Unprocessable Entity (WebDAV)"
},
423: {
grpcCode: gRPC.status.PERMISSION_DENIED,
message: "Locked (WebDAV)"
},
424: {
grpcCode: gRPC.status.FAILED_PRECONDITION,
message: "Failed Dependency (WebDAV)"
},
425: {
grpcCode: gRPC.status.PERMISSION_DENIED,
message: "Reserved for WebDAV"
},
426: {
grpcCode: gRPC.status.RESOURCE_EXHAUSTED,
message: "Upgrade Required"
},
428: {
grpcCode: gRPC.status.FAILED_PRECONDITION,
message: "Precondition Required"
},
429: {
grpcCode: gRPC.status.RESOURCE_EXHAUSTED,
message: "Too Many Requests"
},
431: {
grpcCode: gRPC.status.RESOURCE_EXHAUSTED,
message: "Request Header Fields Too Large"
},
444: {
grpcCode: gRPC.status.CANCELLED,
message: "No Response (Nginx)"
},
449: {
grpcCode: gRPC.status.DEADLINE_EXCEEDED,
message: "Retry With (Microsoft)"
},
450: {
grpcCode: gRPC.status.PERMISSION_DENIED,
message: "Blocked by Windows Parental Controls (Microsoft)"
},
451: {
grpcCode: gRPC.status.FAILED_PRECONDITION,
message: "Unavailable For Legal Reasons"
},
499: {
grpcCode: gRPC.status.CANCELLED,
message: "Client Closed Request (Nginx)"
},
500: {
grpcCode: gRPC.status.INTERNAL,
message: "Internal Server Error"
},
501: {
grpcCode: gRPC.status.UNIMPLEMENTED,
message: "Not Implemented"
},
502: {
grpcCode: gRPC.status.INVALID_ARGUMENT,
message: "Bad Gateway"
},
503: {
grpcCode: gRPC.status.UNAVAILABLE,
message: "Service Unavailable"
},
504: {
grpcCode: gRPC.status.DEADLINE_EXCEEDED,
message: "Gateway Timeout"
},
505: {
grpcCode: gRPC.status.UNIMPLEMENTED,
message: "HTTP Version Not Supported"
},
506: {
grpcCode: gRPC.status.INTERNAL,
message: "Variant Also Negotiates (Experimental)"
},
507: {
grpcCode: gRPC.status.INTERNAL,
message: "Insufficient Storage (WebDAV)"
},
508: {
grpcCode: gRPC.status.INTERNAL,
message: "Loop Detected (WebDAV)"
},
509: {
grpcCode: gRPC.status.RESOURCE_EXHAUSTED,
message: "Bandwidth Limit Exceeded (Apache)"
},
510: {
grpcCode: gRPC.status.OUT_OF_RANGE,
message: "Not Extended"
},
511: {
grpcCode: gRPC.status.PERMISSION_DENIED,
message: "Network Authentication Required"
},
598: {
grpcCode: gRPC.status.DEADLINE_EXCEEDED,
message: "Network read timeout error"
},
599: {
grpcCode: gRPC.status.DEADLINE_EXCEEDED,
message: "Network connect timeout error"
}
};
},
started() {
setTimeout(async () => {
await this.regenerateAutoAliases();
await this.server.bind(
`0.0.0.0:${this.settings.port}`,
gRPC.ServerCredentials.createInsecure()
);
await this.server.start();
this.logger.info(`🚀 gRPC server is available at ${this.settings.port}`);
}, 500);
}
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment