Skip to content

Instantly share code, notes, and snippets.

@puuble
Created January 18, 2023 23:15
Show Gist options
  • Save puuble/930c61dfcbf7caeb049f5daa84981b24 to your computer and use it in GitHub Desktop.
Save puuble/930c61dfcbf7caeb049f5daa84981b24 to your computer and use it in GitHub Desktop.
Work in progress: worker threads for a WebSocket client fed from an SQS queue, and how to run multiple threads (p4)
const { Worker, isMainThread, parentPort } = require('worker_threads');
const WebSocket = require('ws');
const AWS = require('aws-sdk');
// SQS client used by whichever thread evaluates this file.
// No access keys are embedded here: when none are passed, the AWS SDK resolves
// credentials through its default provider chain (environment variables,
// shared credentials file, instance/task role), which keeps secrets out of source.
const sqs = new AWS.SQS({
  region: 'us-west-2',
});

// Queue to drain. SQS_QUEUE_URL overrides the sample URL below when set.
const queueUrl =
  process.env.SQS_QUEUE_URL ||
  'https://sqs.us-west-2.amazonaws.com/1234567890/my-queue';
/*
 * This file runs in two roles: as the main thread, and again inside the
 * Worker that the main thread creates from the same file.
 */
if (isMainThread) {
  // Main thread: start a single worker and report whatever it sends back.
  const worker = new Worker(__filename);
  worker.on('message', (message) => {
    console.log(`Received message from worker: ${message}`);
  });
  worker.on('error', (error) => {
    console.error(`Error from worker: ${error.message}`);
  });
  worker.on('exit', (code) => {
    if (code !== 0) {
      console.error(`Worker stopped with exit code ${code}`);
    }
  });
} else {
  // Worker thread: forward SQS messages to the WebSocket server.
  const RETRY_DELAY_MS = 1000; // pause before polling again after a failure
  const client = new WebSocket('ws://localhost:8080');

  /**
   * Send one payload over the socket.
   * @param {string} payload - Message body to transmit.
   * @returns {Promise<void>} Resolves once ws reports the write as done,
   *   rejects with the error ws hands to the send callback.
   */
  const sendToServer = (payload) =>
    new Promise((resolve, reject) => {
      client.send(payload, undefined, (err) => (err ? reject(err) : resolve()));
    });

  /**
   * Long-poll the queue once, forward every received message, then schedule
   * the next poll. A message is deleted from SQS only after its own send
   * succeeded, so a failed send leaves it on the queue for redelivery.
   * Never rejects: failures are logged and followed by a delayed retry.
   * @returns {Promise<void>}
   */
  const processMessages = async () => {
    // Without an open socket nothing can be delivered, so stop polling.
    if (client.readyState !== WebSocket.OPEN) {
      return;
    }
    let nextPollDelay = 0;
    try {
      const result = await sqs.receiveMessage({
        QueueUrl: queueUrl,
        WaitTimeSeconds: 20,
      }).promise();
      for (const { Body, ReceiptHandle } of result.Messages || []) {
        await sendToServer(Body);
        await sqs.deleteMessage({ QueueUrl: queueUrl, ReceiptHandle }).promise();
      }
    } catch (err) {
      console.log(`Error processing SQS messages: ${err.message}`);
      nextPollDelay = RETRY_DELAY_MS;
    }
    setTimeout(processMessages, nextPollDelay);
  };

  client.on('open', function open() {
    console.log('Client connected to the server');
    client.send('Hello World!');
    // Start draining the queue only once the socket can actually send.
    processMessages();
  });
  client.on('message', function incoming(data) {
    console.log(`Received message from the server: ${data}`);
    // Depending on the ws version `data` may be a Buffer; post it as text so
    // the main thread logs the same content that is printed here.
    parentPort.postMessage(String(data));
  });
  client.on('close', function close() {
    console.log('Client disconnected from the server');
  });
  client.on('error', function error(err) {
    console.log(`Client error: ${err.message}`);
  });
}
const { Worker, isMainThread, parentPort } = require('worker_threads');
const WebSocket = require('ws');
const AWS = require('aws-sdk');
// SQS client used by whichever thread evaluates this file.
// No access keys are embedded here: when none are passed, the AWS SDK resolves
// credentials through its default provider chain (environment variables,
// shared credentials file, instance/task role), which keeps secrets out of source.
const sqs = new AWS.SQS({
  region: 'us-west-2',
});

// Queue to drain. SQS_QUEUE_URL overrides the sample URL below when set.
const queueUrl =
  process.env.SQS_QUEUE_URL ||
  'https://sqs.us-west-2.amazonaws.com/1234567890/my-queue';
/*
 * This file runs in two roles: as the main thread, and again inside every
 * Worker that the main thread creates from the same file.
 */
if (isMainThread) {
  // Main thread: run a pool of exactly `numOfWorkers` workers.
  const numOfWorkers = 4;

  /**
   * Start one worker from this file and attach the shared handlers.
   * @returns {Worker} The started worker.
   */
  const spawnWorker = () => {
    const worker = new Worker(__filename);
    worker.on('message', (message) => {
      console.log(`Received message from worker: ${message}`);
    });
    worker.on('error', (error) => {
      console.error(`Error from worker: ${error.message}`);
    });
    worker.on('exit', (code) => {
      if (code !== 0) {
        console.error(`Worker stopped with exit code ${code}`);
      }
    });
    return worker;
  };

  for (let i = 0; i < numOfWorkers; i++) {
    spawnWorker();
  }
} else {
  // Worker thread: forward SQS messages to the WebSocket server.
  const RETRY_DELAY_MS = 1000; // pause before polling again after a failure
  const client = new WebSocket('ws://localhost:8080');

  /**
   * Send one payload over the socket.
   * @param {string} payload - Message body to transmit.
   * @returns {Promise<void>} Resolves once ws reports the write as done,
   *   rejects with the error ws hands to the send callback.
   */
  const sendToServer = (payload) =>
    new Promise((resolve, reject) => {
      client.send(payload, undefined, (err) => (err ? reject(err) : resolve()));
    });

  /**
   * Long-poll the queue once, forward every received message, then schedule
   * the next poll. A message is deleted from SQS only after its own send
   * succeeded, so a failed send leaves it on the queue for redelivery.
   * Never rejects: failures are logged and followed by a delayed retry.
   * @returns {Promise<void>}
   */
  const processMessages = async () => {
    // Without an open socket nothing can be delivered, so stop polling.
    if (client.readyState !== WebSocket.OPEN) {
      return;
    }
    let nextPollDelay = 0;
    try {
      const result = await sqs.receiveMessage({
        QueueUrl: queueUrl,
        WaitTimeSeconds: 20,
      }).promise();
      for (const { Body, ReceiptHandle } of result.Messages || []) {
        await sendToServer(Body);
        await sqs.deleteMessage({ QueueUrl: queueUrl, ReceiptHandle }).promise();
      }
    } catch (err) {
      console.log(`Error processing SQS messages: ${err.message}`);
      nextPollDelay = RETRY_DELAY_MS;
    }
    setTimeout(processMessages, nextPollDelay);
  };

  client.on('open', function open() {
    console.log('Client connected to the server');
    client.send('Hello World!');
    // Start draining the queue only once the socket can actually send.
    processMessages();
  });
  client.on('message', function incoming(data) {
    console.log(`Received message from the server: ${data}`);
    // Depending on the ws version `data` may be a Buffer; post it as text so
    // the main thread logs the same content that is printed here.
    parentPort.postMessage(String(data));
  });
  client.on('close', function close() {
    console.log('Client disconnected from the server');
  });
  client.on('error', function error(err) {
    console.log(`Client error: ${err.message}`);
  });
}
const { Worker, isMainThread, parentPort } = require('worker_threads');
const WebSocket = require('ws');
const AWS = require('aws-sdk');
// SQS client used by whichever thread evaluates this file.
// No access keys are embedded here: when none are passed, the AWS SDK resolves
// credentials through its default provider chain (environment variables,
// shared credentials file, instance/task role), which keeps secrets out of source.
const sqs = new AWS.SQS({
  region: 'us-west-2',
});

// Queue to drain. SQS_QUEUE_URL overrides the sample URL below when set.
const queueUrl =
  process.env.SQS_QUEUE_URL ||
  'https://sqs.us-west-2.amazonaws.com/1234567890/my-queue';
const mongoose = require('mongoose');
const Log = require('./models/Log');
const Fail = require('./models/Fail');
// Open the MongoDB connection that LogService writes through. The returned
// promise is handled so a failed connect is logged instead of surfacing as an
// unhandled rejection.
mongoose
  .connect('mongodb://localhost/logs', { useNewUrlParser: true })
  .catch((err) => {
    console.error(`Error connecting to MongoDB: ${err.message}`);
  });
/**
 * Best-effort persistence of log and failure records.
 *
 * Both public methods wrap the payload as `{ data }`, save it through the
 * corresponding model (`Log` / `Fail`) and never reject: a failed save is
 * reported on the console and otherwise ignored.
 */
class LogService {
  /**
   * Render a payload for console output. Strings pass through unchanged;
   * everything else is serialized as JSON so objects do not print as
   * "[object Object]". Error instances are reduced to name and message.
   * @private
   * @param {*} data - Payload to render.
   * @returns {string} Printable representation of `data`.
   */
  static describe(data) {
    if (typeof data === 'string') {
      return data;
    }
    try {
      const text = JSON.stringify(data, (key, value) =>
        value instanceof Error
          ? { name: value.name, message: value.message }
          : value
      );
      // JSON.stringify yields undefined for values such as undefined or functions.
      return text === undefined ? String(data) : text;
    } catch (err) {
      // Cyclic structures, BigInt, ...: fall back to plain string coercion.
      return String(data);
    }
  }

  /**
   * Save `{ data }` through `Model` and log the outcome.
   * @private
   * @param {Function} Model - Model constructor whose instances expose `save()`.
   * @param {string} label - Record kind used in the console messages.
   * @param {*} data - Payload to store.
   * @returns {Promise<void>}
   */
  static async persist(Model, label, data) {
    try {
      await new Model({ data }).save();
      console.log(`Saved ${label}: ${LogService.describe(data)}`);
    } catch (err) {
      console.log(`Error saving ${label}: ${err.message}`);
    }
  }

  /**
   * Store a log record.
   * @param {*} data - Payload to store under the `data` field.
   * @returns {Promise<void>} Settles after the save attempt; never rejects on save errors.
   */
  static async saveLog(data) {
    await LogService.persist(Log, 'log', data);
  }

  /**
   * Store a failure record.
   * @param {*} data - Payload to store under the `data` field.
   * @returns {Promise<void>} Settles after the save attempt; never rejects on save errors.
   */
  static async saveFail(data) {
    await LogService.persist(Fail, 'fail', data);
  }
}
/*
 * This file runs in two roles: as the main thread, and again inside every
 * Worker that the main thread creates from the same file.
 */
if (isMainThread) {
  // Main thread: run a pool of exactly `numOfWorkers` workers.
  const numOfWorkers = 4;

  /**
   * Start one worker from this file; crashes and non-zero exits are printed
   * and recorded through LogService.saveFail.
   * @returns {Worker} The started worker.
   */
  const spawnWorker = () => {
    const worker = new Worker(__filename);
    worker.on('error', (error) => {
      console.error(`Error from worker: ${error.message}`);
      LogService.saveFail({ error });
    });
    worker.on('exit', (code) => {
      if (code !== 0) {
        console.error(`Worker stopped with exit code ${code}`);
        LogService.saveFail({ code });
      }
    });
    return worker;
  };

  for (let i = 0; i < numOfWorkers; i++) {
    spawnWorker();
  }
} else {
  // Worker thread: forward SQS messages to the WebSocket server.
  const RETRY_DELAY_MS = 1000; // pause before polling again after a failure
  const client = new WebSocket('ws://localhost:8080');

  /**
   * Send one payload over the socket.
   * @param {string} payload - Message body to transmit.
   * @returns {Promise<void>} Resolves once ws reports the write as done,
   *   rejects with the error ws hands to the send callback.
   */
  const sendToServer = (payload) =>
    new Promise((resolve, reject) => {
      client.send(payload, undefined, (err) => (err ? reject(err) : resolve()));
    });

  /**
   * Long-poll the queue once, forward every received message, then schedule
   * the next poll. A message is deleted from SQS only after its own send
   * succeeded, so a failed send leaves it on the queue for redelivery.
   * Never rejects: failures are logged, recorded, and followed by a delayed retry.
   * @returns {Promise<void>}
   */
  const processMessages = async () => {
    // Without an open socket nothing can be delivered, so stop polling.
    if (client.readyState !== WebSocket.OPEN) {
      return;
    }
    let nextPollDelay = 0;
    try {
      const result = await sqs.receiveMessage({
        QueueUrl: queueUrl,
        WaitTimeSeconds: 20,
      }).promise();
      for (const { Body, ReceiptHandle } of result.Messages || []) {
        await sendToServer(Body);
        await sqs.deleteMessage({ QueueUrl: queueUrl, ReceiptHandle }).promise();
        LogService.saveLog({ message: `Sent message to the server: ${Body}` });
      }
    } catch (err) {
      console.log(`Error processing SQS messages: ${err.message}`);
      LogService.saveFail({ message: `Error processing SQS messages: ${err.message}` });
      nextPollDelay = RETRY_DELAY_MS;
    }
    setTimeout(processMessages, nextPollDelay);
  };

  client.on('open', function open() {
    console.log('Client connected to the server');
    client.send('Hello World!');
    LogService.saveLog({ message: 'Client connected to the server' });
    // Start draining the queue only once the socket can actually send.
    processMessages();
  });
  client.on('message', function incoming(data) {
    console.log(`Received message from the server: ${data}`);
    // Depending on the ws version `data` may be a Buffer; post it as text so
    // a listener on the main thread receives the content that is printed here.
    parentPort.postMessage(String(data));
    LogService.saveLog({ message: `Received message from the server: ${data}` });
  });
  client.on('close', function close() {
    console.log('Client disconnected from the server');
    LogService.saveLog({ message: 'Client disconnected from the server' });
  });
  client.on('error', function error(err) {
    console.log(`Client error: ${err.message}`);
    LogService.saveFail({ message: `Client error: ${err.message}` });
  });
}
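// Variant without the WebSocket: the workers report to the main thread, which
// fans the messages out through an EventEmitter. Note that an EventEmitter
// instance cannot be shared across threads (workerData is structured-cloned,
// so methods and listeners do not survive); worker-to-main traffic has to go
// through parentPort.postMessage() and the Worker's 'message' event.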
const { Worker, isMainThread, parentPort } = require('worker_threads');
const { EventEmitter } = require('events');
const AWS = require('aws-sdk');
// SQS client used by whichever thread evaluates this file.
// No access keys are embedded here: when none are passed, the AWS SDK resolves
// credentials through its default provider chain (environment variables,
// shared credentials file, instance/task role), which keeps secrets out of source.
const sqs = new AWS.SQS({
  region: 'us-west-2',
});

// Queue to drain. SQS_QUEUE_URL overrides the sample URL below when set.
const queueUrl =
  process.env.SQS_QUEUE_URL ||
  'https://sqs.us-west-2.amazonaws.com/1234567890/my-queue';
const mongoose = require('mongoose');
const Log = require('./models/Log');
const Fail = require('./models/Fail');
// Open the MongoDB connection that LogService writes through. The returned
// promise is handled so a failed connect is logged instead of surfacing as an
// unhandled rejection.
mongoose
  .connect('mongodb://localhost/logs', { useNewUrlParser: true })
  .catch((err) => {
    console.error(`Error connecting to MongoDB: ${err.message}`);
  });
/**
 * Best-effort persistence of log and failure records.
 *
 * Both public methods wrap the payload as `{ data }`, save it through the
 * corresponding model (`Log` / `Fail`) and never reject: a failed save is
 * reported on the console and otherwise ignored.
 */
class LogService {
  /**
   * Render a payload for console output. Strings pass through unchanged;
   * everything else is serialized as JSON so objects do not print as
   * "[object Object]". Error instances are reduced to name and message.
   * @private
   * @param {*} data - Payload to render.
   * @returns {string} Printable representation of `data`.
   */
  static describe(data) {
    if (typeof data === 'string') {
      return data;
    }
    try {
      const text = JSON.stringify(data, (key, value) =>
        value instanceof Error
          ? { name: value.name, message: value.message }
          : value
      );
      // JSON.stringify yields undefined for values such as undefined or functions.
      return text === undefined ? String(data) : text;
    } catch (err) {
      // Cyclic structures, BigInt, ...: fall back to plain string coercion.
      return String(data);
    }
  }

  /**
   * Save `{ data }` through `Model` and log the outcome.
   * @private
   * @param {Function} Model - Model constructor whose instances expose `save()`.
   * @param {string} label - Record kind used in the console messages.
   * @param {*} data - Payload to store.
   * @returns {Promise<void>}
   */
  static async persist(Model, label, data) {
    try {
      await new Model({ data }).save();
      console.log(`Saved ${label}: ${LogService.describe(data)}`);
    } catch (err) {
      console.log(`Error saving ${label}: ${err.message}`);
    }
  }

  /**
   * Store a log record.
   * @param {*} data - Payload to store under the `data` field.
   * @returns {Promise<void>} Settles after the save attempt; never rejects on save errors.
   */
  static async saveLog(data) {
    await LogService.persist(Log, 'log', data);
  }

  /**
   * Store a failure record.
   * @param {*} data - Payload to store under the `data` field.
   * @returns {Promise<void>} Settles after the save attempt; never rejects on save errors.
   */
  static async saveFail(data) {
    await LogService.persist(Fail, 'fail', data);
  }
}
/*
 * This file runs in two roles: as the main thread, and again inside every
 * Worker that the main thread creates from the same file.
 */
if (isMainThread) {
  // Main thread: collect worker messages on one EventEmitter.
  const eventEmitter = new EventEmitter();
  eventEmitter.on('message', (data) => {
    console.log(`Received message from worker: ${data}`);
    LogService.saveLog({ message: `Received message from worker: ${data}` });
  });

  const numOfWorkers = 4;

  /**
   * Start one worker from this file. Whatever the worker posts is re-emitted
   * as a 'message' event on `eventEmitter`; the emitter itself stays on this
   * thread because it cannot be handed to a worker.
   * @returns {Worker} The started worker.
   */
  const spawnWorker = () => {
    const worker = new Worker(__filename);
    worker.on('message', (data) => {
      eventEmitter.emit('message', data);
    });
    worker.on('error', (error) => {
      console.error(`Error from worker: ${error.message}`);
      LogService.saveFail({ error });
    });
    worker.on('exit', (code) => {
      if (code !== 0) {
        console.error(`Worker stopped with exit code ${code}`);
        LogService.saveFail({ code });
      }
    });
    return worker;
  };

  // Run a pool of exactly `numOfWorkers` workers.
  for (let i = 0; i < numOfWorkers; i++) {
    spawnWorker();
  }
} else {
  // Worker thread: forward SQS messages to the main thread.
  const RETRY_DELAY_MS = 1000; // pause before polling again after a failure

  /**
   * Long-poll the queue once, post every received body to the main thread,
   * delete that message from SQS, then schedule the next poll.
   * Never rejects: failures are logged, recorded, and followed by a delayed retry.
   * @returns {Promise<void>}
   */
  const processMessages = async () => {
    let nextPollDelay = 0;
    try {
      const result = await sqs.receiveMessage({
        QueueUrl: queueUrl,
        WaitTimeSeconds: 20,
      }).promise();
      for (const { Body, ReceiptHandle } of result.Messages || []) {
        parentPort.postMessage(Body);
        // Delete only the message that was just handed over.
        await sqs.deleteMessage({ QueueUrl: queueUrl, ReceiptHandle }).promise();
        LogService.saveLog({ message: `Sent message to the main thread: ${Body}` });
      }
    } catch (err) {
      console.log(`Error processing SQS messages: ${err.message}`);
      LogService.saveFail({ message: `Error processing SQS messages: ${err.message}` });
      nextPollDelay = RETRY_DELAY_MS;
    }
    setTimeout(processMessages, nextPollDelay);
  };

  processMessages();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment