Created
January 18, 2023 23:15
-
-
Save puuble/930c61dfcbf7caeb049f5daa84981b24 to your computer and use it in GitHub Desktop.
working on worker for websocket queue SQS how to multiple thread p4
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const { Worker, isMainThread, parentPort } = require('worker_threads'); | |
const WebSocket = require('ws'); | |
const AWS = require('aws-sdk'); | |
const sqs = new AWS.SQS({ | |
region: 'us-west-2', | |
accessKeyId: 'ACCESS_KEY', | |
secretAccessKey: 'SECRET_KEY' | |
}); | |
const queueUrl = 'https://sqs.us-west-2.amazonaws.com/1234567890/my-queue'; | |
if (isMainThread) { | |
// Main thread | |
const worker = new Worker(__filename); | |
worker.on('message', (message) => { | |
console.log(`Received message from worker: ${message}`); | |
}); | |
worker.on('error', (error) => { | |
console.error(`Error from worker: ${error.message}`); | |
}); | |
worker.on('exit', (code) => { | |
if (code !== 0) { | |
console.error(`Worker stopped with exit code ${code}`); | |
} | |
}); | |
} else { | |
// Worker thread | |
const client = new WebSocket('ws://localhost:8080'); | |
client.on('open', function open() { | |
console.log('Client connected to the server'); | |
client.send('Hello World!'); | |
}); | |
client.on('message', function incoming(data) { | |
console.log(`Received message from the server: ${data}`); | |
parentPort.postMessage(data); | |
}); | |
client.on('close', function close() { | |
console.log('Client disconnected from the server'); | |
}); | |
client.on('error', function error(err) { | |
console.log(`Client error: ${err.message}`); | |
}); | |
const processMessages = async () => { | |
try { | |
const result = await sqs.receiveMessage({ | |
QueueUrl: queueUrl, | |
WaitTimeSeconds: 20 | |
}).promise(); | |
if (result.Messages) { | |
const deleteParams = { | |
QueueUrl: queueUrl, | |
Entries: result.Messages.map(({ReceiptHandle, Id}) => ({ReceiptHandle, Id})) | |
}; | |
for(const message of result.Messages){ | |
const { Body } = message; | |
await new Promise((resolve) => { | |
client.send(Body, undefined, resolve); | |
}); | |
await sqs.deleteMessageBatch(deleteParams).promise(); | |
} | |
} | |
setTimeout(processMessages, 0); | |
} catch (err) { | |
console.log(`Error processing SQS messages: ${err.message}`); | |
} | |
}; | |
processMessages(); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const { Worker, isMainThread, parentPort } = require('worker_threads'); | |
const WebSocket = require('ws'); | |
const AWS = require('aws-sdk'); | |
const sqs = new AWS.SQS({ | |
region: 'us-west-2', | |
accessKeyId: 'ACCESS_KEY', | |
secretAccessKey: 'SECRET_KEY' | |
}); | |
const queueUrl = 'https://sqs.us-west-2.amazonaws.com/1234567890/my-queue'; | |
if (isMainThread) { | |
// Main thread | |
const worker = new Worker(__filename); | |
worker.on('message', (message) => { | |
console.log(`Received message from worker: ${message}`); | |
}); | |
worker.on('error', (error) => { | |
console.error(`Error from worker: ${error.message}`); | |
}); | |
worker.on('exit', (code) => { | |
if (code !== 0) { | |
console.error(`Worker stopped with exit code ${code}`); | |
} | |
}); | |
// Create multiple workers | |
const numOfWorkers = 4; | |
for (let i = 0; i < numOfWorkers; i++) { | |
const worker = new Worker(__filename); | |
worker.on('message', (message) => { | |
console.log(`Received message from worker: ${message}`); | |
}); | |
worker.on('error', (error) => { | |
console.error(`Error from worker: ${error.message}`); | |
}); | |
worker.on('exit', (code) => { | |
if (code !== 0) { | |
console.error(`Worker stopped with exit code ${code}`); | |
} | |
}); | |
} | |
} else { | |
// Worker thread | |
const client = new WebSocket('ws://localhost:8080'); | |
client.on('open', function open() { | |
console.log('Client connected to the server'); | |
client.send('Hello World!'); | |
}); | |
client.on('message', function incoming(data) { | |
console.log(`Received message from the server: ${data}`); | |
parentPort.postMessage(data); | |
}); | |
client.on('close', function close() { | |
console.log('Client disconnected from the server'); | |
}); | |
client.on('error', function error(err) { | |
console.log(`Client error: ${err.message}`); | |
}); | |
const processMessages = async () => { | |
try { | |
const result = await sqs.receiveMessage({ | |
QueueUrl: queueUrl, | |
WaitTimeSeconds: 20 | |
}).promise(); | |
if (result.Messages) { | |
const deleteParams = { | |
QueueUrl: queueUrl, | |
Entries: result.Messages.map(({ReceiptHandle, Id}) => ({ReceiptHandle, Id})) | |
}; | |
for(const message of result.Messages){ | |
const { Body } = message; | |
await new Promise((resolve) => { | |
client.send(Body, undefined, resolve); | |
}); | |
await sqs.deleteMessageBatch(deleteParams).promise(); | |
} | |
} | |
setTimeout(processMessages, 0); | |
} catch (err) { | |
console.log(`Error processing SQS messages: ${err.message}`); | |
} | |
}; | |
processMessages(); | |
} | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const { Worker, isMainThread, parentPort } = require('worker_threads'); | |
const WebSocket = require('ws'); | |
const AWS = require('aws-sdk'); | |
const sqs = new AWS.SQS({ | |
region: 'us-west-2', | |
accessKeyId: 'ACCESS_KEY', | |
secretAccessKey: 'SECRET_KEY' | |
}); | |
const queueUrl = 'https://sqs.us-west-2.amazonaws.com/1234567890/my-queue'; | |
const mongoose = require('mongoose'); | |
const Log = require('./models/Log'); | |
const Fail = require('./models/Fail'); | |
mongoose.connect('mongodb://localhost/logs', { useNewUrlParser: true }); | |
class LogService { | |
static async saveLog(data) { | |
try { | |
const log = new Log({ | |
data | |
}); | |
await log.save(); | |
console.log(`Saved log: ${data}`); | |
} catch (err) { | |
console.log(`Error saving log: ${err.message}`); | |
} | |
} | |
static async saveFail(data) { | |
try { | |
const fail = new Fail({ | |
data | |
}); | |
await fail.save(); | |
console.log(`Saved fail: ${data}`); | |
} catch (err) { | |
console.log(`Error saving fail: ${err.message}`); | |
} | |
} | |
} | |
if (isMainThread) { | |
// Main thread | |
const worker = new Worker(__filename); | |
worker.on('error', (error) => { | |
console.error(`Error from worker: ${error.message}`); | |
LogService.saveFail({ error }); | |
}); | |
worker.on('exit', (code) => { | |
if (code !== 0) { | |
console.error(`Worker stopped with exit code ${code}`); | |
LogService.saveFail({ code }); | |
} | |
}); | |
// Create multiple workers | |
const numOfWorkers = 4; | |
for (let i = 0; i < numOfWorkers; i++) { | |
const worker = new Worker(__filename); | |
worker.on('error', (error) => { | |
console.error(`Error from worker: ${error.message}`); | |
LogService.saveFail({ error }); | |
}); | |
worker.on('exit', (code) => { | |
if (code !== 0) { | |
console.error(`Worker stopped with exit code ${code}`); | |
LogService.saveFail({ code }); | |
} | |
}); | |
} | |
} else { | |
// Worker thread | |
const client = new WebSocket('ws://localhost:8080'); | |
client.on('open', function open() { | |
console.log('Client connected to the server'); | |
client.send('Hello World!'); | |
LogService.saveLog({ message: 'Client connected to the server' }); | |
}); | |
client.on('message', function incoming(data) { | |
console.log(`Received message from the server: ${data}`); | |
parentPort.postMessage(data); | |
LogService.saveLog({ message: `Received message from the server: ${data}` }); | |
}); | |
client.on('close', function close() { | |
console.log('Client disconnected from the server'); | |
LogService.saveLog({ message: 'Client disconnected from the server' }); | |
}); | |
client.on('error', function error(err) { | |
console.log(`Client error: ${err.message}`); | |
LogService.saveFail({ message: `Client error: ${err.message}` }); | |
}); | |
const processMessages = async () => { | |
try { | |
const result = await sqs.receiveMessage({ | |
QueueUrl: queueUrl, | |
WaitTimeSeconds: 20 | |
}).promise(); | |
if (result.Messages) { | |
const deleteParams = { | |
QueueUrl: queueUrl, | |
Entries: result.Messages.map(({ReceiptHandle, Id}) => ({ReceiptHandle, Id})) | |
}; | |
for(const message of result.Messages){ | |
const { Body } = message; | |
await new Promise((resolve) => { | |
client.send(Body, undefined, resolve); | |
}); | |
await sqs.deleteMessageBatch(deleteParams).promise(); | |
LogService.saveLog({ message: `Sent message to the server: ${Body}` }); | |
} | |
} | |
setTimeout(processMessages, 0); | |
} catch (err) { | |
console.log(`Error processing SQS messages: ${err.message}`); | |
LogService.saveFail({ message: `Error processing SQS messages: ${err.message}` }); | |
} | |
}; | |
processMessages(); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//you can use an event emitter instead of a WebSocket to handle the communication between the worker threads and the main thread | |
const { Worker, isMainThread, parentPort } = require('worker_threads'); | |
const { EventEmitter } = require('events'); | |
const AWS = require('aws-sdk'); | |
const sqs = new AWS.SQS({ | |
region: 'us-west-2', | |
accessKeyId: 'ACCESS_KEY', | |
secretAccessKey: 'SECRET_KEY' | |
}); | |
const queueUrl = 'https://sqs.us-west-2.amazonaws.com/1234567890/my-queue'; | |
const mongoose = require('mongoose'); | |
const Log = require('./models/Log'); | |
const Fail = require('./models/Fail'); | |
mongoose.connect('mongodb://localhost/logs', { useNewUrlParser: true }); | |
class LogService { | |
static async saveLog(data) { | |
try { | |
const log = new Log({ | |
data | |
}); | |
await log.save(); | |
console.log(`Saved log: ${data}`); | |
} catch (err) { | |
console.log(`Error saving log: ${err.message}`); | |
} | |
} | |
static async saveFail(data) { | |
try { | |
const fail = new Fail({ | |
data | |
}); | |
await fail.save(); | |
console.log(`Saved fail: ${data}`); | |
} catch (err) { | |
console.log(`Error saving fail: ${err.message}`); | |
} | |
} | |
} | |
if (isMainThread) { | |
// Main thread | |
const eventEmitter = new EventEmitter(); | |
const worker = new Worker(__filename, { workerData: { eventEmitter } }); | |
worker.on('error', (error) => { | |
console.error(`Error from worker: ${error.message}`); | |
LogService.saveFail({ error }); | |
}); | |
worker.on('exit', (code) => { | |
if (code !== 0) { | |
console.error(`Worker stopped with exit code ${code}`); | |
LogService.saveFail({ code }); | |
} | |
}); | |
eventEmitter.on('message', (data) => { | |
console.log(`Received message from worker: ${data}`); | |
LogService.saveLog({ message: `Received message from worker: ${data}` }); | |
}); | |
// Create multiple workers | |
const numOfWorkers = 4; | |
for (let i = 0; i < numOfWorkers; i++) { | |
const worker = new Worker(__filename, { workerData: { eventEmitter } }); | |
worker.on('error', (error) => { | |
console.error(`Error from worker: ${error.message}`); | |
LogService.saveFail({ error }); | |
}); | |
worker.on('exit', (code) => { | |
if (code !== 0) { | |
console.error(`Worker stopped with exit code ${code}`); | |
LogService.saveFail({ code }); | |
} | |
}); | |
} | |
} else { | |
// Worker thread | |
const { eventEmitter } = workerData; | |
const processMessages = async () => { | |
try { | |
const result = await sqs.receiveMessage({ | |
QueueUrl: queueUrl, | |
WaitTimeSeconds: 20 | |
}).promise(); | |
if (result.Messages) { | |
const deleteParams = { | |
QueueUrl: queueUrl, | |
Entries: result.Messages.map(({ReceiptHandle, Id}) => ({ReceiptHandle, Id})) | |
}; | |
for(const message of result.Messages){ | |
const { Body } = message; | |
eventEmitter.emit('message', Body); | |
await sqs.deleteMessageBatch(deleteParams).promise(); | |
LogService.saveLog({ message: `Sent message to the main thread: ${Body}` }); | |
} | |
} | |
setTimeout(processMessages, 0); | |
} catch (err) { | |
console.log(`Error processing SQS messages: ${err.message}`); | |
LogService.saveFail({ message: `Error processing SQS messages: ${err.message}` }); | |
} | |
}; | |
processMessages(); | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment