Last active
June 29, 2023 02:39
-
-
Save SwadicalRag/0c6c5efd00d1d95d6fdc64c791be9623 to your computer and use it in GitHub Desktop.
A class for synchronous TCP using Worker threads and SharedArrayBuffer in node.js
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Node.js worker threads and networking libraries | |
import { Worker, isMainThread, parentPort, workerData } from "node:worker_threads"; | |
import net from "net"; | |
/** When true, `log` forwards its arguments to `console.log`. */
const _DEBUG = true;

/**
 * Print diagnostic output, but only while `_DEBUG` is enabled.
 *
 * @param args - Values forwarded unchanged to `console.log`.
 */
function log(...args: unknown[]): void {
    // Spread instead of `apply(this, ...)`: a free function has no meaningful
    // `this`, and referencing it is a compile error under `noImplicitThis`.
    if (_DEBUG) console.log(...args);
}
// Size in bytes of the shared receive buffer (16 MiB). The first 12 bytes hold three Int32 header fields; the rest is message payload.
const BUFFER_SIZE = 16 * 1024 * 1024; | |
/**
 * Class to handle synchronous IO with shared buffers.
 *
 * Incoming data is handed over by the worker thread through a SharedArrayBuffer
 * with the following layout:
 *
 *   bytes 0-3   Int32  "slot full" flag (0 = empty, 1 = a chunk is ready)
 *   bytes 4-7   Int32  number of chunks of the current message still to be read
 *   bytes 8-11  Int32  byte length of the chunk currently in the slot
 *   bytes 12-   payload of the current chunk
 *
 * Outgoing data and control requests are sent to the worker with `postMessage`.
 */
export class SyncIO {
    private hasInMessage: Int32Array;
    private inMessageQueueSize: Int32Array;
    private inMessageLength: Int32Array;
    private inMessage: Uint8Array;

    /**
     * SyncIO constructor. Blocks until the worker has published its first
     * (handshake) message, and throws if that message reports a failure.
     *
     * @param {Worker} worker - The worker thread.
     * @param {SharedArrayBuffer} sharedReceiveBuffer - The shared buffer.
     * @throws {Error} If the handshake message reports a failure.
     */
    constructor(private worker: Worker, private sharedReceiveBuffer: SharedArrayBuffer) {
        this.hasInMessage = new Int32Array(sharedReceiveBuffer, 0, 1);
        this.inMessageQueueSize = new Int32Array(sharedReceiveBuffer, 4, 1);
        this.inMessageLength = new Int32Array(sharedReceiveBuffer, 8, 1);
        this.inMessage = new Uint8Array(sharedReceiveBuffer, 12);
        this.processConnection();
    }

    /**
     * Read the handshake message and turn a failure report into an exception.
     *
     * The handshake is either a single non-zero byte (success) or
     * `[0, int32 LE length, message bytes...]` (failure).
     *
     * @throws {Error} Carrying the text sent in a failure handshake.
     */
    processConnection() {
        const metadata = this.read();
        const success = metadata.readInt8(0);
        if (!success) {
            const msgLen = metadata.readInt32LE(1);
            // The text starts after the 5-byte header, so it ends at 5 + msgLen
            // (not at msgLen, which would drop the last five bytes).
            throw new Error(metadata.toString("utf8", 5, 5 + msgLen));
        }
    }

    /**
     * Read data from shared buffer.
     *
     * @param {boolean} block - If true, the function will block until there is a message.
     * @returns {Buffer} The message read from the shared buffer, or `undefined`
     *   when `block` is false and no message is waiting.
     */
    read(block?: true): Buffer;
    read(block: boolean): Buffer | undefined;
    read(block = true): Buffer | undefined {
        if (block) {
            this.waitForChunk();
        } else if (Atomics.load(this.hasInMessage, 0) !== 1) {
            return undefined;
        }

        const parts: Buffer[] = [];
        let remaining = this.consumeChunk(parts);
        // Continue reading if there are still parts of the message
        while (remaining > 0) {
            this.waitForChunk();
            remaining = this.consumeChunk(parts);
        }
        return Buffer.concat(parts);
    }

    /**
     * Asynchronously reads data from shared buffer
     *
     * @returns {Buffer} The message read from the shared buffer.
     */
    async readAsync(): Promise<Buffer> {
        await this.waitForChunkAsync();

        const parts: Buffer[] = [];
        let remaining = this.consumeChunk(parts);
        // Continue reading if there are still parts of the message
        while (remaining > 0) {
            await this.waitForChunkAsync();
            remaining = this.consumeChunk(parts);
        }
        return Buffer.concat(parts);
    }

    /**
     * Write data to shared buffer.
     *
     * @param {Buffer} msg - The message to be written to the shared buffer.
     */
    write(msg: Buffer) {
        this.worker.postMessage({
            type: "write",
            msg,
        });
    }

    /**
     * Send a disconnect message to the worker thread.
     */
    disconnect() {
        this.worker.postMessage({
            type: "disconnect",
        });
    }

    /** Alias for `disconnect`. */
    destroy() {
        this.disconnect();
    }

    /** Block the calling thread until the slot flag is non-zero. */
    private waitForChunk(): void {
        while (Atomics.load(this.hasInMessage, 0) === 0) {
            Atomics.wait(this.hasInMessage, 0, 0);
        }
    }

    /** Resolve once the slot flag is non-zero, without blocking the event loop. */
    private async waitForChunkAsync(): Promise<void> {
        while (Atomics.load(this.hasInMessage, 0) === 0) {
            // NOTE(review): the `as any` cast is kept from the original code;
            // presumably it works around the typings in use - confirm before removing.
            await Atomics.waitAsync(this.hasInMessage, 0, 0 as any).value;
        }
    }

    /**
     * Copy the chunk currently in the slot into `parts`, then hand the slot back
     * to the worker.
     *
     * The chunk counter is read BEFORE the slot is released: once the flag is
     * cleared the worker may immediately start the next message and overwrite
     * the counter, which would make this reader glue two messages together.
     *
     * @returns Number of chunks of the current message that are still to come.
     */
    private consumeChunk(parts: Buffer[]): number {
        const length = Atomics.load(this.inMessageLength, 0);
        // Buffer.from(typedArray) copies, so the data survives slot reuse.
        parts.push(Buffer.from(this.inMessage.subarray(0, length)));
        const remaining = Atomics.sub(this.inMessageQueueSize, 0, 1) - 1;
        Atomics.store(this.hasInMessage, 0, 0);
        Atomics.notify(this.hasInMessage, 0);
        return remaining;
    }
}
/**
 * Create a server.
 *
 * Starts a worker thread running this same file in "server" mode and wraps it,
 * together with a freshly allocated shared receive buffer, in a SyncIO.
 *
 * @param {string} ip - The IP address to bind the server.
 * @param {number} port - The port to listen to.
 * @returns {SyncIO} The SyncIO object.
 */
export function createServer(ip: string, port: number) {
    const sharedReceiveBuffer = new SharedArrayBuffer(BUFFER_SIZE);
    const options = {
        workerData: { type: "server", ip, port, sharedReceiveBuffer },
    };
    const serverWorker = new Worker(__filename, options);
    return new SyncIO(serverWorker, sharedReceiveBuffer);
}
/**
 * Create a client.
 *
 * Starts a worker thread running this same file in "client" mode and wraps it,
 * together with a freshly allocated shared receive buffer, in a SyncIO.
 *
 * @param {string} ip - The IP address of the server.
 * @param {number} port - The port of the server.
 * @returns {SyncIO} The SyncIO object.
 */
export function createClient(ip: string, port: number) {
    const sharedReceiveBuffer = new SharedArrayBuffer(BUFFER_SIZE);
    const options = {
        workerData: { type: "client", ip, port, sharedReceiveBuffer },
    };
    const clientWorker = new Worker(__filename, options);
    return new SyncIO(clientWorker, sharedReceiveBuffer);
}
// Worker-thread side. Owns the TCP socket, forwards outgoing data posted by the
// main thread, and publishes incoming data through the shared receive buffer.
if (!isMainThread) {
    if (workerData && parentPort) {
        // Local alias so the non-null narrowing survives inside callbacks.
        const mainPort = parentPort;

        const sharedReceiveBuffer: SharedArrayBuffer = workerData.sharedReceiveBuffer;
        const hasInMessage = new Int32Array(sharedReceiveBuffer, 0, 1);
        const inMessageQueueSize = new Int32Array(sharedReceiveBuffer, 4, 1);
        const inMessageLength = new Int32Array(sharedReceiveBuffer, 8, 1);
        const inMessage = new Uint8Array(sharedReceiveBuffer, 12);
        // Largest chunk that fits in the payload area (buffer minus 12-byte header).
        const chunkSize = inMessage.length;

        const inMessageBuffer: Uint8Array[] = [];
        const outMessageBuffer: Uint8Array[] = [];

        /** Block this worker until the reader has emptied the shared slot. */
        function waitUntilSlotFree(): void {
            while (Atomics.wait(hasInMessage, 0, 1) !== "not-equal") {}
        }

        /** Copy one chunk into the shared slot and wake the reader. */
        function publishChunk(chunk: Uint8Array): void {
            inMessage.set(chunk);
            Atomics.store(inMessageLength, 0, chunk.length);
            Atomics.store(hasInMessage, 0, 1);
            Atomics.notify(hasInMessage, 0);
        }

        /**
         * Process buffer.
         * Take the oldest message from inMessageBuffer and put it into the shared
         * buffer, split into slot-sized chunks when necessary.
         *
         * NOTE: this blocks the worker's event loop until the main thread has
         * consumed the previous chunk.
         */
        function processInBuffer(): void {
            waitUntilSlotFree();
            const nextMessage = inMessageBuffer.shift();
            if (nextMessage === undefined) return;

            // Tell the reader how many chunks make up this message.
            Atomics.store(inMessageQueueSize, 0, Math.ceil(nextMessage.length / chunkSize));

            let offset = 0;
            while (nextMessage.length - offset > chunkSize) {
                publishChunk(nextMessage.subarray(offset, offset + chunkSize));
                offset += chunkSize;
                waitUntilSlotFree();
            }
            publishChunk(nextMessage.subarray(offset));
        }

        // Until a socket exists, outgoing data is parked in outMessageBuffer.
        let sendMessage: (msg: Uint8Array) => void = (msg) => {
            outMessageBuffer.push(msg);
        };

        mainPort.on("message", (message) => {
            if (message.type === "write") {
                sendMessage(message.msg);
            }
        });

        let isConnected = false;

        /**
         * Publish a failure handshake (`[0, int32 LE byte length, utf8 text]`)
         * and terminate the worker.
         */
        function reportStartupFailure(err: unknown): never {
            const errMsg = String(err);
            // Use the encoded byte length: string length undercounts non-ASCII text.
            const errMsgBytes = Buffer.byteLength(errMsg);
            const errMsgBuffer = Buffer.alloc(5 + errMsgBytes);
            errMsgBuffer.writeInt8(0, 0);
            errMsgBuffer.writeInt32LE(errMsgBytes, 1);
            errMsgBuffer.write(errMsg, 5);
            inMessageBuffer.push(errMsgBuffer);
            processInBuffer();
            process.exit();
        }

        /**
         * Wire an established socket to the shared buffer: publish the success
         * handshake, forward incoming data, and flush writes queued so far.
         */
        function attachSocket(socket: net.Socket): void {
            isConnected = true;
            inMessageBuffer.push(Uint8Array.from([1]));
            processInBuffer();
            mainPort.postMessage({ type: "connected" });

            socket.on("data", (data) => {
                inMessageBuffer.push(data);
                processInBuffer();
            });

            sendMessage = (msg) => {
                socket.write(msg);
            };
            for (const msg of outMessageBuffer) {
                socket.write(msg);
            }
            outMessageBuffer.length = 0;
        }

        if (workerData.type === "client") {
            /**
             * Connect to a server.
             *
             * @param {string} ip - The IP address of the server.
             * @param {number} port - The port of the server.
             */
            const connect = (ip: string, port: number) => {
                log("Connecting...");
                const client = net.createConnection({ host: ip, port: port }, () => {
                    log("Connected to server!");
                    attachSocket(client);
                });

                client.on("error", (err) => {
                    if (!isConnected) {
                        reportStartupFailure(err);
                    }
                    console.error("Connection error:", err);
                });

                mainPort.on("message", (message) => {
                    switch (message.type) {
                        case "disconnect":
                            client.end();
                            break;
                    }
                });
            };

            connect(workerData.ip, workerData.port);
        }
        else if (workerData.type === "server") {
            /**
             * Start a server.
             *
             * @param {string} ip - The IP address to bind the server.
             * @param {number} port - The port to listen to.
             */
            const serve = (ip: string, port: number) => {
                let activeClient: net.Socket | undefined;

                const server = net.createServer((client) => {
                    activeClient = client;
                    log("Client connected!");

                    client.on("end", () => {
                        log("Client disconnected!");
                    });

                    // Only accept one client
                    server.close();

                    attachSocket(client);
                });

                mainPort.on("message", (message) => {
                    switch (message.type) {
                        case "disconnect":
                            // End the accepted connection; closing the listening
                            // server alone leaves the socket open.
                            if (activeClient !== undefined) {
                                activeClient.end();
                            }
                            if (server.listening) {
                                server.close();
                            }
                            break;
                    }
                });

                server.listen(port, ip, () => {
                    log(`Server listening on ${ip}:${port}`);
                });

                server.on("error", (err) => {
                    if (!isConnected) {
                        reportStartupFailure(err);
                    }
                    console.error("Server error:", err);
                });
            };

            serve(workerData.ip, workerData.port);
        }

        mainPort.postMessage({
            type: "ready",
        });
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment