Skip to content

Instantly share code, notes, and snippets.

@puuble
Last active January 18, 2023 22:38
Show Gist options
  • Select an option

  • Save puuble/d501bcdc03fb43499e09553458827c42 to your computer and use it in GitHub Desktop.

Select an option

Save puuble/d501bcdc03fb43499e09553458827c42 to your computer and use it in GitHub Desktop.
web socket handle on nodejs for big concurrent
const WebSocket = require('ws');
const winston = require('winston');
const logger = winston.createLogger({
level: 'info',
format: winston.format.json(),
transports: [
new winston.transports.Console()
]
});
const server = new WebSocket.Server({ port: 8080 });
server.on('connection', (ws, req) => {
const clientLogger = logger.child({ clientIp: req.connection.remoteAddress });
clientLogger.info('New client connected');
ws.on('message', (message) => {
clientLogger.info(`Received message => ${message}`);
ws.send(`Echo: ${message}`);
});
ws.on('close', (connection) => {
clientLogger.info('Client disconnected');
});
ws.on('error', (error) => {
clientLogger.error(`Client error: ${error.message}`);
});
});
server.on('error', (error) => {
logger.error(`Server error: ${error.message}`);
});
console.log('WebSocket server started on port 8080');
const WebSocket = require('ws');
const winston = require('winston');
const logger = winston.createLogger({
level: 'info',
format: winston.format.json(),
transports: [
new winston.transports.Console()
]
});
const server = new WebSocket.Server({ port: 8080 });
server.on('connection', async (ws, req) => {
const clientLogger = logger.child({ clientIp: req.connection.remoteAddress });
clientLogger.info('New client connected');
ws.on('message', async (message) => {
clientLogger.info(`Received message => ${message}`);
try{
await sendResponse(ws, message);
}catch(err){
clientLogger.error(`Error sending response: ${err.message}`);
}
});
ws.on('close', (connection) => {
clientLogger.info('Client disconnected');
});
ws.on('error', (error) => {
clientLogger.error(`Client error: ${error.message}`);
});
});
server.on('error', (error) => {
logger.error(`Server error: ${error.message}`);
});
console.log('WebSocket server started on port 8080');
async function sendResponse(ws, message) {
// simulate a delay
await new Promise((resolve) => setTimeout(resolve, 1000));
ws.send(`Echo: ${message}`);
}
const WebSocket = require('ws');
const winston = require('winston');
const Queue = require('bull');
const logger = winston.createLogger({
level: 'info',
format: winston.format.json(),
transports: [
new winston.transports.Console()
]
});
const server = new WebSocket.Server({ port: 8080 });
const messageQueue = new Queue('messageQueue');
server.on('connection', (ws, req) => {
const clientLogger = logger.child({ clientIp: req.connection.remoteAddress });
clientLogger.info('New client connected');
ws.on('message', async (message) => {
clientLogger.info(`Received message => ${message}`);
messageQueue.add({ message });
});
ws.on('close', (connection) => {
clientLogger.info('Client disconnected');
});
ws.on('error', (error) => {
clientLogger.error(`Client error: ${error.message}`);
});
});
server.on('error', (error) => {
logger.error(`Server error: ${error.message}`);
});
console.log('WebSocket server started on port 8080');
messageQueue.process(async (job) => {
const { message } = job.data;
// simulate a delay
await new Promise((resolve) => setTimeout(resolve, 1000));
ws.send(`Echo: ${message}`);
});
const WebSocket = require('ws');
const winston = require('winston');
const Queue = require('bull');
const logger = winston.createLogger({
level: 'info',
format: winston.format.json(),
transports: [
new winston.transports.Console()
]
});
const server = new WebSocket.Server({ port: 8080 });
const messageQueue = new Queue('messageQueue', {
redis: {
host: '127.0.0.1',
port: 6379
}
});
server.on('connection', (ws, req) => {
const clientLogger = logger.child({ clientIp: req.connection.remoteAddress });
clientLogger.info('New client connected');
ws.on('message', async (message) => {
clientLogger.info(`Received message => ${message}`);
messageQueue.add({ message });
});
ws.on('close', (connection) => {
clientLogger.info('Client disconnected');
});
ws.on('error', (error) => {
clientLogger.error(`Client error: ${error.message}`);
});
});
server.on('error', (error) => {
logger.error(`Server error: ${error.message}`);
});
console.log('WebSocket server started on port 8080');
messageQueue.process(async (job) => {
const { message } = job.data;
// simulate a delay
await new Promise((resolve) => setTimeout(resolve, 1000));
ws.send(`Echo: ${message}`);
});
const AWS = require('aws-sdk');
const winston = require('winston');
const sqs = new AWS.SQS({
region: 'us-west-2',
accessKeyId: 'ACCESS_KEY',
secretAccessKey: 'SECRET_KEY'
});
const logger = winston.createLogger({
level: 'info',
format: winston.format.json(),
transports: [
new winston.transports.Console()
]
});
const queueUrl = 'https://sqs.us-west-2.amazonaws.com/1234567890/my-queue';
server.on('connection', (ws, req) => {
const clientLogger = logger.child({ clientIp: req.connection.remoteAddress });
clientLogger.info('New client connected');
ws.on('message', async (message) => {
clientLogger.info(`Received message => ${message}`);
const params = {
MessageBody: message,
QueueUrl: queueUrl
};
try {
await sqs.sendMessage(params).promise();
} catch (err) {
clientLogger.error(`Error sending message to SQS: ${err.message}`);
}
});
ws.on('close', (connection) => {
clientLogger.info('Client disconnected');
});
ws.on('error', (error) => {
clientLogger.error(`Client error: ${error.message}`);
});
});
server.on('error', (error) => {
logger.error(`Server error: ${error.message}`);
});
console.log('WebSocket server started on port 8080');
const AWS = require('aws-sdk');
const sqs = new AWS.SQS({
region: 'us-west-2',
accessKeyId: 'ACCESS_KEY',
secretAccessKey: 'SECRET_KEY'
});
const queueUrl = 'https://sqs.us-west-2.amazonaws.com/1234567890/my-queue';
const client = new WebSocket('ws://localhost:8080');
client.on('open', function open() {
console.log('Client connected to the server');
client.send('Hello World!');
});
client.on('message', function incoming(data) {
console.log(`Received message from the server: ${data}`);
});
client.on('close', function close() {
console.log('Client disconnected from the server');
});
client.on('error', function error(err) {
console.log(`Client error: ${err}`);
});
const processMessages = async () => {
const result = await sqs.receiveMessage({
QueueUrl: queueUrl,
WaitTimeSeconds: 20
}).promise();
if (result.Messages) {
const deleteParams = {
QueueUrl: queueUrl,
Entries: result.Messages.map(({ReceiptHandle, Id}) => ({ReceiptHandle, Id}))
};
result.Messages.forEach(async (message) => {
const { Body } = message;
client.send(Body);
await sqs.deleteMessageBatch(deleteParams).promise();
});
}
setTimeout(processMessages, 0);
};
processMessages();
const AWS = require('aws-sdk');
const sqs = new AWS.SQS({
region: 'us-west-2',
accessKeyId: 'ACCESS_KEY',
secretAccessKey: 'SECRET_KEY'
});
const queueUrl = 'https://sqs.us-west-2.amazonaws.com/1234567890/my-queue';
const client = new WebSocket('ws://localhost:8080');
client.on('open', function open() {
console.log('Client connected to the server');
client.send('Hello World!');
});
client.on('message', function incoming(data) {
console.log(`Received message from the server: ${data}`);
});
client.on('close', function close() {
console.log('Client disconnected from the server');
});
client.on('error', function error(err) {
console.log(`Client error: ${err}`);
});
const processMessages = async () => {
try {
const result = await sqs.receiveMessage({
QueueUrl: queueUrl,
WaitTimeSeconds: 20
}).promise();
if (result.Messages) {
const deleteParams = {
QueueUrl: queueUrl,
Entries: result.Messages.map(({ReceiptHandle, Id}) => ({ReceiptHandle, Id}))
};
for(const message of result.Messages){
const { Body } = message;
await new Promise((resolve) => {
client.send(Body, undefined, resolve);
});
await sqs.deleteMessageBatch(deleteParams).promise();
}
}
setTimeout(processMessages, 0);
} catch (err) {
console.log(`Error processing SQS messages: ${err.message}`);
}
//Checking the number of bounced messages
try {
const queueAttributes = await sqs.getQueueAttributes({
QueueUrl: queueUrl,
AttributeNames: ['ApproximateNumberOfMessagesDelayed']
}).promise();
const numberOfBouncedMessages = queueAttributes.Attributes.ApproximateNumberOfMessagesDelayed;
console.log(`Number of bounced messages: ${numberOfBouncedMessages}`);
} catch (err) {
console.log(`Error getting queue attributes: ${err.message}`);
}
};
processMessages();
// you need to update this block from above f7 function place.
//Checking the number of bounced messages
try {
const queueAttributes = await sqs.getQueueAttributes({
QueueUrl: queueUrl,
AttributeNames: ['ApproximateNumberOfMessagesDelayed']
}).promise();
const numberOfBouncedMessages = queueAttributes.Attributes.ApproximateNumberOfMessagesDelayed;
if (numberOfBouncedMessages > 0) {
//resend the bounced message
const params = {
QueueUrl: queueUrl,
MessageBody: 'Bounced message body'
};
sqs.sendMessage(params, (err, data) => {
if (err) {
console.log(`Error resending bounced message: ${err.message}`);
} else {
console.log(`Bounced message resent successfully. Message ID: ${data.MessageId}`);
}
});
}
} catch (err) {
console.log(`Error getting queue attributes: ${err.message}`);
}
};
processMessages();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment