Skip to content

Instantly share code, notes, and snippets.

@SwadicalRag
Last active June 29, 2023 02:39
Show Gist options
  • Save SwadicalRag/0c6c5efd00d1d95d6fdc64c791be9623 to your computer and use it in GitHub Desktop.
Save SwadicalRag/0c6c5efd00d1d95d6fdc64c791be9623 to your computer and use it in GitHub Desktop.
A class for synchronous TCP using worker threads and SharedArrayBuffer in Node.js
// Node.js worker threads and networking libraries
import { Worker, isMainThread, parentPort, workerData } from "node:worker_threads";
import net from "net";
// Master switch for the diagnostic output produced by `log`.
const _DEBUG = true;

/**
 * Forward the arguments to `console.log`, but only while `_DEBUG` is enabled.
 *
 * @param args - Values to print; passed through to `console.log` unchanged.
 */
function log(...args: unknown[]): void {
  // Spread instead of `apply(this, ...)`: a module-level function has no
  // meaningful `this`, and referencing it fails to type-check under `strict`.
  if (_DEBUG) console.log(...args);
}
// Total byte size of each SharedArrayBuffer handed to a worker: 16 MiB.
const BUFFER_SIZE = 16 * 1024 ** 2;
/**
 * Blocking (and optionally promise-based) reader/writer for a TCP connection
 * that is owned by a worker thread.
 *
 * Incoming data arrives through a SharedArrayBuffer laid out as:
 *   - bytes 0-3:  Int32 flag, 1 while a chunk is waiting in the slot, 0 when free
 *   - bytes 4-7:  Int32 count of chunks still to be consumed for the current message
 *   - bytes 8-11: Int32 byte length of the chunk currently in the slot
 *   - bytes 12+:  chunk payload
 *
 * Outgoing data and disconnect requests are sent to the worker with
 * `postMessage`.
 */
export class SyncIO {
  private readonly hasInMessage: Int32Array;
  private readonly inMessageQueueSize: Int32Array;
  private readonly inMessageLength: Int32Array;
  private readonly inMessage: Uint8Array;

  /**
   * Wrap a worker and its shared buffer, then block until the worker has
   * published its first message (the connection result).
   *
   * @param worker - The worker thread that owns the socket.
   * @param sharedReceiveBuffer - The buffer the worker writes received data into.
   * @throws Error if the first message reports a failure.
   */
  constructor(private worker: Worker, private sharedReceiveBuffer: SharedArrayBuffer) {
    this.hasInMessage = new Int32Array(sharedReceiveBuffer, 0, 1);
    this.inMessageQueueSize = new Int32Array(sharedReceiveBuffer, 4, 1);
    this.inMessageLength = new Int32Array(sharedReceiveBuffer, 8, 1);
    this.inMessage = new Uint8Array(sharedReceiveBuffer, 12);
    this.processConnection();
  }

  /**
   * Read the first message and interpret it as the connection result:
   * byte 0 is a success flag; on failure bytes 1-4 hold the little-endian
   * byte length of an error text that starts at byte 5.
   *
   * @throws Error carrying the error text when the success flag is 0.
   */
  processConnection(): void {
    const metadata = this.read();
    if (metadata.readInt8(0)) return;

    const msgLen = metadata.readInt32LE(1);
    // The text occupies [5, 5 + msgLen); using msgLen itself as the end index
    // would cut off the last five bytes.
    throw new Error(metadata.subarray(5, 5 + msgLen).toString());
  }

  /**
   * Read one complete message from the shared buffer.
   *
   * @param block - When true (the default) the calling thread is blocked until
   *   a message is available. When false, returns `undefined` immediately if
   *   no chunk is waiting; if one is waiting, the call still blocks until all
   *   remaining chunks of that message have arrived.
   * @returns The reassembled message, or `undefined` in the non-blocking,
   *   nothing-available case.
   */
  read(block?: true): Buffer;
  read(block: boolean): Buffer | undefined;
  read(block = true): Buffer | undefined {
    if (!block && Atomics.load(this.hasInMessage, 0) !== 1) return undefined;

    const parts: Buffer[] = [];
    let remaining: number;
    do {
      this.waitForChunk();
      remaining = this.consumeChunk(parts);
    } while (remaining > 0);
    return Buffer.concat(parts);
  }

  /**
   * Promise-based variant of `read`: resolves with one complete message
   * without blocking the calling thread.
   *
   * @returns The reassembled message.
   */
  async readAsync(): Promise<Buffer> {
    const parts: Buffer[] = [];
    let remaining: number;
    do {
      await this.waitForChunkAsync();
      remaining = this.consumeChunk(parts);
    } while (remaining > 0);
    return Buffer.concat(parts);
  }

  /**
   * Ask the worker to send `msg` over the connection.
   *
   * @param msg - The bytes to send.
   */
  write(msg: Buffer): void {
    this.worker.postMessage({
      type: "write",
      msg,
    });
  }

  /**
   * Send a disconnect request to the worker thread.
   */
  disconnect(): void {
    this.worker.postMessage({
      type: "disconnect",
    });
  }

  /**
   * Alias for `disconnect`.
   */
  destroy(): void {
    this.disconnect();
  }

  /** Block the calling thread until the flag is no longer 0. */
  private waitForChunk(): void {
    while (Atomics.wait(this.hasInMessage, 0, 0) !== "not-equal") {}
  }

  /** Resolve once the flag is no longer 0, without blocking the thread. */
  private async waitForChunkAsync(): Promise<void> {
    while ((await Atomics.waitAsync(this.hasInMessage, 0, 0).value) !== "not-equal") {}
  }

  /**
   * Copy the chunk currently in the slot into `parts`, release the slot and
   * report how many chunks of the same message are still outstanding.
   */
  private consumeChunk(parts: Buffer[]): number {
    const length = Atomics.load(this.inMessageLength, 0);
    // Buffer.from copies, so the bytes survive the slot being overwritten.
    parts.push(Buffer.from(this.inMessage.subarray(0, length)));
    // Take the count from the decrement itself, *before* releasing the slot.
    // Once the flag is 0 the writer may already store the counter for the next
    // message, and re-reading it here would glue two messages together.
    const remaining = Atomics.sub(this.inMessageQueueSize, 0, 1) - 1;
    Atomics.store(this.hasInMessage, 0, 0);
    Atomics.notify(this.hasInMessage, 0);
    return remaining;
  }
}
/**
 * Start a worker in "server" mode and wrap it in a SyncIO.
 *
 * Constructing the SyncIO blocks on the worker's first message, so this call
 * does not return until the worker has reported a connection result.
 *
 * @param ip - The IP address to bind the server to.
 * @param port - The port to listen on.
 * @returns The SyncIO bound to the new worker.
 */
export function createServer(ip: string, port: number) {
  const sharedReceiveBuffer = new SharedArrayBuffer(BUFFER_SIZE);
  const options = { workerData: { type: "server", ip, port, sharedReceiveBuffer } };
  return new SyncIO(new Worker(__filename, options), sharedReceiveBuffer);
}
/**
 * Start a worker in "client" mode and wrap it in a SyncIO.
 *
 * Constructing the SyncIO blocks on the worker's first message, so this call
 * does not return until the worker has reported a connection result.
 *
 * @param ip - The IP address of the server to connect to.
 * @param port - The port of the server to connect to.
 * @returns The SyncIO bound to the new worker.
 */
export function createClient(ip: string, port: number) {
  const sharedReceiveBuffer = new SharedArrayBuffer(BUFFER_SIZE);
  const options = { workerData: { type: "client", ip, port, sharedReceiveBuffer } };
  return new SyncIO(new Worker(__filename, options), sharedReceiveBuffer);
}
// Worker-thread side. Owns the TCP socket, copies everything it receives into
// the shared buffer for the main thread, and sends whatever the main thread
// posts as "write" messages. Does nothing when loaded on the main thread.
if (!isMainThread && parentPort && workerData) {
  const mainThread = parentPort;
  // The first 12 bytes of the shared buffer hold three Int32 header fields.
  const chunkSize = BUFFER_SIZE - 12;
  const sharedReceiveBuffer: SharedArrayBuffer = workerData.sharedReceiveBuffer;
  const hasInMessage = new Int32Array(sharedReceiveBuffer, 0, 1);
  const inMessageQueueSize = new Int32Array(sharedReceiveBuffer, 4, 1);
  const inMessageLength = new Int32Array(sharedReceiveBuffer, 8, 1);
  const inMessage = new Uint8Array(sharedReceiveBuffer, 12);
  const inMessageBuffer: Uint8Array[] = [];
  const outMessageBuffer: Uint8Array[] = [];
  let isConnected = false;

  /** Block this thread until the reader has released the slot (flag is not 1). */
  const waitForFreeSlot = (): void => {
    while (Atomics.wait(hasInMessage, 0, 1) !== "not-equal") {}
  };

  /** Copy one chunk into the slot, record its length and wake the reader. */
  const publishChunk = (chunk: Uint8Array): void => {
    inMessage.set(chunk);
    inMessageLength[0] = chunk.length;
    Atomics.store(hasInMessage, 0, 1);
    Atomics.notify(hasInMessage, 0);
  };

  /**
   * Move the oldest entry of inMessageBuffer into the shared buffer, split
   * into chunks of at most chunkSize bytes. Blocks this thread while the
   * reader still holds a previous chunk.
   */
  function processInBuffer(): void {
    waitForFreeSlot();
    const nextMessage = inMessageBuffer.shift();
    if (nextMessage === undefined) return;

    // Tell the reader how many chunks make up this message.
    Atomics.store(inMessageQueueSize, 0, Math.ceil(nextMessage.length / chunkSize));
    let offset = 0;
    while (nextMessage.length - offset > chunkSize) {
      publishChunk(nextMessage.subarray(offset, offset + chunkSize));
      offset += chunkSize;
      waitForFreeSlot();
    }
    publishChunk(nextMessage.subarray(offset));
  }

  /**
   * Publish a failure frame (flag byte 0, Int32LE byte length, UTF-8 text) as
   * the connection result and stop this worker.
   */
  const reportStartupFailure = (err: Error): void => {
    // Measure the encoded bytes, not the string length: the two differ for
    // non-ASCII text and the frame would otherwise be cut short.
    const text = Buffer.from(String(err), "utf8");
    const frame = Buffer.alloc(5 + text.length);
    frame.writeInt8(0, 0);
    frame.writeInt32LE(text.length, 1);
    text.copy(frame, 5);
    inMessageBuffer.push(frame);
    processInBuffer();
    process.exit();
  };

  // Until a socket exists, outgoing messages are queued.
  let sendMessage = (msg: Uint8Array): void => {
    outMessageBuffer.push(msg);
  };
  mainThread.on("message", (message) => {
    if (message.type === "write") {
      sendMessage(message.msg);
    }
  });

  /**
   * Publish the success handshake (a single byte 1), start forwarding
   * received data, and flush any writes that were queued before the socket
   * existed.
   */
  const attachSocket = (socket: net.Socket): void => {
    isConnected = true;
    inMessageBuffer.push(Uint8Array.from([1]));
    processInBuffer();
    mainThread.postMessage({ type: "connected" });
    socket.on("data", (data: Buffer) => {
      inMessageBuffer.push(data);
      processInBuffer();
    });
    sendMessage = (msg) => {
      socket.write(msg);
    };
    for (const msg of outMessageBuffer) {
      socket.write(msg);
    }
    outMessageBuffer.length = 0;
  };

  if (workerData.type === "client") {
    /**
     * Connect to a server.
     *
     * @param ip - The IP address of the server.
     * @param port - The port of the server.
     */
    const connect = (ip: string, port: number): void => {
      log("Connecting...");
      const client = net.createConnection({ host: ip, port: port }, () => {
        log("Connected to server!");
        attachSocket(client);
      });
      client.on("error", (err: Error) => {
        if (!isConnected) {
          reportStartupFailure(err);
        }
        console.error("Connection error:", err);
      });
      mainThread.on("message", (message) => {
        if (message.type === "disconnect") {
          client.end();
        }
      });
    };
    connect(workerData.ip, workerData.port);
  } else if (workerData.type === "server") {
    /**
     * Start a server that accepts exactly one client.
     *
     * @param ip - The IP address to bind the server.
     * @param port - The port to listen to.
     */
    const serve = (ip: string, port: number): void => {
      const server = net.createServer((client) => {
        log("Client connected!");
        attachSocket(client);
        client.on("end", () => {
          log("Client disconnected!");
        });
        mainThread.on("message", (message) => {
          if (message.type === "disconnect") {
            // End the accepted socket. The listener is already closed below,
            // and closing it again would leave this connection open.
            client.end();
          }
        });
        // Only accept one client
        server.close();
      });
      server.listen(port, ip, () => {
        log(`Server listening on ${ip}:${port}`);
      });
      server.on("error", (err: Error) => {
        if (!isConnected) {
          reportStartupFailure(err);
        }
        console.error("Server error:", err);
      });
    };
    serve(workerData.ip, workerData.port);
  }

  mainThread.postMessage({
    type: "ready",
  });
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment