Created
September 17, 2025 13:32
-
-
Save neodigm/434c60f21403dc5c1b8514d7acf82ec2 to your computer and use it in GitHub Desktop.
SSE Server end-point logic
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Registry of active SSE sessions. The /mvv/v5/sse handler pushes one entry
// per open stream, shaped { appid, nintv, nomessage }:
//   appid     - upper-cased application id taken from the query string
//   nintv     - handle returned by setInterval for that stream's poller
//   nomessage - count of consecutive polls that found no row to push
// Declared with `let` because the handler replaces the whole array when it
// filters out sessions or processes ?action=RESET.
let arySSEConnections = [];
/**
 * GET /mvv/v5/sse - Stream Sync Engage (Server-Sent Events) endpoint.
 *
 * Contract:
 *   /mvv/v5/sse?appid=APPID&sseid=CLIENT&lastkey=123   open a stream
 *   /mvv/v5/sse?action=RESET                           stop every poller
 *
 * Query parameters:
 *   appid   - application id (upper-cased); required to open a stream.
 *   sseid   - client id, may be an email (lower-cased); a uuid is generated
 *             when it is absent.
 *   lastkey - last message key received; must be present and non-empty.
 *             It is only logged here, it does not filter the query.
 *   action  - "RESET" clears every registered poller and returns
 *             { action: "reset", length: <sessions cleared> }.
 *
 * While a stream is open, a timer polls the oldest row with status NEW for
 * this app whose "ips" array does not yet contain this client's sseid. A hit
 * is pushed as `data: {id, class, msg, criterion}` and the sseid is appended
 * to the row's "ips" so the same client does not receive it again. A miss
 * pushes a NO_MESSAGE event carrying the running miss count. A failure of the
 * select/push step pushes a RECONNECT event.
 *
 * Responds 400 when appid or lastkey is missing (no stream is opened).
 */
app.get('/mvv/v5/sse', async (req, res) => {
  // Stream Sync Engage
  console.log(" ~... SSE req?.query | " , req?.query );
  // AVA Check URL Against Whitelist

  // Repeated or bracketed query params arrive as arrays/objects; only plain
  // non-empty strings are accepted so the case conversions below cannot throw.
  const asText = (value) => (typeof value === "string" && value !== "" ? value : null);

  const rawAppId = asText(req.query?.appid);
  const queryAPPID = rawAppId ? rawAppId.toUpperCase() : null; // Unique by App, token
  const queryACTION = asText(req.query?.action); // Command
  const querySSEID = (asText(req.query?.sseid) ?? uuidv4()).toLowerCase(); // Unique by Client, may be email (else guid)
  const queryLASTKEY = req.query?.lastkey || 0; // Last Msg Key Received

  // Per-app polling settings. MAX_NO_MESSAGE is declared but not enforced
  // anywhere in this handler yet (AVA).
  const config = {
    "GENAISYS": { "SSE_INTERVAL": 20000, "MAX_NO_MESSAGE": 10 },
    "SYPHER": { "SSE_INTERVAL": 20000, "MAX_NO_MESSAGE": 2880 },
  };
  // Without a fallback an unknown APPID would call setInterval(fn, undefined),
  // which fires roughly every millisecond and floods the database.
  const DEFAULT_SSE_INTERVAL = 20000;

  if (queryACTION === "RESET") {
    const nLen = arySSEConnections.length;
    arySSEConnections.forEach((sesh) => { clearInterval(sesh?.nintv); });
    arySSEConnections = [];
    return res.status(200).json({ "action": "reset", "length": nLen });
  }

  if (!(queryAPPID && querySSEID && queryLASTKEY)) {
    // Previously this path sent nothing at all and left the request hanging.
    return res.status(400).json({ "msg": "appid and lastkey are required" });
  }

  // One poller per APPID: stop and drop earlier sessions of this app.
  arySSEConnections = arySSEConnections.filter((sesh) => {
    if (sesh?.appid !== queryAPPID) return true;
    if (sesh?.nintv) clearInterval(sesh.nintv);
    return false;
  });

  const session = { "appid": queryAPPID, "nintv": null, "nomessage": 0 };
  let isHeaderSent = false; // Nevermore
  let isClosed = false;
  let isTickRunning = false;

  res.on("close", () => {
    isClosed = true;
    console.log(" ~... SSE on close len 1 | " , arySSEConnections.length );
    // Remove only this session. Filtering by APPID here would also kill a
    // newer stream of the same app when this stale one finally closes.
    clearInterval(session.nintv);
    arySSEConnections = arySSEConnections.filter((sesh) => sesh !== session);
    console.log(" ~... SSE on close len 2 | " , arySSEConnections.length );
  });

  // Writes the SSE response headers exactly once, before the first event.
  const sendHeadersOnce = () => {
    if (isHeaderSent || isClosed) return;
    res.writeHead(200, { // Set SSE headers
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
      'Access-Control-Allow-Origin': '*',
      'Access-Control-Allow-Headers': 'Cache-Control'
    });
    isHeaderSent = true;
  };

  // The schema name comes from server state (identifiers cannot be bound);
  // every client-supplied value is passed as a bound parameter.
  // AVA Filter by TS window columns
  const selectSQL = `SELECT *
    FROM "${ OSTATE.schm }"."mvvStreamSyncEngage"
    WHERE "status" = 'NEW'
    AND "app"->>'app' = $1
    AND NOT EXISTS (
      SELECT 1
      FROM jsonb_array_elements("ips") AS ip_obj
      WHERE ip_obj->>'ip' = $2
    )
    ORDER BY "create_time" ASC
    LIMIT 1;`; // Soft Delete - get oldest row
  const updateSQL = `UPDATE "${ OSTATE.schm }"."mvvStreamSyncEngage"
    SET "ips" = "ips" || $1::jsonb,
    "update_time" = now(),
    "modified_by" = '1'
    WHERE "id" = $2;`;

  // One polling tick. All state is local to the tick so a row id found on an
  // earlier tick can never be re-marked on a later, empty one.
  const pollOnce = async () => {
    // Skip when the client is gone or the previous tick is still in flight;
    // an overlapping tick would select the same undelivered row again.
    if (isClosed || isTickRunning) return;
    isTickRunning = true;
    let resWritePackage = ""; // Final Package Streamed to Client
    let currentID = null;
    let pgClient = null;
    try {
      console.log(" ~... SSE interval queryAPPID querySSEID queryLASTKEY | " , queryAPPID + " | " + querySSEID + " | " + queryLASTKEY );
      try {
        pgClient = new pg.Client( OSTATE.pstg );
        await pgClient.connect();
        const dbRes = await pgClient.query( selectSQL, [ queryAPPID, querySSEID ] );
        sendHeadersOnce(); // Run Once Nevermore
        if( dbRes?.rows.length ){
          const r = dbRes.rows[0];
          currentID = r?.id;
          if( r?.class?.class && r?.msg?.msg ){
            // JSON.stringify escapes quotes and newlines that would otherwise
            // corrupt the JSON body or terminate the SSE event early.
            const payload = JSON.stringify({
              "id": String( currentID ),
              "class": String( r.class.class ),
              "msg": String( r.msg.msg ),
              "criterion": String( r.criteria?.criterion ),
            });
            const dto = `data: ${ payload }\n\n`;
            console.log( " ~... SSE Stream Sync Engage push package | " + dto ); // AVA Reset the nomessage counter to 0
            resWritePackage = dto;
          }else{
            // The row is still marked as seen below, so a malformed row does
            // not block the rows queued behind it.
            console.log(" ~... SSE missing push columns");
          }
        }else{
          console.log(" ~... SSE no rows meet criteria");
          session.nomessage = session.nomessage + 1; // AVA MAX_NO_MESSAGE
          resWritePackage = `data: { "class": "NO_MESSAGE", "msg": "${ session.nomessage || 1 }" }\n\n`;
        }
      } catch (err) {
        console.error(" ~... SSE select or push failed " + err );
        resWritePackage = `data: { "class": "RECONNECT" }\n\n`;
      }

      try { // Update the Row with an ip property Appended to the ips JSON
        if( pgClient && !isClosed && currentID != null ){
          await pgClient.query( updateSQL, [ JSON.stringify([ { "ip": querySSEID } ]), currentID ] );
        }
      } catch (err) {
        console.error(" ~... SSE update ips failed " + err );
      }

      try {
        if( pgClient ) await pgClient.end();
      } catch (err) {
        console.error(" ~... SSE pg end failed " + err );
      }

      if( resWritePackage && !isClosed && !res.writableEnded ){
        sendHeadersOnce(); // the failure path reaches here without headers
        console.warn(" ~... SSE Stream Sync Engage resWritePackage | " + resWritePackage );
        res.write( resWritePackage ); // One Exit
      }
      console.warn(" ~... SSE Stream Sync Engage arySSEConnections.length | " + arySSEConnections.length );
    } catch (err) {
      // Nothing awaits this promise (setInterval discards it), so it must not reject.
      console.error(" ~... SSE interval failed " + err );
    } finally {
      isTickRunning = false;
    }
  };

  session.nintv = setInterval( pollOnce, config[ queryAPPID ]?.SSE_INTERVAL ?? DEFAULT_SSE_INTERVAL );
  arySSEConnections.push( session );
});
/**
 * GET /mvv/v5/sse/messages - List all messages.
 *
 * Returns every row of "mvvStreamSyncEngage" whose status is 'NEW', newest
 * first (ordered by "create_time" descending).
 *
 * Responses:
 *   200 { rows: [...] }        at least one row matched
 *   400 { msg: "no rows" }     nothing matched (status code kept for existing callers)
 *   500 { msg: "query failed" } the database could not be reached or queried
 */
app.get('/mvv/v5/sse/messages', async (req, res) => {
  console.log(" ~... SSE GET /mvv/v5/sse/messages req?.query | " , req?.query );
  let pgClient = null;
  let rows = null;
  try {
    pgClient = new pg.Client( OSTATE.pstg );
    await pgClient.connect();
    const sSQL = `SELECT *
      FROM "${ OSTATE.schm }"."mvvStreamSyncEngage"
      WHERE "status" = 'NEW'
      ORDER BY "create_time" DESC;`;
    const dbRes = await pgClient.query( sSQL );
    rows = dbRes?.rows;
  } catch (err) {
    // Without this the rejection escaped the async handler and the request
    // never received a response.
    console.error(" ~... SSE GET /mvv/v5/sse/messages failed " + err );
    return res.status( 500 ).json( { "msg": "query failed" } );
  } finally {
    // Always release the connection, including when the query throws.
    try {
      if( pgClient ) await pgClient.end();
    } catch (err) {
      console.error(" ~... SSE GET /mvv/v5/sse/messages pg end failed " + err );
    }
  }
  if( rows?.length ){
    return res.status( 200 ).json( { "rows": rows } );
  }
  return res.status( 400 ).json( { "msg": "no rows"} );
});
/**
 * GET /mvv/v5/sse/messages/:id - Get a specific message by id.
 *
 * Looks up rows of "mvvStreamSyncEngage" with status 'NEW' and the given id.
 *
 * Path parameters:
 *   id - value matched against the "id" column.
 *
 * Responses:
 *   200 { rows: [...] }        the matching row(s)
 *   400 { msg: "no rows" }     nothing matched (status code kept for existing callers)
 *   500 { msg: "query failed" } the database could not be reached or queried
 */
app.get('/mvv/v5/sse/messages/:id', async (req, res) => {
  console.log(" ~... SSE GET /mvv/v5/sse/messages:id req?.query req.params.id | " , req?.query, req.params.id );
  let pgClient = null;
  let rows = null;
  try {
    pgClient = new pg.Client( OSTATE.pstg );
    await pgClient.connect();
    // The id comes straight from the URL, so it is bound as a parameter
    // instead of being spliced into the SQL text.
    const sSQL = `SELECT *
      FROM "${ OSTATE.schm }"."mvvStreamSyncEngage"
      WHERE "status" = 'NEW' AND "id" = $1;`;
    const dbRes = await pgClient.query( sSQL, [ req.params.id ] );
    rows = dbRes?.rows;
  } catch (err) {
    // Covers connection failures and ids the column type rejects.
    console.error(" ~... SSE GET /mvv/v5/sse/messages:id failed " + err );
    return res.status( 500 ).json( { "msg": "query failed" } );
  } finally {
    // Always release the connection, including when the query throws.
    try {
      if( pgClient ) await pgClient.end();
    } catch (err) {
      console.error(" ~... SSE GET /mvv/v5/sse/messages:id pg end failed " + err );
    }
  }
  if( rows?.length ){
    return res.status( 200 ).json( { "rows": rows } );
  }
  return res.status( 400 ).json( { "msg": "no rows"} );
});
/**
 * POST /mvv/v5/sse/messages - Create a new message.
 *
 * Inserts one row into "mvvStreamSyncEngage" from the JSON request body.
 * Recognised body fields: window_start_time, window_end_time, create_by,
 * modified_by, app, criteria, msg, ips, tags, comments, webhook, class,
 * status. A "guid" is generated server-side and overrides any guid supplied
 * by the caller.
 *
 * Object and array values are serialised with JSON.stringify before they are
 * bound. Fields that are missing or null are stored as SQL NULL.
 *
 * Responses:
 *   200 { id, guid }                                  the row was created
 *   400 { msg: "request body must be a JSON object" } body absent or not an object
 *   500 { msg: "insert failed" }                      the database rejected the insert
 */
app.post('/mvv/v5/sse/messages', async (req, res) => {
  console.log(" ~... SSE POST /mvv/v5/sse/messages:id req?.query req.body | " , req?.query, req.body );

  if( req.body === null || typeof req.body !== "object" || Array.isArray( req.body ) ){
    return res.status( 400 ).json( { "msg": "request body must be a JSON object" } );
  }

  const columns = ["window_start_time", "window_end_time", "create_by", "modified_by", "app", "criteria", "msg", "ips", "tags", "comments", "webhook", "class", "status", "guid"];
  const guid = AgentUtils.genHash( uuidv4() );
  req.body[ "guid" ] = guid;

  // Values are bound as $1..$14 rather than concatenated into the statement,
  // so quotes in the payload can neither break nor alter the SQL.
  const values = columns.map( ( sCol )=>{
    const colVal = req.body[ sCol ];
    if( colVal === undefined || colVal === null ) return null;
    // Stringify explicitly: node-postgres would otherwise send a JS array as
    // a Postgres array literal, not as JSON.
    if( typeof colVal === "object" ) return JSON.stringify( colVal );
    return String( colVal );
  } );
  const columnList = columns.map( ( sCol )=> `"${ sCol }"` ).join( ", " );
  const placeholders = columns.map( ( _sCol, idx )=> `$${ idx + 1 }` ).join( ", " );
  const sSQL = `INSERT INTO "${ OSTATE.schm }"."mvvStreamSyncEngage" (${ columnList }) VALUES (${ placeholders }) RETURNING id`;

  let pgClient = null;
  let newId = null;
  try {
    pgClient = new pg.Client( OSTATE.pstg );
    await pgClient.connect();
    const dbRes = await pgClient.query( sSQL, values );
    newId = dbRes.rows[0].id;
  } catch (err) {
    // Without this the rejection escaped the async handler and the request
    // never received a response.
    console.error(" ~... SSE POST /mvv/v5/sse/messages failed " + err );
    return res.status( 500 ).json( { "msg": "insert failed" } );
  } finally {
    // Always release the connection, including when the insert throws.
    try {
      if( pgClient ) await pgClient.end();
    } catch (err) {
      console.error(" ~... SSE POST /mvv/v5/sse/messages pg end failed " + err );
    }
  }
  return res.status( 200 ).json( { "id": newId, "guid": guid } );
});
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment