Skip to content

Instantly share code, notes, and snippets.

@puuble
Last active January 18, 2023 22:50
Show Gist options
  • Save puuble/a9a1261297dd4792c4c9e9cf6fb6a643 to your computer and use it in GitHub Desktop.
Save puuble/a9a1261297dd4792c4c9e9cf6fb6a643 to your computer and use it in GitHub Desktop.
adding mongodb websocket erorr logs with polling
//https://gist.github.com/puuble/d501bcdc03fb43499e09553458827c42 part 2
const MongoClient = require('mongodb').MongoClient;
const AWS = require('aws-sdk');
const sqs = new AWS.SQS({
region: 'us-west-2',
accessKeyId: 'ACCESS_KEY',
secretAccessKey: 'SECRET_KEY'
});
const queueUrl = 'https://sqs.us-west-2.amazonaws.com/1234567890/my-queue';
const client = new WebSocket('ws://localhost:8080');
const mongoUrl = 'mongodb://username:password@host:port/database';
let db;
MongoClient.connect(mongoUrl, { useUnifiedTopology: true }, (err, client) => {
if (err) {
console.log(`Error connecting to MongoDB: ${err.message}`);
return;
}
db = client.db();
});
client.on('open', function open() {
console.log('Client connected to the server');
client.send('Hello World!');
});
client.on('message', function incoming(data) {
console.log(`Received message from the server: ${data}`);
db.collection('logs').insertOne({message: data, timestamp: new Date()});
});
client.on('close', function close() {
console.log('Client disconnected from the server');
});
client.on('error', function error(err) {
console.log(`Client error: ${err.message}`);
db.collection('fails').insertOne({error: err.message, timestamp: new Date()});
});
const processMessages = async () => {
try {
const result = await sqs.receiveMessage({
QueueUrl: queueUrl,
WaitTimeSeconds: 20
}).promise();
if (result.Messages) {
const deleteParams = {
QueueUrl: queueUrl,
Entries: result.Messages.map(({ReceiptHandle, Id}) => ({ReceiptHandle, Id}))
};
for(const message of result.Messages){
const { Body } = message;
await new Promise((resolve) => {
client.send(Body, undefined, resolve);
});
db.collection('logs').insertOne({message: Body, timestamp: new Date()});
await sqs.deleteMessageBatch(deleteParams).promise();
}
}
setTimeout(processMessages, 0);
} catch (err) {
console.log(`Error processing SQS messages: ${err.message}`);
db.collection('fails').insertOne({error: err.message, timestamp: new Date()});
}
//Checking the number of bounced messages
try {
const queueAttributes = await sqs.getQueueAttributes({
QueueUrl: queueUrl,
AttributeNames: ['ApproximateNumberOfMessagesDelayed']
}).promise();
const numberOfBouncedMessages = queueAttributes.Attributes.ApproximateNumberOfMessagesDelayed;
if (numberOfBouncedMessages > 0) {
//resend the bounced message
const params = {
QueueUrl: queueUrl,
MessageBody: 'Bounced message body'
};
sqs.sendMessage(params, (err, data) => {
if (err) {
console.log(`Error resending bounced message: ${err.message}`);
db.collection('fails').insertOne({error: err.message, timestamp: new Date()});
} else {
console.log(`Bounced message resent successfully. Message ID: ${data.MessageId}`);
db.collection('logs').insertOne({messageId: data.MessageId, timestamp: new Date()});
}
});
}
} catch (err) {
console.log(`Error getting queue attributes: ${err.message}`);
db.collection('fails').insertOne({error: err.message, timestamp: new Date()});
}
};
processMessages();
const mongoose = require('mongoose');
const WebSocket = require('ws');
const AWS = require('aws-sdk');
const sqs = new AWS.SQS({
region: 'us-west-2',
accessKeyId: 'ACCESS_KEY',
secretAccessKey: 'SECRET_KEY'
});
const queueUrl = 'https://sqs.us-west-2.amazonaws.com/1234567890/my-queue';
const client = new WebSocket('ws://localhost:8080');
const LogSchema = new mongoose.Schema({
message: { type: String },
timestamp: { type: Date, default: Date.now }
});
const Log = mongoose.model('Log', LogSchema);
const FailSchema = new mongoose.Schema({
error: { type: String },
timestamp: { type: Date, default: Date.now }
});
const Fail = mongoose.model('Fail', FailSchema);
mongoose.connect('mongodb://username:password@host:port/database', { useUnifiedTopology: true });
client.on('open', function open() {
console.log('Client connected to the server');
client.send('Hello World!');
});
client.on('message', function incoming(data) {
console.log(`Received message from the server: ${data}`);
const log = new Log({ message: data });
log.save();
});
client.on('close', function close() {
console.log('Client disconnected from the server');
});
client.on('error', function error(err) {
console.log(`Client error: ${err.message}`);
const fail = new Fail({ error: err.message });
fail.save();
});
const processMessages = async () => {
try {
const result = await sqs.receiveMessage({
QueueUrl: queueUrl,
WaitTimeSeconds: 20
}).promise();
if (result.Messages) {
const deleteParams = {
QueueUrl: queueUrl,
Entries: result.Messages.map(({ReceiptHandle, Id}) => ({ReceiptHandle, Id}))
};
for(const message of result.Messages){
const { Body } = message;
await new Promise((resolve) => {
client.send(Body, undefined, resolve);
});
const log = new Log({ message: Body });
log.save();
await sqs.deleteMessageBatch(deleteParams).promise();
}
}
setTimeout(processMessages, 0);
} catch (err) {
console.log(`Error processing SQS messages: ${err.message}`);
const fail = new Fail({ error: err.message });
fail.save();
}
//Checking the number of bounced messages
try {
const queueAttributes = await sqs.getQueueAttributes({
QueueUrl: queueUrl,
AttributeNames: ['ApproximateNumberOfMessagesDelayed']
}).promise();
const numberOfBouncedMessages = queueAttributes.Attributes.ApproximateNumberOfMessagesDelayed;
if (numberOfBouncedMessages > 0) {
//resend the bounced message
const params = {
QueueUrl: queueUrl,
MessageBody: 'Bounced message body'
};
sqs.sendMessage(params, (err, data) => {
if (err) {
console.log(`Error resending bounced message: ${err.message}`);
const fail = new Fail({ error: err.message });
fail.save();
} else {
console.log(`Bounced message resent successfully. Message ID: ${data.MessageId}`);
const log = new Log({ messageId: data.MessageId });
log.save();
}
});
}
} catch (err) {
console.log(`Error getting queue attributes: ${err.message}`);
const fail = new Fail({ error: err.message });
fail.save();
}
};
processMessages();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment