Last active
January 18, 2023 22:38
-
-
Save puuble/d501bcdc03fb43499e09553458827c42 to your computer and use it in GitHub Desktop.
WebSocket handling in Node.js for high concurrency
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const WebSocket = require('ws');
const winston = require('winston');

// Structured JSON logger; every entry is written to the console transport.
const logger = winston.createLogger({
  level: 'info',
  format: winston.format.json(),
  transports: [new winston.transports.Console()],
});

const server = new WebSocket.Server({ port: 8080 });

// Echo server: each message received from a client is logged and sent
// straight back to that same client, prefixed with "Echo: ".
server.on('connection', (ws, req) => {
  // req.socket is the supported accessor; req.connection is a deprecated alias.
  const clientLogger = logger.child({ clientIp: req.socket.remoteAddress });
  clientLogger.info('New client connected');

  ws.on('message', (message) => {
    clientLogger.info(`Received message => ${message}`);
    ws.send(`Echo: ${message}`);
  });

  ws.on('close', () => {
    clientLogger.info('Client disconnected');
  });

  ws.on('error', (error) => {
    clientLogger.error(`Client error: ${error.message}`);
  });
});

server.on('error', (error) => {
  logger.error(`Server error: ${error.message}`);
});

// Announce startup only once the port is actually bound; printing it
// unconditionally would also claim success when binding fails.
server.on('listening', () => {
  console.log('WebSocket server started on port 8080');
});
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const WebSocket = require('ws');
const winston = require('winston');

// Structured JSON logger; every entry is written to the console transport.
const logger = winston.createLogger({
  level: 'info',
  format: winston.format.json(),
  transports: [new winston.transports.Console()],
});

const server = new WebSocket.Server({ port: 8080 });

// Echo server whose replies are produced asynchronously (see sendResponse).
server.on('connection', (ws, req) => {
  // req.socket is the supported accessor; req.connection is a deprecated alias.
  const clientLogger = logger.child({ clientIp: req.socket.remoteAddress });
  clientLogger.info('New client connected');

  ws.on('message', async (message) => {
    clientLogger.info(`Received message => ${message}`);
    try {
      await sendResponse(ws, message);
    } catch (err) {
      clientLogger.error(`Error sending response: ${err.message}`);
    }
  });

  ws.on('close', () => {
    clientLogger.info('Client disconnected');
  });

  ws.on('error', (error) => {
    clientLogger.error(`Client error: ${error.message}`);
  });
});

server.on('error', (error) => {
  logger.error(`Server error: ${error.message}`);
});

// Announce startup only once the port is actually bound.
server.on('listening', () => {
  console.log('WebSocket server started on port 8080');
});

/**
 * Sends the echo reply for one message after a simulated 1 s processing delay.
 *
 * @param {WebSocket} ws - Connection to reply on.
 * @param {*} message - Payload received from the client; interpolated into the reply.
 * @returns {Promise<void>} Resolves once the send callback reports success.
 * @throws {Error} If the connection is no longer open after the delay, or
 *   if the send callback reports an error.
 */
async function sendResponse(ws, message) {
  // simulate a delay
  await new Promise((resolve) => setTimeout(resolve, 1000));

  // The client may have disconnected while we were waiting.
  if (ws.readyState !== WebSocket.OPEN) {
    throw new Error('connection closed before the response could be sent');
  }

  // Send failures are reported through the callback, so adapt it to a promise;
  // otherwise the caller's try/catch would never observe them.
  await new Promise((resolve, reject) => {
    ws.send(`Echo: ${message}`, (err) => (err ? reject(err) : resolve()));
  });
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const crypto = require('crypto');
const WebSocket = require('ws');
const winston = require('winston');
const Queue = require('bull');

// Structured JSON logger; every entry is written to the console transport.
const logger = winston.createLogger({
  level: 'info',
  format: winston.format.json(),
  transports: [new winston.transports.Console()],
});

const server = new WebSocket.Server({ port: 8080 });
const messageQueue = new Queue('messageQueue');

// Open sockets keyed by a per-connection id. The queue processor runs outside
// the connection handler, so it needs this registry to find the socket a job
// belongs to.
// NOTE: the registry is per-process. If several processes consume this queue,
// a job can be picked up by a process that does not hold the socket, and the
// reply is then dropped.
const clients = new Map();

server.on('connection', (ws, req) => {
  const clientId = crypto.randomBytes(16).toString('hex');
  clients.set(clientId, ws);

  // req.socket is the supported accessor; req.connection is a deprecated alias.
  const clientLogger = logger.child({ clientIp: req.socket.remoteAddress });
  clientLogger.info('New client connected');

  ws.on('message', async (message) => {
    clientLogger.info(`Received message => ${message}`);
    try {
      // Store the payload as a plain string together with the id of the
      // connection that must receive the reply.
      await messageQueue.add({ clientId, message: message.toString() });
    } catch (err) {
      clientLogger.error(`Error queueing message: ${err.message}`);
    }
  });

  ws.on('close', () => {
    clients.delete(clientId);
    clientLogger.info('Client disconnected');
  });

  ws.on('error', (error) => {
    clientLogger.error(`Client error: ${error.message}`);
  });
});

server.on('error', (error) => {
  logger.error(`Server error: ${error.message}`);
});

// Announce startup only once the port is actually bound.
server.on('listening', () => {
  console.log('WebSocket server started on port 8080');
});

// Queue worker: waits (simulated work) and echoes the payload back to the
// connection that submitted it, if that connection is still open.
messageQueue.process(async (job) => {
  const { clientId, message } = job.data;

  // simulate a delay
  await new Promise((resolve) => setTimeout(resolve, 1000));

  const ws = clients.get(clientId);
  if (!ws || ws.readyState !== WebSocket.OPEN) {
    logger.warn(`Dropping reply for disconnected client ${clientId}`);
    return;
  }
  ws.send(`Echo: ${message}`);
});
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const crypto = require('crypto');
const WebSocket = require('ws');
const winston = require('winston');
const Queue = require('bull');

// Structured JSON logger; every entry is written to the console transport.
const logger = winston.createLogger({
  level: 'info',
  format: winston.format.json(),
  transports: [new winston.transports.Console()],
});

const server = new WebSocket.Server({ port: 8080 });

// Queue configured with an explicit Redis endpoint.
const messageQueue = new Queue('messageQueue', {
  redis: {
    host: '127.0.0.1',
    port: 6379,
  },
});

// Open sockets keyed by a per-connection id. The queue processor runs outside
// the connection handler, so it needs this registry to find the socket a job
// belongs to.
// NOTE: the registry is per-process. If several processes consume this queue,
// a job can be picked up by a process that does not hold the socket, and the
// reply is then dropped.
const clients = new Map();

server.on('connection', (ws, req) => {
  const clientId = crypto.randomBytes(16).toString('hex');
  clients.set(clientId, ws);

  // req.socket is the supported accessor; req.connection is a deprecated alias.
  const clientLogger = logger.child({ clientIp: req.socket.remoteAddress });
  clientLogger.info('New client connected');

  ws.on('message', async (message) => {
    clientLogger.info(`Received message => ${message}`);
    try {
      // Store the payload as a plain string together with the id of the
      // connection that must receive the reply.
      await messageQueue.add({ clientId, message: message.toString() });
    } catch (err) {
      clientLogger.error(`Error queueing message: ${err.message}`);
    }
  });

  ws.on('close', () => {
    clients.delete(clientId);
    clientLogger.info('Client disconnected');
  });

  ws.on('error', (error) => {
    clientLogger.error(`Client error: ${error.message}`);
  });
});

server.on('error', (error) => {
  logger.error(`Server error: ${error.message}`);
});

// Announce startup only once the port is actually bound.
server.on('listening', () => {
  console.log('WebSocket server started on port 8080');
});

// Queue worker: waits (simulated work) and echoes the payload back to the
// connection that submitted it, if that connection is still open.
messageQueue.process(async (job) => {
  const { clientId, message } = job.data;

  // simulate a delay
  await new Promise((resolve) => setTimeout(resolve, 1000));

  const ws = clients.get(clientId);
  if (!ws || ws.readyState !== WebSocket.OPEN) {
    logger.warn(`Dropping reply for disconnected client ${clientId}`);
    return;
  }
  ws.send(`Echo: ${message}`);
});
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const AWS = require('aws-sdk');
const WebSocket = require('ws');
const winston = require('winston');

// Credentials are intentionally not hard-coded here: when none are passed,
// the SDK resolves them itself (environment variables, shared credentials
// file, instance/role credentials).
const sqs = new AWS.SQS({
  region: 'us-west-2',
});

// Structured JSON logger; every entry is written to the console transport.
const logger = winston.createLogger({
  level: 'info',
  format: winston.format.json(),
  transports: [new winston.transports.Console()],
});

const queueUrl = 'https://sqs.us-west-2.amazonaws.com/1234567890/my-queue';

// The connection handler below needs a server instance; without this line the
// script fails immediately with "server is not defined".
const server = new WebSocket.Server({ port: 8080 });

// Producer: every message received over a WebSocket is forwarded to SQS.
server.on('connection', (ws, req) => {
  // req.socket is the supported accessor; req.connection is a deprecated alias.
  const clientLogger = logger.child({ clientIp: req.socket.remoteAddress });
  clientLogger.info('New client connected');

  ws.on('message', async (message) => {
    clientLogger.info(`Received message => ${message}`);
    const params = {
      // MessageBody must be a string; the incoming payload may be a Buffer.
      MessageBody: message.toString(),
      QueueUrl: queueUrl,
    };
    try {
      await sqs.sendMessage(params).promise();
    } catch (err) {
      clientLogger.error(`Error sending message to SQS: ${err.message}`);
    }
  });

  ws.on('close', () => {
    clientLogger.info('Client disconnected');
  });

  ws.on('error', (error) => {
    clientLogger.error(`Client error: ${error.message}`);
  });
});

server.on('error', (error) => {
  logger.error(`Server error: ${error.message}`);
});

// Announce startup only once the port is actually bound.
server.on('listening', () => {
  console.log('WebSocket server started on port 8080');
});
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const AWS = require('aws-sdk');
const WebSocket = require('ws');

// Credentials are intentionally not hard-coded here: when none are passed,
// the SDK resolves them itself (environment variables, shared credentials
// file, instance/role credentials).
const sqs = new AWS.SQS({
  region: 'us-west-2',
});

const queueUrl = 'https://sqs.us-west-2.amazonaws.com/1234567890/my-queue';

// Delay before polling again after a failure, so a persistent error does not
// turn into a tight retry loop.
const RETRY_DELAY_MS = 1000;

const client = new WebSocket('ws://localhost:8080');

client.on('open', function open() {
  console.log('Client connected to the server');
  client.send('Hello World!');
  // Start draining the queue only once the socket can actually send.
  processMessages();
});

client.on('message', function incoming(data) {
  console.log(`Received message from the server: ${data}`);
});

client.on('close', function close() {
  console.log('Client disconnected from the server');
});

client.on('error', function error(err) {
  console.log(`Client error: ${err}`);
});

/**
 * Sends one payload over the WebSocket.
 *
 * @param {string} body - Text to send.
 * @returns {Promise<void>} Resolves when the send callback reports success,
 *   rejects with the error it reports otherwise.
 */
const sendToServer = (body) =>
  new Promise((resolve, reject) => {
    client.send(body, (err) => (err ? reject(err) : resolve()));
  });

/**
 * Long-polls SQS once, forwards each received message to the WebSocket
 * server, deletes it from the queue, and then schedules the next poll.
 * Polling stops when the WebSocket is no longer open.
 *
 * A message is deleted only after its own send succeeded, so a failed send
 * leaves it in the queue for redelivery.
 *
 * @returns {Promise<void>}
 */
const processMessages = async () => {
  if (client.readyState !== WebSocket.OPEN) {
    return;
  }

  let nextDelay = 0;
  try {
    const result = await sqs.receiveMessage({
      QueueUrl: queueUrl,
      WaitTimeSeconds: 20,
    }).promise();

    // Sequential on purpose: forward, then delete, one message at a time.
    for (const { Body, ReceiptHandle } of result.Messages || []) {
      await sendToServer(Body);
      await sqs.deleteMessage({ QueueUrl: queueUrl, ReceiptHandle }).promise();
    }
  } catch (err) {
    console.log(`Error processing SQS messages: ${err.message}`);
    nextDelay = RETRY_DELAY_MS;
  }

  setTimeout(processMessages, nextDelay);
};
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const AWS = require('aws-sdk');
const WebSocket = require('ws');

// Credentials are intentionally not hard-coded here: when none are passed,
// the SDK resolves them itself (environment variables, shared credentials
// file, instance/role credentials).
const sqs = new AWS.SQS({
  region: 'us-west-2',
});

const queueUrl = 'https://sqs.us-west-2.amazonaws.com/1234567890/my-queue';

// Delay before polling again after a failure, so a persistent error does not
// turn into a tight retry loop.
const RETRY_DELAY_MS = 1000;

const client = new WebSocket('ws://localhost:8080');

client.on('open', function open() {
  console.log('Client connected to the server');
  client.send('Hello World!');
  // Start draining the queue only once the socket can actually send.
  processMessages();
});

client.on('message', function incoming(data) {
  console.log(`Received message from the server: ${data}`);
});

client.on('close', function close() {
  console.log('Client disconnected from the server');
});

client.on('error', function error(err) {
  console.log(`Client error: ${err}`);
});

/**
 * Sends one payload over the WebSocket.
 *
 * @param {string} body - Text to send.
 * @returns {Promise<void>} Resolves when the send callback reports success,
 *   rejects with the error it reports otherwise.
 */
const sendToServer = (body) =>
  new Promise((resolve, reject) => {
    // Passing `resolve` directly as the callback would resolve even when an
    // error is reported, and the message would then be deleted unsent.
    client.send(body, (err) => (err ? reject(err) : resolve()));
  });

/**
 * Logs the queue's ApproximateNumberOfMessagesDelayed attribute.
 * Failures are logged and do not propagate.
 *
 * @returns {Promise<void>}
 */
const reportDelayedMessages = async () => {
  try {
    const queueAttributes = await sqs.getQueueAttributes({
      QueueUrl: queueUrl,
      AttributeNames: ['ApproximateNumberOfMessagesDelayed'],
    }).promise();
    // This attribute counts messages that are delayed and not yet available
    // for reading; it is not a count of failed or "bounced" deliveries.
    const delayedCount = queueAttributes.Attributes.ApproximateNumberOfMessagesDelayed;
    console.log(`Approximate number of delayed messages: ${delayedCount}`);
  } catch (err) {
    console.log(`Error getting queue attributes: ${err.message}`);
  }
};

/**
 * Long-polls SQS once, forwards each received message to the WebSocket
 * server, deletes it from the queue, reports the delayed-message count, and
 * then schedules the next poll. Polling stops when the WebSocket is no longer
 * open.
 *
 * A message is deleted only after its own send succeeded, so a failed send
 * leaves it in the queue for redelivery.
 *
 * @returns {Promise<void>}
 */
const processMessages = async () => {
  if (client.readyState !== WebSocket.OPEN) {
    return;
  }

  let nextDelay = 0;
  try {
    const result = await sqs.receiveMessage({
      QueueUrl: queueUrl,
      WaitTimeSeconds: 20,
    }).promise();

    // Sequential on purpose: forward, then delete, one message at a time.
    for (const { Body, ReceiptHandle } of result.Messages || []) {
      await sendToServer(Body);
      await sqs.deleteMessage({ QueueUrl: queueUrl, ReceiptHandle }).promise();
    }
  } catch (err) {
    console.log(`Error processing SQS messages: ${err.message}`);
    nextDelay = RETRY_DELAY_MS;
  }

  await reportDelayedMessages();

  // Rescheduling happens outside the try block so that one failed poll does
  // not end the loop.
  setTimeout(processMessages, nextDelay);
};
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // Replace the "Checking the number of bounced messages" block at the end of processMessages in the previous file with the version below. | |
| //Checking the number of bounced messages | |
| try { | |
| const queueAttributes = await sqs.getQueueAttributes({ | |
| QueueUrl: queueUrl, | |
| AttributeNames: ['ApproximateNumberOfMessagesDelayed'] | |
| }).promise(); | |
| const numberOfBouncedMessages = queueAttributes.Attributes.ApproximateNumberOfMessagesDelayed; | |
| if (numberOfBouncedMessages > 0) { | |
| //resend the bounced message | |
| const params = { | |
| QueueUrl: queueUrl, | |
| MessageBody: 'Bounced message body' | |
| }; | |
| sqs.sendMessage(params, (err, data) => { | |
| if (err) { | |
| console.log(`Error resending bounced message: ${err.message}`); | |
| } else { | |
| console.log(`Bounced message resent successfully. Message ID: ${data.MessageId}`); | |
| } | |
| }); | |
| } | |
| } catch (err) { | |
| console.log(`Error getting queue attributes: ${err.message}`); | |
| } | |
| }; | |
| processMessages(); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment