-
-
Save bjarneo/3a7337168632f395273f0275605a0160 to your computer and use it in GitHub Desktop.
pgmqdemo
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pkg from 'pg'; | |
const { Pool, Client } = pkg; | |
const DATABASE_NAME = 'queue_demo'; | |
const QUEUE_NAME = 'my_simple_queue'; | |
const DEFAULT_DB = 'postgres'; | |
const dbConfig = { | |
user: 'postgres', | |
host: 'localhost', | |
password: 'postgres', | |
port: 5432, | |
}; | |
// Global pool, initialized after DB check | |
let pool; | |
async function ensureDatabaseExists() { | |
const client = new Client({ ...dbConfig, database: DEFAULT_DB }); | |
try { | |
await client.connect(); | |
const checkRes = await client.query('SELECT 1 FROM pg_database WHERE datname = $1', [DATABASE_NAME]); | |
if (checkRes.rowCount === 0) { | |
console.log(`Database "${DATABASE_NAME}" not found, creating...`); | |
await client.query(`CREATE DATABASE "${DATABASE_NAME}"`); | |
console.log(`Database "${DATABASE_NAME}" created.`); | |
} else { | |
// console.log(`Database "${DATABASE_NAME}" already exists.`); // Optional: keep if needed | |
} | |
} catch (error) { | |
console.error(`Database check/creation failed for "${DATABASE_NAME}":`, error); | |
throw error; | |
} finally { | |
await client.end(); | |
} | |
} | |
async function runDemo() { | |
const client = await pool.connect(); | |
console.log(`Connected to ${DATABASE_NAME}, running demo...`); | |
try { | |
// Ensure pgmq extension and queue | |
await client.query('CREATE EXTENSION IF NOT EXISTS pgmq;'); | |
await client.query('SELECT pgmq.create($1);', [QUEUE_NAME]); | |
console.log(`pgmq extension and queue "${QUEUE_NAME}" ensured.`); | |
// Send message | |
const messagePayload = { type: 'update', id: Date.now() }; | |
const { rows: sendRows } = await client.query( | |
'SELECT pgmq.send($1, $2::jsonb) AS msg_id;', | |
[QUEUE_NAME, messagePayload] | |
); | |
const sentMsgId = sendRows[0].msg_id; | |
console.log(`Sent msg ${sentMsgId}:`, messagePayload); | |
// Read message (15s visibility) | |
const { rows: readRows } = await client.query( | |
'SELECT msg_id, message FROM pgmq.read($1, 15, 1);', | |
[QUEUE_NAME] | |
); | |
if (readRows.length > 0) { | |
const { msg_id: receivedMsgId, message: receivedPayload } = readRows[0]; | |
console.log(`Read msg ${receivedMsgId}:`, receivedPayload); | |
// Simulate processing | |
console.log('Processing...'); | |
await new Promise(resolve => setTimeout(resolve, 200)); // Shorter delay | |
// Delete message | |
await client.query('SELECT pgmq.delete($1, $2::bigint);', [QUEUE_NAME, receivedMsgId]); | |
console.log(`Deleted msg ${receivedMsgId}.`); | |
} else { | |
console.log('No messages found.'); | |
} | |
} catch (error) { | |
console.error('Error during pgmq demo:', error); | |
} finally { | |
if (client) client.release(); | |
console.log('Client released.'); | |
} | |
} | |
async function main() { | |
try { | |
await ensureDatabaseExists(); | |
// Initialize the main pool now that DB exists | |
pool = new Pool({ | |
...dbConfig, | |
database: DATABASE_NAME, | |
}); | |
// Handle pool errors | |
pool.on('error', (err) => { | |
console.error('Unexpected error on idle client', err); | |
process.exit(-1); | |
}); | |
await runDemo(); | |
} catch (error) { | |
// Error during DB setup is already logged | |
process.exit(1); | |
} finally { | |
// Close the pool after the demo is done (or if setup failed before pool init) | |
if (pool) { | |
await pool.end(); | |
console.log('Pool closed.'); | |
} | |
} | |
} | |
main(); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment