Skip to content

Instantly share code, notes, and snippets.

@bjarneo
Created May 3, 2025 17:24
Show Gist options
  • Save bjarneo/3a7337168632f395273f0275605a0160 to your computer and use it in GitHub Desktop.
Save bjarneo/3a7337168632f395273f0275605a0160 to your computer and use it in GitHub Desktop.
pgmqdemo
import pkg from 'pg';
const { Pool, Client } = pkg;
/** Name of the database the demo creates (when missing) and then works in. */
const DATABASE_NAME = 'queue_demo';

/** Name of the pgmq queue the demo sends to and reads from. */
const QUEUE_NAME = 'my_simple_queue';

/** Database to connect to while checking whether DATABASE_NAME exists. */
const DEFAULT_DB = 'postgres';

/** Connection settings shared by every connection; `database` is added per use. */
const dbConfig = {
  host: 'localhost',
  port: 5432,
  user: 'postgres',
  password: 'postgres',
};

/** Shared connection pool; stays undefined until it is created after the DB check. */
let pool;
/**
 * Makes sure the demo database exists, creating it when it is missing.
 *
 * Opens a short-lived client against `DEFAULT_DB`, looks `DATABASE_NAME` up
 * in `pg_database`, and issues `CREATE DATABASE` when no row is found. If
 * another session creates the database between the check and the create,
 * the resulting "duplicate_database" error is treated as success.
 *
 * @returns {Promise<void>} Resolves once the database is known to exist.
 * @throws Rethrows any other connection or query error after logging it.
 */
async function ensureDatabaseExists() {
  // SQLSTATE raised by PostgreSQL when the database already exists.
  const DUPLICATE_DATABASE = '42P04';

  const client = new Client({ ...dbConfig, database: DEFAULT_DB });
  try {
    await client.connect();
    const checkRes = await client.query(
      'SELECT 1 FROM pg_database WHERE datname = $1',
      [DATABASE_NAME]
    );
    if (checkRes.rowCount !== 0) {
      return;
    }

    console.log(`Database "${DATABASE_NAME}" not found, creating...`);
    try {
      // CREATE DATABASE cannot take bind parameters, so the name is embedded
      // in the statement; escapeIdentifier quotes it safely.
      await client.query(`CREATE DATABASE ${client.escapeIdentifier(DATABASE_NAME)}`);
      console.log(`Database "${DATABASE_NAME}" created.`);
    } catch (error) {
      if (error.code !== DUPLICATE_DATABASE) {
        throw error;
      }
      // Lost a race with another creator: the database exists, which is all we need.
      console.log(`Database "${DATABASE_NAME}" was created concurrently.`);
    }
  } catch (error) {
    console.error(`Database check/creation failed for "${DATABASE_NAME}":`, error);
    throw error;
  } finally {
    // Runs on every path, including the early return above.
    await client.end();
  }
}
/**
 * Runs one send -> read -> delete round trip against the pgmq queue.
 *
 * Checks a client out of the shared `pool`, makes sure the pgmq extension
 * and the queue `QUEUE_NAME` exist, sends one JSON message, reads one
 * message back, and deletes it after a short simulated processing delay.
 *
 * Errors raised after the client is checked out are logged rather than
 * rethrown, and `process.exitCode` is set to 1 so a failed run is not
 * reported as success. A failure to obtain the client from the pool
 * propagates to the caller.
 *
 * @returns {Promise<void>} Resolves once the client has been released.
 */
async function runDemo() {
  const client = await pool.connect();
  console.log(`Connected to ${DATABASE_NAME}, running demo...`);
  try {
    // Ensure pgmq extension and queue
    await client.query('CREATE EXTENSION IF NOT EXISTS pgmq;');
    await client.query('SELECT pgmq.create($1);', [QUEUE_NAME]);
    console.log(`pgmq extension and queue "${QUEUE_NAME}" ensured.`);

    // Send message
    const messagePayload = { type: 'update', id: Date.now() };
    const { rows: sendRows } = await client.query(
      'SELECT pgmq.send($1, $2::jsonb) AS msg_id;',
      [QUEUE_NAME, messagePayload]
    );
    const sentMsgId = sendRows[0].msg_id;
    console.log(`Sent msg ${sentMsgId}:`, messagePayload);

    // Read at most one message (15s visibility)
    const { rows: readRows } = await client.query(
      'SELECT msg_id, message FROM pgmq.read($1, 15, 1);',
      [QUEUE_NAME]
    );
    if (readRows.length === 0) {
      console.log('No messages found.');
      return;
    }

    const { msg_id: receivedMsgId, message: receivedPayload } = readRows[0];
    console.log(`Read msg ${receivedMsgId}:`, receivedPayload);

    // Simulate processing
    console.log('Processing...');
    await new Promise((resolve) => setTimeout(resolve, 200));

    // Delete message; pgmq.delete reports whether a row was actually removed,
    // so only claim success when it says so.
    const { rows: deleteRows } = await client.query(
      'SELECT pgmq.delete($1, $2::bigint) AS deleted;',
      [QUEUE_NAME, receivedMsgId]
    );
    if (deleteRows[0].deleted) {
      console.log(`Deleted msg ${receivedMsgId}.`);
    } else {
      console.warn(`Msg ${receivedMsgId} was not deleted (not found in queue).`);
    }
  } catch (error) {
    console.error('Error during pgmq demo:', error);
    // Keep the log-and-continue contract, but make the failure visible in the exit status.
    process.exitCode = 1;
  } finally {
    // Runs on every path, including the early return for an empty read.
    client.release();
    console.log('Client released.');
  }
}
/**
 * Script driver: ensures the database exists, creates the shared pool,
 * runs the demo, and always closes the pool afterwards.
 *
 * On failure the error is logged and `process.exitCode` is set to 1.
 * `process.exit()` is deliberately not called here: it would terminate the
 * process before the `finally` block could close the pool.
 *
 * @returns {Promise<void>} Resolves after cleanup has finished.
 */
async function main() {
  try {
    await ensureDatabaseExists();

    // Initialize the main pool now that DB exists
    pool = new Pool({
      ...dbConfig,
      database: DATABASE_NAME,
    });

    // Handle errors emitted by the pool outside of a query
    pool.on('error', (err) => {
      console.error('Unexpected error on idle client', err);
      process.exit(-1);
    });

    await runDemo();
  } catch (error) {
    // Setup errors may already have been logged in detail, but pool or
    // connection failures have not, so always leave a trace here.
    console.error('Demo aborted:', error instanceof Error ? error.message : error);
    process.exitCode = 1;
  } finally {
    // Close the pool after the demo is done; it is still undefined if setup
    // failed before the pool was created.
    if (pool) {
      await pool.end();
      console.log('Pool closed.');
    }
  }
}
// Entry point. main() catches errors from its own try block, but a rejection
// from its cleanup step would otherwise be an unhandled floating promise.
main().catch((error) => {
  console.error('Fatal error:', error);
  process.exitCode = 1;
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment