Skip to content

Instantly share code, notes, and snippets.

@leizongmin
Created January 7, 2021 08:40
Show Gist options
  • Save leizongmin/1bd30dc84dacad4ff3f5b821601ddb62 to your computer and use it in GitHub Desktop.
Save leizongmin/1bd30dc84dacad4ff3f5b821601ddb62 to your computer and use it in GitHub Desktop.
worker_threads & SharedArrayBuffer Node.js 多线程共享内存(缓存管理)实验
const { MessageChannel } = require("worker_threads");
class Shared {
#clientCounter = 0;
#ports = new Map();
#map = new Map();
#parentPort = null;
#callbacks = new Map();
#callbackCounter = 0;
constructor({ main = false, port = null } = {}) {
this.isMain = !!main;
if (!this.isMain) {
this.#parentPort = port;
this.#parentPort.on("message", (data) => {
this.#log("got message from server", data);
const { seq, error, result } = data;
const callback = this.#callbacks.get(seq);
if (!callback) return;
this.#callbacks.delete(seq);
if (error) {
callback.reject(new Error(error));
} else {
callback.resolve(result);
}
});
}
}
#log(...args) {
// console.log("DEBUG:", ...args);
}
bindClientPort(port) {
const clientId = ++this.#clientCounter;
this.#ports.set(clientId, { port });
port.on("message", (data) => {
this.#log("got message from client", clientId, data);
this.#processRequest(clientId, data);
});
port.once("close", () => {
this.#ports.delete(clientId);
});
}
#processRequest(clientId, data) {
this.#log("processMessage", clientId, data);
const port = this.#ports.get(clientId)?.port;
if (!port) return;
const { seq, cmd, args } = data;
switch (cmd) {
case "GET":
{
const value = this.#map.get(args[0]);
port.postMessage({ seq, result: value });
}
break;
case "SET":
{
this.#map.set(args[0], args[1]);
port.postMessage({ seq, result: true });
}
break;
case "DELETE":
{
this.#map.delete(args[0]);
port.postMessage({ seq, result: true });
}
break;
default:
port.postMessage({ seq, result: false });
}
}
async #sendRequest(cmd, args = [], transferList = []) {
return new Promise((resolve, reject) => {
this.#log("sendRequest", cmd, args, transferList);
const seq = ++this.#callbackCounter;
this.#parentPort.postMessage({ seq, cmd, args }, transferList);
this.#callbacks.set(seq, { resolve, reject });
});
}
async get(key) {
const ret = await this.#sendRequest("GET", [key]);
if (ret) return Buffer.from(ret);
return ret;
}
async set(key, value) {
if (!Buffer.isBuffer(value)) throw new TypeError("expected a Buffer");
const sharedValue = new SharedArrayBuffer(value.length);
value.copy(Buffer.from(sharedValue));
return this.#sendRequest("SET", [key, sharedValue]);
}
async delete(key) {
return this.#sendRequest("DELETE", [key]);
}
close() {
if (this.#parentPort) this.#parentPort.close();
this.#ports.forEach((_, { port }) => {
if (port) port.close();
});
this.#ports.clear();
this.#callbacks.forEach((_, { reject }) => {
reject(new Error(`closed`));
});
this.#callbacks.clear();
}
}
// CommonJS export: makes the Shared class available to require() callers.
exports.Shared = Shared;
const { Shared } = require("./shared");
const { Worker, isMainThread, parentPort } = require("worker_threads");
/**
 * Pause the calling async function.
 *
 * @param {number} ms - delay in milliseconds.
 * @returns {Promise<void>} settles once the timer has fired.
 */
async function sleep(ms) {
  await new Promise((wake) => {
    setTimeout(wake, ms);
  });
}
/**
 * Main-thread entry point: creates the server-side Shared instance, spawns
 * this same file as a worker, binds every port the worker posts back, then
 * keeps the cache alive for 100 seconds before closing it.
 */
async function main() {
  const cache = new Shared({ main: true });
  const thread = new Worker(__filename);

  // The worker's only message is the server end of its MessageChannel.
  thread.on("message", (clientPort) => {
    cache.bindClientPort(clientPort);
  });

  await sleep(100000);
  cache.close();
}
/**
 * Worker-thread entry point: opens a channel to the main thread, hands over
 * one end, then times storing three 1 GiB buffers and reading them back.
 */
async function worker() {
  const { port1: serverEnd, port2: clientEnd } = new MessageChannel();
  const client = new Shared({ main: false, port: clientEnd });
  parentPort.postMessage(serverEnd, [serverEnd]);
  console.log("worker", client);

  const ONE_GIB = 1024 * 1024 * 1024;
  const keys = Array.from({ length: 3 }, (_, index) => index.toString());

  // Phase 1: allocate and store one buffer per key.
  for (const key of keys) {
    console.time("alloc");
    const payload = Buffer.alloc(ONE_GIB).fill(1);
    console.timeEnd("alloc");

    console.time("set");
    const stored = await client.set(key, payload);
    console.log(stored);
    console.timeEnd("set");
  }

  // Phase 2: fetch each value, then time Buffer.from() on the result.
  for (const key of keys) {
    console.time("get");
    const fetched = await client.get(key);
    console.timeEnd("get");

    console.time("copy");
    Buffer.from(fetched);
    console.timeEnd("copy");
  }
}
// Script entry: the same file runs as the server on the main thread and as
// the client inside the Worker it spawns.
run(isMainThread ? main() : worker());
/**
 * Drive a top-level promise to completion and exit afterwards: logs "end"
 * and exits with the default code on success, logs the error and exits with
 * code 1 on failure. Anything lacking `then`/`catch` is ignored.
 *
 * @param {Promise<*>} p - the promise returned by an entry-point function.
 */
function run(p) {
  const looksLikePromise = Boolean(p && p.then && p.catch);
  if (!looksLikePromise) return;

  const finish = () => {
    console.log("end");
    process.exit();
  };
  const fail = (err) => {
    console.error(err);
    process.exit(1);
  };

  p.then(finish).catch(fail);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment