Skip to content

Instantly share code, notes, and snippets.

@neodigm
Created September 17, 2025 13:32
Show Gist options
  • Save neodigm/434c60f21403dc5c1b8514d7acf82ec2 to your computer and use it in GitHub Desktop.
Save neodigm/434c60f21403dc5c1b8514d7acf82ec2 to your computer and use it in GitHub Desktop.
SSE Server end-point logic
let arySSEConnections = [] // Registry of active SSE pollers: { appid, nintv, nomessage }
/**
 * GET /mvv/v5/sse - Stream Sync Engage: server-sent-event stream of queued messages.
 *
 * Contract:
 *   /mvv/v5/sse?appid=APPID&sseid=SSEID&lastkey=123   open a stream
 *   /mvv/v5/sse?action=RESET                           stop every poller and empty the registry
 *
 * Query parameters:
 *   appid   - required for a stream; upper-cased; matched against "app"->>'app'.
 *   sseid   - optional client identifier, lower-cased (a uuid is generated when absent).
 *   lastkey - required, non-empty; currently only validated and logged.
 *   action  - "RESET" selects the reset command.
 *
 * On every tick the handler selects the oldest 'NEW' row for the app that does not yet list
 * this sseid in "ips", appends the sseid to that row's "ips", and writes one SSE event:
 * the message, a NO_MESSAGE counter when nothing matched, or RECONNECT when the poll failed.
 *
 * Responses: 200 JSON for RESET, 400 JSON when appid/lastkey are missing, otherwise a
 * text/event-stream whose headers are sent together with the first event.
 *
 * NOTE(review): RESET is reachable without any credential check - confirm that is intended.
 */
app.get('/mvv/v5/sse', async (req, res) => {
    // Stream Sync Engage
    console.log(" ~... SSE req?.query | " , req?.query )
    // AVA Check URL Against Whitelist
    const DEFAULT_SSE_INTERVAL = 20000 // Used when the APPID has no entry in config
    const SSE_HEADERS = {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive',
        'Access-Control-Allow-Origin': '*',
        'Access-Control-Allow-Headers': 'Cache-Control'
    }
    const config = {
        "GENAISYS": {"SSE_INTERVAL": 20000, "MAX_NO_MESSAGE": 10}, // Sixty Seconds (60000) | Two days 2880
        "SYPHER": {"SSE_INTERVAL": 20000, "MAX_NO_MESSAGE": 2880},
    }
    // A repeated or nested query key may not be a string; only a non-empty string is usable here.
    const asText = ( val )=>( typeof val === "string" && val !== "" ? val : null )
    const queryACTION = asText( req.query?.action ) // Command
    const rawAPPID = asText( req.query?.appid )
    const queryAPPID = rawAPPID ? rawAPPID.toUpperCase() : null // Unique by App, token
    const querySSEID = ( asText( req.query?.sseid ) || uuidv4() ).toLowerCase() // Unique by Client, may be email (else guid)
    const queryLASTKEY = asText( req.query?.lastkey ) // Last Msg Key Received

    if( queryACTION === "RESET" ){
        const nLen = arySSEConnections.length
        arySSEConnections.forEach( ( sesh )=>{ clearInterval( sesh?.nintv ) } )
        arySSEConnections = []
        return res.status( 200 ).json({ "action": "reset", "length": nLen })
    }
    if( !queryAPPID || !queryLASTKEY ){
        // Previously this path sent nothing at all and the request hung until it timed out.
        return res.status( 400 ).json( { "msg": "appid and lastkey are required" } )
    }

    // A missing delay would make setInterval fire almost continuously, so always fall back.
    const nDelay = config[ queryAPPID ]?.SSE_INTERVAL ?? DEFAULT_SSE_INTERVAL
    const toEvent = ( payload )=> `data: ${ JSON.stringify( payload ) }\n\n` // JSON.stringify escapes quotes and newlines
    const fconnFilt = ( ary )=>{ // Remove Previous Instances of this APPID
        return ary.filter( ( sesh )=>{
            if( sesh?.appid === queryAPPID ){
                if( sesh?.nintv ) clearInterval( sesh.nintv )
                return false
            }
            return true
        } )
    }
    // Client-supplied values travel as bind parameters; only the server-side schema name is interpolated.
    const selectSQL = `SELECT *
        FROM "${ OSTATE.schm }"."mvvStreamSyncEngage"
        WHERE "status" = 'NEW'
        AND "app"->>'app' = $1
        AND NOT EXISTS (
            SELECT 1
            FROM jsonb_array_elements("ips") AS ip_obj
            WHERE ip_obj->>'ip' = $2
        )
        ORDER BY "create_time" ASC
        LIMIT 1;` // Soft Delete - get oldest row
    const updateSQL = `UPDATE "${ OSTATE.schm }"."mvvStreamSyncEngage"
        SET "ips" = "ips" || $1::jsonb,
        "update_time" = now(),
        "modified_by" = '1'
        WHERE "id" = $2;`

    // NOTE(review): the earlier stream for this APPID only loses its poller; its response is not ended here.
    arySSEConnections = fconnFilt( arySSEConnections )
    const session = { "appid": queryAPPID, "nintv": null, "nomessage": 0 }

    res.on("close", ()=>{
        console.log(" ~... SSE on close len 1 | " , arySSEConnections.length )
        // Remove only this connection so a newer stream for the same APPID keeps polling.
        clearInterval( session.nintv )
        arySSEConnections = arySSEConnections.filter( ( sesh )=> sesh !== session )
        console.log(" ~... SSE on close len 2 | " , arySSEConnections.length )
    });

    // One poll: select the next row, record delivery, then stream exactly one event.
    const pollOnce = async ()=>{
        let resWritePackage = "" // Final Package Streamed to Client
        let currentID = null // Per tick, so an empty poll never re-updates an earlier row
        let pgClient = null
        console.log(" ~... SSE interval queryAPPID querySSEID queryLASTKEY | " , queryAPPID + " | " + querySSEID + " | " + queryLASTKEY )
        try { // AVA Filter by TS window columns
            pgClient = new pg.Client( OSTATE.pstg )
            await pgClient.connect()
            const dbRes = await pgClient.query( selectSQL, [ queryAPPID, querySSEID ] )
            const r = dbRes?.rows?.[ 0 ]
            if( r ){
                currentID = r.id ?? null
                if( r.class?.class && r.msg?.msg ){
                    resWritePackage = toEvent( {
                        "id": String( currentID ),
                        "class": String( r.class.class ),
                        "msg": String( r.msg.msg ),
                        "criterion": String( r.criteria?.criterion ?? "" )
                    } )
                    console.log( " ~... SSE Stream Sync Engage push package | " + resWritePackage ) // AVA Reset the nomessage counter to 0
                }else{
                    console.log(" ~... SSE missing push columns")
                }
            }else{
                console.log(" ~... SSE no rows meet criteria")
                session.nomessage = session.nomessage + 1 // AVA MAX_NO_MESSAGE
                resWritePackage = toEvent( { "class": "NO_MESSAGE", "msg": String( session.nomessage ) } )
            }
            if( currentID !== null ){
                try { // Update the Row with an ip property Appended to the ips JSON
                    await pgClient.query( updateSQL, [ JSON.stringify( [ { "ip": querySSEID } ] ), currentID ] )
                } catch (err) {
                    console.error(" ~... SSE update ips failed " + err )
                }
            }
        } catch (err) {
            console.error(" ~... SSE select or push failed " + err )
            resWritePackage = toEvent( { "class": "RECONNECT" } )
        } finally {
            if( pgClient ){
                try { await pgClient.end() } catch (err) { console.error(" ~... SSE pg end failed " + err ) }
            }
        }
        // The client may have gone away while the queries were in flight.
        if( resWritePackage && !res.writableEnded && !res.destroyed ){
            if( !res.headersSent ) res.writeHead( 200, SSE_HEADERS ) // Run Once Nevermore
            console.warn(" ~... SSE Stream Sync Engage resWritePackage | " + resWritePackage )
            res.write( resWritePackage ) // One Exit
        }
        console.warn(" ~... SSE Stream Sync Engage arySSEConnections.length | " + arySSEConnections.length )
    }

    session.nintv = setInterval( ()=>{
        // Nothing awaits a timer callback, so a stray rejection has to be caught here.
        pollOnce().catch( ( err )=>{ console.error(" ~... SSE interval failed " + err ) } )
    }, nDelay )
    arySSEConnections.push( session )
});
/**
 * GET /mvv/v5/sse/messages - List all messages whose status is 'NEW', newest first.
 *
 * Responses:
 *   200 { rows }            at least one row matched
 *   400 { msg: "no rows" }  nothing matched (status code kept for existing callers)
 *   500 { msg }             connecting to or querying the database failed
 */
app.get('/mvv/v5/sse/messages', async (req, res) => {
    console.log(" ~... SSE GET /mvv/v5/sse/messages req?.query | " , req?.query )
    let pgClient = null
    try {
        pgClient = new pg.Client( OSTATE.pstg )
        await pgClient.connect()
        const sSQL = `SELECT *
            FROM "${ OSTATE.schm }"."mvvStreamSyncEngage"
            WHERE "status" = 'NEW'
            ORDER BY "create_time" DESC;`
        const dbRes = await pgClient.query( sSQL )
        if( dbRes?.rows?.length ){
            return res.status( 200 ).json( { "rows": dbRes.rows } )
        }
        return res.status( 400 ).json( { "msg": "no rows"} )
    } catch (err) {
        // Without this the rejection escaped the async handler and the request never completed.
        console.error(" ~... SSE GET /mvv/v5/sse/messages failed " + err )
        return res.status( 500 ).json( { "msg": "query failed" } )
    } finally {
        // Always release the connection, including when connect or query threw.
        if( pgClient ){
            try { await pgClient.end() } catch (err) { console.error(" ~... SSE pg end failed " + err ) }
        }
    }
});
/**
 * GET /mvv/v5/sse/messages/:id - Get one 'NEW' message by its id.
 *
 * Path parameter:
 *   id - compared against the "id" column; sent as a bind parameter, never spliced into the SQL.
 *
 * Responses:
 *   200 { rows }            the matching row, wrapped in an array
 *   400 { msg: "no rows" }  nothing matched (status code kept for existing callers)
 *   500 { msg }             connecting to or querying the database failed
 */
app.get('/mvv/v5/sse/messages/:id', async (req, res) => {
    console.log(" ~... SSE GET /mvv/v5/sse/messages:id req?.query req.params.id | " , req?.query, req.params.id )
    let pgClient = null
    try {
        pgClient = new pg.Client( OSTATE.pstg )
        await pgClient.connect()
        const sSQL = `SELECT *
            FROM "${ OSTATE.schm }"."mvvStreamSyncEngage"
            WHERE "status" = 'NEW' AND "id" = $1;`
        const dbRes = await pgClient.query( sSQL, [ req.params.id ] )
        if( dbRes?.rows?.length ){
            return res.status( 200 ).json( { "rows": dbRes.rows } )
        }
        return res.status( 400 ).json( { "msg": "no rows"} )
    } catch (err) {
        // Covers connection failures and ids the column type rejects.
        console.error(" ~... SSE GET /mvv/v5/sse/messages:id failed " + err )
        return res.status( 500 ).json( { "msg": "query failed" } )
    } finally {
        // Always release the connection, including when connect or query threw.
        if( pgClient ){
            try { await pgClient.end() } catch (err) { console.error(" ~... SSE pg end failed " + err ) }
        }
    }
});
/**
 * POST /mvv/v5/sse/messages - Create a new message and return its id and guid.
 *
 * Body: an object that may carry any of the columns listed in `columns` below. "guid" is
 * always generated here and overrides a client-supplied value. Object and array values are
 * stored as JSON text; absent or null values are stored as SQL NULL.
 *
 * Responses:
 *   200 { id, guid }  the row was inserted
 *   500 { msg }       connecting to the database or inserting failed
 */
app.post('/mvv/v5/sse/messages', async (req, res) => {
    console.log(" ~... SSE POST /mvv/v5/sse/messages:id req?.query req.body | " , req?.query, req.body )
    const columns = ["window_start_time", "window_end_time", "create_by", "modified_by", "app", "criteria", "msg", "ips", "tags", "comments", "webhook", "class", "status", "guid"]
    let pgClient = null
    try {
        const guid = AgentUtils.genHash( uuidv4() )
        // Work on a copy so the caller-owned req.body is not mutated (and may be undefined).
        const payload = { ...req.body, "guid": guid }
        const values = columns.map( ( sCol )=>{
            const colVal = payload[ sCol ]
            if( colVal === undefined || colVal === null ) return null
            // Stringify explicitly so arrays reach the database as JSON text.
            return ( typeof colVal === "object" ) ? JSON.stringify( colVal ) : colVal
        } )
        const colList = columns.map( ( sCol )=> `"${ sCol }"` ).join( ", " )
        const placeholders = columns.map( ( _sCol, idx )=> `$${ idx + 1 }` ).join( "," )
        // Column names come from the fixed list above; every value is a bind parameter.
        const sSQL = `INSERT INTO "${ OSTATE.schm }"."mvvStreamSyncEngage" (${ colList }) VALUES (${ placeholders }) RETURNING id`
        pgClient = new pg.Client( OSTATE.pstg )
        await pgClient.connect()
        const dbRes = await pgClient.query( sSQL, values )
        const newId = dbRes.rows[0].id
        return res.status( 200 ).json( { "id": newId, "guid": guid } )
    } catch (err) {
        console.error(" ~... SSE POST /mvv/v5/sse/messages failed " + err )
        return res.status( 500 ).json( { "msg": "insert failed" } )
    } finally {
        // Always release the connection, including when connect or query threw.
        if( pgClient ){
            try { await pgClient.end() } catch (err) { console.error(" ~... SSE pg end failed " + err ) }
        }
    }
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment