Instantly share code, notes, and snippets.
Created
January 7, 2021 08:40
-
Star
(0)
0
You must be signed in to star a gist -
Fork
(0)
0
You must be signed in to fork a gist
-
Save leizongmin/1bd30dc84dacad4ff3f5b821601ddb62 to your computer and use it in GitHub Desktop.
worker_threads & SharedArrayBuffer Node.js 多线程共享内存(缓存管理)实验
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const { MessageChannel } = require("worker_threads"); | |
class Shared { | |
#clientCounter = 0; | |
#ports = new Map(); | |
#map = new Map(); | |
#parentPort = null; | |
#callbacks = new Map(); | |
#callbackCounter = 0; | |
constructor({ main = false, port = null } = {}) { | |
this.isMain = !!main; | |
if (!this.isMain) { | |
this.#parentPort = port; | |
this.#parentPort.on("message", (data) => { | |
this.#log("got message from server", data); | |
const { seq, error, result } = data; | |
const callback = this.#callbacks.get(seq); | |
if (!callback) return; | |
this.#callbacks.delete(seq); | |
if (error) { | |
callback.reject(new Error(error)); | |
} else { | |
callback.resolve(result); | |
} | |
}); | |
} | |
} | |
#log(...args) { | |
// console.log("DEBUG:", ...args); | |
} | |
bindClientPort(port) { | |
const clientId = ++this.#clientCounter; | |
this.#ports.set(clientId, { port }); | |
port.on("message", (data) => { | |
this.#log("got message from client", clientId, data); | |
this.#processRequest(clientId, data); | |
}); | |
port.once("close", () => { | |
this.#ports.delete(clientId); | |
}); | |
} | |
#processRequest(clientId, data) { | |
this.#log("processMessage", clientId, data); | |
const port = this.#ports.get(clientId)?.port; | |
if (!port) return; | |
const { seq, cmd, args } = data; | |
switch (cmd) { | |
case "GET": | |
{ | |
const value = this.#map.get(args[0]); | |
port.postMessage({ seq, result: value }); | |
} | |
break; | |
case "SET": | |
{ | |
this.#map.set(args[0], args[1]); | |
port.postMessage({ seq, result: true }); | |
} | |
break; | |
case "DELETE": | |
{ | |
this.#map.delete(args[0]); | |
port.postMessage({ seq, result: true }); | |
} | |
break; | |
default: | |
port.postMessage({ seq, result: false }); | |
} | |
} | |
async #sendRequest(cmd, args = [], transferList = []) { | |
return new Promise((resolve, reject) => { | |
this.#log("sendRequest", cmd, args, transferList); | |
const seq = ++this.#callbackCounter; | |
this.#parentPort.postMessage({ seq, cmd, args }, transferList); | |
this.#callbacks.set(seq, { resolve, reject }); | |
}); | |
} | |
async get(key) { | |
const ret = await this.#sendRequest("GET", [key]); | |
if (ret) return Buffer.from(ret); | |
return ret; | |
} | |
async set(key, value) { | |
if (!Buffer.isBuffer(value)) throw new TypeError("expected a Buffer"); | |
const sharedValue = new SharedArrayBuffer(value.length); | |
value.copy(Buffer.from(sharedValue)); | |
return this.#sendRequest("SET", [key, sharedValue]); | |
} | |
async delete(key) { | |
return this.#sendRequest("DELETE", [key]); | |
} | |
close() { | |
if (this.#parentPort) this.#parentPort.close(); | |
this.#ports.forEach((_, { port }) => { | |
if (port) port.close(); | |
}); | |
this.#ports.clear(); | |
this.#callbacks.forEach((_, { reject }) => { | |
reject(new Error(`closed`)); | |
}); | |
this.#callbacks.clear(); | |
} | |
} | |
exports.Shared = Shared; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const { Shared } = require("./shared"); | |
const { Worker, isMainThread, parentPort } = require("worker_threads"); | |
async function sleep(ms) { | |
return new Promise((resolve) => setTimeout(resolve, ms)); | |
} | |
async function main() { | |
const s = new Shared({ main: true }); | |
const w = new Worker(__filename); | |
w.on("message", (port) => { | |
s.bindClientPort(port); | |
}); | |
await sleep(100000); | |
// console.log(w); | |
s.close(); | |
} | |
async function worker() { | |
const { port1, port2 } = new MessageChannel(); | |
const s = new Shared({ main: false, port: port2 }); | |
parentPort.postMessage(port1, [port1]); | |
console.log("worker", s); | |
for (let i = 0; i < 3; i++) { | |
console.time("alloc"); | |
const b = Buffer.alloc(1024 * 1024 * 1024).fill(1); | |
console.timeEnd("alloc"); | |
console.time("set"); | |
console.log(await s.set(i.toString(), b)); | |
console.timeEnd("set"); | |
} | |
for (let i = 0; i < 3; i++) { | |
console.time("get"); | |
const b = await s.get(i.toString()); | |
// console.log(b); | |
console.timeEnd("get"); | |
console.time("copy"); | |
const c = Buffer.from(b); | |
console.timeEnd("copy"); | |
} | |
} | |
if (isMainThread) { | |
run(main()); | |
} else { | |
run(worker()); | |
} | |
function run(p) { | |
if (p && p.then && p.catch) { | |
p.then(() => { | |
console.log("end"); | |
process.exit(); | |
}).catch((err) => { | |
console.error(err); | |
process.exit(1); | |
}); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment