Skip to content

Instantly share code, notes, and snippets.

@brasizza
Created December 3, 2024 15:12
Show Gist options
  • Save brasizza/493977969dda7af93d2d8917487695e5 to your computer and use it in GitHub Desktop.
Save brasizza/493977969dda7af93d2d8917487695e5 to your computer and use it in GitHub Desktop.
#!/usr/bin/node
const crypto = require('crypto');
const express = require('express');
const { createClient } = require("redis");
const { createAdapter } = require("@socket.io/redis-adapter");
const cors = require('cors');
const zlib = require('zlib');
const { log } = require('console');
const msgpack = require('msgpack5')();
// Express application plus the HTTP server that wraps it. The same `server`
// object is later handed to socket.io and is the one that listens on the port,
// so REST routes and websocket traffic share a single listener.
const app = express();
const server = require('http').Server(app);
// ANSI colour templates: each wraps the first formatted argument (%s) in a
// colour escape and resets the terminal colour afterwards.
const LCERROR = '\x1b[31m%s\x1b[0m'; // red
const LCWARN = '\x1b[33m%s\x1b[0m'; // yellow
const LCINFO = '\x1b[36m%s\x1b[0m'; // cyan
const LCSUCCESS = '\x1b[32m%s\x1b[0m'; // green

/**
 * Tiny colourised console facade. Every method forwards its arguments to the
 * matching console method, prefixed with one of the colour templates above.
 */
const logger = class {
  /** Red output on stderr via console.error. */
  static error(msg, ...rest) {
    console.error(LCERROR, msg, ...rest);
  }

  /** Cyan output via console.log. */
  static log(msg, ...rest) {
    console.log(LCINFO, msg, ...rest);
  }

  /** Yellow output via console.warn. */
  static warn(msg, ...rest) {
    console.warn(LCWARN, msg, ...rest);
  }

  /** Cyan output via console.info. */
  static info(msg, ...rest) {
    console.info(LCINFO, msg, ...rest);
  }

  /** Green output via console.info. */
  static success(msg, ...rest) {
    console.info(LCSUCCESS, msg, ...rest);
  }
};
// ---- Configuration read from environment variables ----
const {
  PATH_SOCKET,
  REDIS_HOST,
  AUTH_PASSWORD,
  REDIS_PORT,
  REDIS_USER,
  REDIS_PASS,
} = process.env;
// Port falls back to 3000 when SOCKET_PORT is unset or empty.
const SOCKET_PORT = process.env.SOCKET_PORT || 3000;
// Redis is enabled only when the variable is exactly the string 'true'.
const USING_REDIS = process.env.USING_REDIS === 'true';

// ---- URL paths; PATH_SOCKET, when set, is used as a prefix ----
const strPath = `${PATH_SOCKET || ''}/socket-io/socket.io`;
const strRestPath = `${PATH_SOCKET || ''}/socket-io/notify`;
const strRestPathRoot = `${PATH_SOCKET || ''}/`;

// ---- Shared mutable state ----
// Connected sockets keyed by socket id.
const socketClients = {};
// Hashes of messages accepted by this process within the last few seconds.
const recentlyEmitted = new Set();
// Redis clients and the socket.io server; assigned later, only when Redis is enabled.
let pubClient;
let subClient;
let io;

logger.info(`strPath => ${strPath}`);
logger.info(`strRestPath => ${strRestPath}`);
logger.info(`strRestPathRoot => ${strRestPathRoot}`);
// Redis-backed Socket.IO bootstrap. Everything in this branch runs only when
// USING_REDIS is true; otherwise `io`, `pubClient` and `subClient` stay
// undefined and only an error is logged.
if (USING_REDIS) {
  pubClient = createClient({
    socket: {
      host: REDIS_HOST,
      port: REDIS_PORT,
      // Linear back-off: 100 ms per attempt, capped at 3 s.
      reconnectStrategy: (retries) => Math.min(retries * 100, 3000)
    },
    username: REDIS_USER,
    password: REDIS_PASS
  });
  subClient = pubClient.duplicate();

  // Listeners are attached BEFORE connect(): node-redis emits 'error' events,
  // and an 'error' event without a listener is thrown and takes the process
  // down. The subscriber is a separate connection and needs its own listener.
  pubClient.on('error', (err) => logger.error('Redis Error:', err));
  pubClient.on('end', () => logger.warn('Redis connection closed'));
  // NOTE(review): 'close' does not appear among the documented node-redis
  // client events; kept as in the original — confirm it ever fires.
  pubClient.on('close', () => logger.warn('Redis connection closed by server'));
  subClient.on('error', (err) => logger.error('Redis Error (subscriber):', err));

  io = require('socket.io')(server, {
    path: strPath,
    // NOTE(review): `threshold` looks like a byte count in the underlying
    // websocket option, so 1 would compress practically every message rather
    // than only those above 1KB — confirm; value left unchanged.
    perMessageDeflate: { threshold: 1 },
    cors: { origin: "*", methods: ["GET", "POST"], credentials: true },
    pingInterval: 50000, // Adjusted ping interval
    pingTimeout: 60000, // Adjusted timeout
  });

  if (typeof AUTH_PASSWORD !== 'string' || AUTH_PASSWORD.length === 0) {
    logger.warn('AUTH_PASSWORD is empty or unset: socket authentication is effectively disabled');
  }

  // Hashing both sides gives equal-length buffers, which timingSafeEqual requires.
  const sha256 = (value) => crypto.createHash('sha256').update(value).digest();

  /**
   * Compares the handshake password with AUTH_PASSWORD.
   * When both are strings the comparison is constant-time; any other
   * combination falls back to strict equality, so the accept/reject outcome
   * is the same as a plain `===` for every input.
   */
  const passwordMatches = (candidate) => {
    if (typeof candidate !== 'string' || typeof AUTH_PASSWORD !== 'string') {
      return candidate === AUTH_PASSWORD;
    }
    return crypto.timingSafeEqual(sha256(candidate), sha256(AUTH_PASSWORD));
  };

  // Handshake middleware: reject sockets whose `password` query value is wrong.
  io.use((socket, next) => {
    const password = socket.handshake.query.password;
    if (!passwordMatches(password)) {
      logger.error('Unauthorized connection attempt from', socket.client.id);
      return next(new Error('Unauthorized')); // Reject the connection
    }
    // Password is correct, allow the connection
    next();
  });

  (async () => {
    try {
      await pubClient.connect();
      await subClient.connect();
      logger.success("Connected to Redis");
      io.adapter(createAdapter(pubClient, subClient));

      io.on('connection', (socket) => {
        logger.success(`${socket.client.id} connected`);
        // Track the socket so the periodic user count can report it.
        socketClients[socket.id] = socket;

        socket.on('join', (room) => {
          try {
            socket.join(room);
            logger.warn(`Socket ${socket.id} joined room: ${room}`);
          } catch (error) {
            logger.error('Error joining room:', error);
          }
        });

        socket.on('disconnect', () => {
          logger.error(`${socket.client.id} disconnected`);
          delete socketClients[socket.id];
        });

        // NOTE(review): 'reconnect' is normally a client-side event; this
        // listener may never fire on the server — confirm before relying on it.
        socket.on('reconnect', () => logger.warn(`${socket.client.id} reconnecting`));
      });
    } catch (err) {
      logger.error("Error connecting to Redis:", err);
    }
  })();
} else {
  logger.error("No Redis server configured");
}
// Emit batched messages
// Pending messages keyed by channel name. The dictionary has a null prototype
// because channel names come from request bodies: with a plain `{}` a channel
// called "constructor", "toString" or "__proto__" would resolve to an
// inherited Object member instead of a message list and `.push` would throw.
const messageQueue = Object.create(null);

/**
 * Appends `message` to the pending list of `channel`, creating the list on
 * first use.
 * @param {string} channel - Event name the message will later be emitted on.
 * @param {*} message - Payload to enqueue.
 */
const queueMessage = (channel, message) => {
  const pending = messageQueue[channel];
  if (!pending) {
    messageQueue[channel] = [message];
    return;
  }
  pending.push(message);
};
/**
 * Flushes `messageQueue`. For every channel the pending messages are grouped
 * by their `room` field (messages without one go to the "__broadcast__"
 * group); each group is serialised as `{ [room]: messages }`, gzip-compressed,
 * msgpack-encoded and emitted on the channel, either to that room or to every
 * client for the broadcast group.
 *
 * Differences from the previous implementation:
 *  - each room only receives its own group; previously the JSON of ALL groups
 *    was sent to every room, leaking other rooms' messages;
 *  - grouping uses a null-prototype dictionary, so a `room` such as
 *    "constructor" can no longer throw inside the timer callback;
 *  - nothing is emitted when compression fails (previously `undefined` was
 *    encoded and sent).
 */
const emitBatchMessages = () => {
  for (const [channel, messages] of Object.entries(messageQueue)) {
    // Group messages by their room.
    const roomGroups = Object.create(null);
    for (const message of messages) {
      const room = message.room || "__broadcast__"; // Placeholder for broadcast messages
      if (!roomGroups[room]) {
        roomGroups[room] = [];
      }
      roomGroups[room].push(message);
    }

    // Emit one compressed batch per room (or broadcast group).
    for (const [room, groupedMessages] of Object.entries(roomGroups)) {
      // Same outer shape as before (object keyed by room), limited to this room.
      const payloadJSON = JSON.stringify({ [room]: groupedMessages });
      const compressionOptions = { level: 9 }; // Max compression level
      zlib.gzip(payloadJSON, compressionOptions, (err, compressedData) => {
        if (err) {
          logger.error('Error compressing data:', err);
          return; // There is nothing valid to send for this batch.
        }
        logger.info('Original size (KB):', (Buffer.byteLength(payloadJSON) / 1024).toFixed(2));
        logger.info('Compressed size (KB):', (compressedData.length / 1024).toFixed(2));

        const batchedData = msgpack.encode(compressedData);
        // Read the encoded byte count directly instead of allocating a Blob
        // just to measure it.
        const batchedDataSize = batchedData.length / 1024;
        logger.info(`Size of batchedData: ${batchedDataSize.toFixed(2)} KB`);

        if (room === "__broadcast__") {
          io.emit(channel, batchedData); // Broadcast to all clients
          logger.success(`Broadcast batch emitted on channel: ${channel}`);
        } else {
          io.to(room).emit(channel, batchedData); // Emit to a specific room
          logger.success(`Batch emitted to room: ${room} on channel: ${channel}`);
        }
      });
    }

    // Clear the queue for the channel; the groups above already hold the messages.
    delete messageQueue[channel];
  }
};
setInterval(emitBatchMessages, 1000); // Batch every second
/**
 * De-duplicates an incoming message and, if it is new, queues it for the next
 * batch on `channel`.
 *
 * A message is identified by the SHA-256 of its JSON form. It is dropped when
 * this process accepted the same hash within the last 5 seconds
 * (`recentlyEmitted`) or when the `msg:<hash>` key already exists in Redis.
 *
 * The Redis check is a single `SET ... NX EX 5`: the key is created only if it
 * is absent, so two concurrent identical requests cannot both pass the check,
 * as they could with the previous GET-then-SET sequence.
 *
 * Errors are logged and swallowed; the returned promise never rejects.
 *
 * @param {string} channel - Event name the message is queued under.
 * @param {object} data - Message payload; `carga` / `metodo` are used for logging only.
 * @returns {Promise<void>}
 */
const checkAndEmitMessage = async (channel, data) => {
  try {
    const messageId = generateHash(JSON.stringify(data));
    const label = data['carga'] ?? data['metodo'];

    if (recentlyEmitted.has(messageId)) {
      logger.error(`Message recently emitted: ${messageId} ${label}`);
      return;
    }

    // Falsy reply means the key already existed, i.e. the message was
    // accepted within the last 5 seconds by some process sharing this Redis.
    const claimed = await pubClient.set(`msg:${messageId}`, 'true', { NX: true, EX: 5 });
    if (!claimed) {
      logger.error(`Duplicate message detected: ${messageId} ${label}`);
      return;
    }

    recentlyEmitted.add(messageId);
    logger.success(`Sent to emit: ${messageId} => ${label}`);
    // Forget the hash locally after the same 5 s window used for the Redis key.
    setTimeout(() => recentlyEmitted.delete(messageId), 5000);
    queueMessage(channel, data);
  } catch (e) {
    logger.error("Error handling message:", e);
  }
};
/**
 * Returns the SHA-256 digest of `data` as a lowercase hex string.
 * @param {string|Buffer} data - Content to hash.
 * @returns {string} 64-character hexadecimal digest.
 */
const generateHash = (data) => {
  const hasher = crypto.createHash('sha256');
  hasher.update(data);
  return hasher.digest('hex');
};
// Startup banner. It is printed before listen() is called, so it reports the
// configured port, not a confirmed bind.
console.log("RUNNING ON PORT " + SOCKET_PORT);
// CORS middleware with default options, applied to every request.
app.use(cors());
// Answer OPTIONS requests on any path with credentialed CORS.
// NOTE(review): `origin: true` presumably reflects the request origin — confirm
// against the cors package documentation.
app.options('*', cors({ credentials: true, origin: true }));
// Start listening on the 0.0.0.0 address.
server.listen(SOCKET_PORT, '0.0.0.0');
// Body parsers for JSON and URL-encoded payloads. They are registered before
// the routes below so that req.body is populated when those handlers run.
app.use(express.json());
app.use(express.urlencoded({ extended: true }));
// Root endpoint: always answers 200 with the body "OK".
app.get(strRestPathRoot, (_, res) => res.status(200).send("OK"));
/**
 * POST <strRestPath>: accepts a notification and hands it to
 * checkAndEmitMessage, using the body's `metodo` field as the channel.
 *
 * Responds 404 with an empty body when the request has no usable body and 200
 * with an empty body otherwise. The 404 status is kept as-is because existing
 * callers may depend on it. The 200 is sent immediately; de-duplication and
 * emission continue asynchronously and report problems only through the logs.
 */
app.post(strRestPath, (req, res) => {
  const body = req.body;
  // `body` can be undefined when no body parser handled the request (for
  // example an unsupported content type); Object.keys(undefined) would throw.
  if (body == null || Object.keys(body).length === 0) {
    logger.error('Missing object');
    res.status(404).send("");
    return;
  }
  // Not awaited on purpose: checkAndEmitMessage catches its own errors.
  checkAndEmitMessage(body.metodo, body);
  res.status(200).send("");
});
// Every 30 seconds, report how many sockets are currently tracked in socketClients.
setInterval(() => {
  const { length: connectedCount } = Object.keys(socketClients);
  logger.info(`Total connected users: ${connectedCount}`);
}, 30 * 1000);
/**
 * Ctrl+C / SIGINT handler: closes both Redis connections, then exits.
 *
 * Both the publisher and the subscriber are closed (previously only the
 * publisher was). A failing quit() — for instance on a client that never
 * connected — is logged instead of becoming an unhandled rejection, and the
 * process always exits with code 0 afterwards.
 */
process.on('SIGINT', async () => {
  logger.warn('Shutting down gracefully...');
  try {
    const clients = [pubClient, subClient].filter(Boolean);
    // The async wrapper turns a synchronous throw from quit() into a rejection,
    // so allSettled observes every outcome.
    const outcomes = await Promise.allSettled(clients.map(async (client) => client.quit()));
    for (const outcome of outcomes) {
      if (outcome.status === 'rejected') {
        logger.warn('Error closing Redis connection:', outcome.reason);
      }
    }
  } finally {
    process.exit(0);
  }
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment