Created
December 3, 2024 15:12
-
-
Save brasizza/493977969dda7af93d2d8917487695e5 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/node | |
const crypto = require('crypto'); | |
const express = require('express'); | |
const { createClient } = require("redis"); | |
const { createAdapter } = require("@socket.io/redis-adapter"); | |
const cors = require('cors'); | |
const zlib = require('zlib'); | |
const { log } = require('console'); | |
const msgpack = require('msgpack5')(); | |
const app = express(); | |
const server = require('http').Server(app); | |
const LCERROR = '\x1b[31m%s\x1b[0m'; // red | |
const LCWARN = '\x1b[33m%s\x1b[0m'; // yellow | |
const LCINFO = '\x1b[36m%s\x1b[0m'; // cyan | |
const LCSUCCESS = '\x1b[32m%s\x1b[0m'; // green | |
const logger = class { | |
static error(message, ...optionalParams) { console.error(LCERROR, message, ...optionalParams) } | |
static log(message, ...optionalParams) { console.log(LCINFO, message, ...optionalParams) } | |
static warn(message, ...optionalParams) { console.warn(LCWARN, message, ...optionalParams) } | |
static info(message, ...optionalParams) { console.info(LCINFO, message, ...optionalParams) } | |
static success(message, ...optionalParams) { console.info(LCSUCCESS, message, ...optionalParams) } | |
}; | |
const SOCKET_PORT = process.env.SOCKET_PORT || 3000; | |
const USING_REDIS = process.env.USING_REDIS === 'true'; | |
const PATH_SOCKET = process.env.PATH_SOCKET; | |
const REDIS_HOST = process.env.REDIS_HOST; | |
const AUTH_PASSWORD = process.env.AUTH_PASSWORD; | |
const REDIS_PORT = process.env.REDIS_PORT; | |
const REDIS_USER = process.env.REDIS_USER; | |
const REDIS_PASS = process.env.REDIS_PASS; | |
const strPath = PATH_SOCKET ? `${PATH_SOCKET}/socket-io/socket.io` : '/socket-io/socket.io'; | |
const strRestPath = PATH_SOCKET ? `${PATH_SOCKET}/socket-io/notify` : '/socket-io/notify'; | |
const strRestPathRoot = PATH_SOCKET ? PATH_SOCKET + '/' : '/'; | |
const socketClients = {}; | |
const recentlyEmitted = new Set(); | |
let pubClient, subClient; | |
let io; // Declare io in the global scope | |
logger.info("strPath => " + strPath); | |
logger.info("strRestPath => " + strRestPath); | |
logger.info("strRestPathRoot => " + strRestPathRoot); | |
if (USING_REDIS) { | |
pubClient = createClient({ | |
socket: { | |
host: REDIS_HOST, | |
port: REDIS_PORT, | |
reconnectStrategy: (retries) => Math.min(retries * 100, 3000) | |
}, | |
username: REDIS_USER, | |
password: REDIS_PASS | |
}); | |
subClient = pubClient.duplicate(); | |
io = require('socket.io')(server, { | |
path: strPath, | |
perMessageDeflate: { threshold: 1 }, // Compress messages larger than 1KB | |
cors: { origin: "*", methods: ["GET", "POST"], credentials: true }, | |
pingInterval: 50000, // Adjusted ping interval | |
pingTimeout: 60000, // Adjusted timeout | |
}); | |
io.use((socket, next) => { | |
const password = socket.handshake.query.password; | |
// Check if the password is correct | |
if (password !== AUTH_PASSWORD) { | |
logger.error('Unauthorized connection attempt from', socket.client.id); | |
return next(new Error('Unauthorized')); // Reject the connection | |
} | |
// If password is correct, allow the connection | |
next(); | |
}); | |
(async () => { | |
try { | |
await pubClient.connect(); | |
await subClient.connect(); | |
logger.success("Connected to Redis"); | |
io.adapter(createAdapter(pubClient, subClient)); | |
io.on('connection', (socket) => { | |
logger.success(`${socket.client.id} connected`); | |
socketClients[socket.id] = socket; | |
socket.on('join', (room) => { | |
try { | |
socket.join(room); | |
logger.warn(`Socket ${socket.id} joined room: ${room}`); | |
} catch (error) { | |
logger.error('Error joining room:', error); | |
} | |
}); | |
socket.on('disconnect', () => { | |
logger.error(`${socket.client.id} disconnected`); | |
delete socketClients[socket.id]; | |
}); | |
socket.on('reconnect', () => logger.warn(`${socket.client.id} reconnecting`)); | |
}); | |
pubClient.on('error', (err) => logger.error('Redis Error:', err)); | |
pubClient.on('end', () => logger.warn('Redis connection closed')); | |
pubClient.on('close', () => logger.warn('Redis connection closed by server')); | |
} catch (err) { | |
logger.error("Error connecting to Redis:", err); | |
} | |
})(); | |
} else { | |
logger.error("No Redis server configured"); | |
} | |
// Emit batched messages | |
const messageQueue = {}; | |
const queueMessage = (channel, message) => { | |
if (!messageQueue[channel]) { | |
messageQueue[channel] = []; | |
} | |
messageQueue[channel].push(message); | |
}; | |
const emitBatchMessages = () => { | |
for (const [channel, messages] of Object.entries(messageQueue)) { | |
const roomGroups = {}; | |
// Group messages by their room | |
messages.forEach((message) => { | |
const room = message.room || "__broadcast__"; // Use a placeholder for broadcast messages | |
if (!roomGroups[room]) { | |
roomGroups[room] = []; | |
} | |
roomGroups[room].push(message); | |
}); | |
// Emit messages for each room or broadcast | |
for (const [room, groupedMessages] of Object.entries(roomGroups)) { | |
const roomGroupsJSON = JSON.stringify(roomGroups); | |
const compressionOptions = { level: 9 }; // Max compression level | |
zlib.gzip(roomGroupsJSON, compressionOptions, (err, compressedData) => { | |
if (err) { | |
console.error('Error compressing data:', err); | |
} else { | |
logger.info('Original size (KB):', (Buffer.byteLength(roomGroupsJSON) / 1024).toFixed(2)); | |
logger.info('Compressed size (KB):', (compressedData.length / 1024).toFixed(2)); | |
} | |
const batchedData = msgpack.encode(compressedData); | |
const batchedDataBlob = new Blob([batchedData]); | |
const batchedDataSize = batchedDataBlob.size / 1024; | |
logger.info(`Size of batchedData: ${batchedDataSize.toFixed(2)} KB`); | |
if (room === "__broadcast__") { | |
io.emit(channel, batchedData); // Broadcast to all clients | |
logger.success(`Broadcast batch emitted on channel: ${channel}`); | |
} else { | |
io.to(room).emit(channel, batchedData); // Emit to a specific room | |
logger.success(`Batch emitted to room: ${room} on channel: ${channel}`); | |
} | |
}); | |
} | |
// Clear the queue for the channel | |
delete messageQueue[channel]; | |
} | |
}; | |
setInterval(emitBatchMessages, 1000); // Batch every second | |
const checkAndEmitMessage = async (channel, data) => { | |
try { | |
const messageId = generateHash(JSON.stringify(data)); | |
if (recentlyEmitted.has(messageId)) { | |
logger.error(`Message recently emitted: ${messageId} ${data['carga'] ?? data['metodo']}`); | |
return; | |
} | |
const isDuplicate = await pubClient.get(`msg:${messageId}`); | |
if (isDuplicate) { | |
logger.error(`Duplicate message detected: ${messageId} ${data['carga'] ?? data['metodo']}`); | |
return; | |
} | |
await pubClient.set(`msg:${messageId}`, 'true', { EX: 5 }); | |
recentlyEmitted.add(messageId); | |
logger.success(`Sent to emit: ${messageId} => ${data['carga'] ?? data['metodo']}`); | |
setTimeout(() => recentlyEmitted.delete(messageId), 5000); | |
queueMessage(channel, data); | |
} catch (e) { | |
logger.error("Error handling message:", e); | |
} | |
}; | |
const generateHash = (data) => crypto.createHash('sha256').update(data).digest('hex'); | |
console.log("RUNNING ON PORT " + SOCKET_PORT); | |
app.use(cors()); | |
app.options('*', cors({ credentials: true, origin: true })); | |
server.listen(SOCKET_PORT, '0.0.0.0'); | |
app.use(express.json()); | |
app.use(express.urlencoded({ extended: true })); | |
app.get(strRestPathRoot, (_, res) => res.status(200).send("OK")); | |
app.post(strRestPath, (req, res) => { | |
if (!Object.keys(req.body).length) { | |
logger.error('Missing object'); | |
res.status(404).send(""); | |
return; | |
} | |
checkAndEmitMessage(req.body.metodo, req.body); | |
res.status(200).send(""); | |
}); | |
setInterval(() => { | |
const totalUsers = Object.keys(socketClients).length; | |
logger.info(`Total connected users: ${totalUsers}`); | |
}, 30000); | |
process.on('SIGINT', async () => { | |
logger.warn('Shutting down gracefully...'); | |
if (pubClient) await pubClient.quit(); | |
process.exit(0); | |
}); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment