Skip to content

Instantly share code, notes, and snippets.

@jakejscott
Last active August 29, 2015 14:18
Show Gist options
  • Save jakejscott/c1163141a0c536bb8a96 to your computer and use it in GitHub Desktop.
Save jakejscott/c1163141a0c536bb8a96 to your computer and use it in GitHub Desktop.
postgres-queue
using System;
using System.Linq;
using System.Timers;
using Dapper;
using Npgsql;
namespace PostgresQueue
{
/// <summary>
/// Console worker that drains a PostgreSQL-backed job queue.
/// It issues <c>LISTEN pending_job</c> on one connection and processes jobs whenever a
/// notification arrives; a timer re-checks the queue periodically as a fallback.
/// </summary>
public class Program
{
    const string conninfo = "Server=10.53.10.197;Port=5432;User Id=postgres;Password=postgres;Database=queuedb;SyncNotification=true;";

    // Fallback polling interval. The timeout log message is derived from this value so the
    // text and the actual timer period cannot drift apart.
    const int PollIntervalMs = 10000;

    private static Timer timer;

    /// <summary>
    /// Opens the listening connection, subscribes to notifications, drains any backlog and
    /// then blocks until a line is read from standard input.
    /// </summary>
    public static void Main(string[] args)
    {
        using (var connection = new NpgsqlConnection(conninfo))
        {
            connection.Open();

            timer = new Timer(PollIntervalMs);
            timer.Elapsed += OnTimeout;
            timer.Enabled = false;

            // Attach the handler before issuing LISTEN, and issue LISTEN before the initial
            // drain, so a job enqueued during start-up still triggers a notification.
            connection.Notification += OnNotification;
            using (var command = new NpgsqlCommand("LISTEN pending_job", connection))
            {
                command.ExecuteNonQuery();
            }

            ProcessJobs();

            Console.WriteLine("listening for new notifications...");
            Console.ReadLine();
        }
    }

    /// <summary>Timer callback: no notification arrived within the polling interval.</summary>
    private static void OnTimeout(object sender, ElapsedEventArgs e)
    {
        Console.WriteLine("received no work for {0} seconds, checking for new work", PollIntervalMs / 1000);
        ProcessJobs();
    }

    /// <summary>Notification callback for the <c>pending_job</c> channel.</summary>
    private static void OnNotification(object sender, NpgsqlNotificationEventArgs e)
    {
        Console.WriteLine("Received notification");
        ProcessJobs();
    }

    /// <summary>
    /// Repeatedly fetches jobs from <c>public.pending_jobs</c> (batch size 1) on a dedicated
    /// connection and hands each one to <see cref="DoWork"/> until no rows come back.
    /// The fallback timer is paused while processing and always restarted afterwards.
    /// </summary>
    private static void ProcessJobs()
    {
        timer.Stop();
        try
        {
            using (var connection = new NpgsqlConnection(conninfo))
            {
                connection.Open();
                while (true)
                {
                    var jobs = connection.Query<Job>(sql: "SELECT * FROM public.pending_jobs(@BatchSize);", param: new { @BatchSize = 1 }).ToList();
                    if (jobs.Count == 0)
                    {
                        Console.WriteLine("No more work!");
                        break;
                    }
                    foreach (var job in jobs)
                    {
                        DoWork(job);
                    }
                }
            }
        }
        finally
        {
            // Restart the fallback poll even when the query throws; otherwise one failed
            // round would leave the worker depending on notifications alone.
            timer.Start();
        }
    }

    /// <summary>Placeholder job handler: logs the id of the job being started.</summary>
    private static void DoWork(Job job)
    {
        Console.WriteLine("Starting work on {0}", job.Id);
    }
}
/// <summary>
/// One row returned by <c>public.pending_jobs</c>; used as the result type of the
/// Dapper query in <c>Program.ProcessJobs</c>.
/// </summary>
public class Job
{
/// <summary>Job identifier; logged when work on the job starts.</summary>
public int Id { get; set; }
/// <summary>Job name.</summary>
public string Name { get; set; }
/// <summary>Job payload as text. NOTE(review): presumably a JSON document — confirm against the table schema.</summary>
public string Json { get; set; }
/// <summary>Job state code. NOTE(review): the meaning of the values is not defined in this file.</summary>
public int State { get; set; }
}
}
// Native-binding variant of the node-postgres driver.
var pg = require('pg').native;
// Connection URL used for every client this worker creates.
var conninfo = "postgres://postgres:postgres@localhost/queuedb";
/**
 * Placeholder job handler: logs the value it was given.
 *
 * @param {*} job - The job handed over by processJobs.
 */
function doWork(job) {
  var label = "Starting work on job";
  console.log(label, job);
}
/**
 * Drains the queue: fetches jobs from `public.pending_jobs` one batch (size 1) at a time,
 * passes each returned row to `doWork`, and repeats until a query returns no rows.
 *
 * @param {object} client - Connected pg client used to run the query.
 * @param {function(Error|null, boolean|null)} done - Called with `(err, null)` when a
 *   query fails, or `(null, true)` once the queue is empty.
 */
function processJobs(client, done) {
  client.query('select * from public.pending_jobs($1);', [1], function(err, result) {
    if (err) {
      done(err, null);
      return;
    }
    if (result.rows.length == 0) {
      done(null, true);
      return;
    }
    // Hand over each row individually (not the whole rows array), and keep the
    // loop counter local instead of leaking an implicit global.
    for (var i = 0; i < result.rows.length; i++) {
      doWork(result.rows[i]);
    }
    // Keep going until a query comes back empty.
    processJobs(client, done);
  });
}
/**
 * Opens a client, issues `LISTEN pending_job`, and processes jobs whenever a notification
 * arrives. A fallback timer fires when no notification has been handled for the polling
 * interval: it drains the queue once, closes the client and starts over with a fresh one.
 */
function waitForNotification() {
  // Fallback polling interval; the log message is derived from it so the text always
  // matches the real timeout.
  var POLL_INTERVAL_MS = 10000;
  var client = new pg.Client(conninfo);
  var timeoutTimer;

  // Shared error reporting for both processJobs call sites.
  function logJobError(err) {
    if (err) {
      console.error('error getting job to process', err);
    }
  }

  // (Re)arms the fallback timer, cancelling any timer that is still pending.
  function startTimeoutTimer() {
    clearTimeout(timeoutTimer);
    timeoutTimer = setTimeout(function() {
      console.log('received no work for ' + (POLL_INTERVAL_MS / 1000) + ' seconds, checking for new work');
      processJobs(client, function(err, result) {
        logJobError(err);
        // Recycle the connection after every fallback poll.
        client.end();
        waitForNotification();
      });
    }, POLL_INTERVAL_MS);
  }

  client.connect(function(err) {
    if (err) {
      return console.error('could not connect to postgres', err);
    }
    client.on('notification', function(msg) {
      console.log('received notification!');
      // Pause the fallback poll while this notification is being handled.
      clearTimeout(timeoutTimer);
      processJobs(client, function(err, result) {
        logJobError(err);
        startTimeoutTimer();
      });
    });
    client.query('LISTEN pending_job');
    console.log('listening for new notifications...');
  });

  startTimeoutTimer();
}
// Script entry point: start the listen / fallback-poll cycle.
waitForNotification();
extern crate rustc_serialize as serialize;
extern crate postgres;
//use std::thread;
use serialize::json;
use postgres::{Connection, SslMode};
/// Connection URL passed to `Connection::connect`.
///
/// NOTE(review): the `password@host` part of this literal had been replaced by an
/// e-mail-obfuscation placeholder in the copy this file was taken from, which left the URL
/// unusable. It is restored here from the settings the C# worker uses (user `postgres`,
/// password `postgres`, server `10.53.10.197`, database `queuedb`) — confirm the host.
static CONN_STR: &'static str = "postgres://postgres:postgres@10.53.10.197/queuedb";
/// One row returned by `public.pending_jobs`, built column by column in `process_jobs`.
#[derive(Debug)]
struct Job {
/// Value of the `id` column; printed for every job processed.
id: i32,
/// Value of the `name` column.
name: String,
/// Value of the `json` column, decoded into a `rustc_serialize` JSON value.
json: json::Json,
/// Value of the `state` column. NOTE(review): the meaning of the codes is not defined in this file.
state: i32,
}
/// Drains the queue: repeatedly fetches a batch of jobs from `public.pending_jobs`
/// (batch size 1) and prints the id of each one, until a fetch returns no rows.
///
/// Looping until the queue is empty matches the C# and Node.js workers; fetching a single
/// batch per wake-up would let a backlog grow, because only one job would be handled per
/// notification.
///
/// # Panics
///
/// Panics if the statement cannot be prepared or the query fails.
fn process_jobs(conn: &Connection) {
    println!("processing jobs");
    let stmt = conn
        .prepare("SELECT id, name, json, state FROM public.pending_jobs($1)")
        .unwrap_or_else(|err| panic!("Error preparing pending_jobs query: {:?}", err));
    let batch_size: i32 = 1;
    loop {
        let rows = stmt
            .query(&[&batch_size])
            .unwrap_or_else(|err| panic!("Error fetching pending jobs: {:?}", err));
        // Count rows while iterating so an empty batch can end the drain.
        let mut fetched = 0;
        for row in rows {
            let job = Job {
                id: row.get(0),
                name: row.get(1),
                json: row.get(2),
                state: row.get(3),
            };
            print!("Job:{} ", job.id);
            fetched += 1;
        }
        if fetched == 0 {
            break;
        }
    }
    println!("done processing jobs");
}
/// Issues `LISTEN pending_job` on `conn` and then blocks until the next notification
/// is available on that connection.
///
/// # Panics
///
/// Panics if the `LISTEN` statement fails or if waiting for the notification fails.
fn wait_for_notification(conn: &Connection) {
    conn.execute("LISTEN pending_job", &[]).unwrap();
    conn.notifications().next_block().unwrap();
}
/// Opens a connection to the database named by `CONN_STR`, without SSL.
///
/// # Panics
///
/// Panics with the underlying error if the connection cannot be established.
fn connect() -> Connection {
    Connection::connect(CONN_STR, &SslMode::None)
        .unwrap_or_else(|err| panic!("Error connecting to the database: {:?}", err))
}
/// Worker entry point: connects once, then alternates forever between processing jobs
/// and blocking until the next `pending_job` notification.
fn main() {
    let conn = connect();
    // Subscribe before the first drain. This worker has no timeout fallback, so a job
    // enqueued between the initial drain and a later LISTEN would otherwise sit unnoticed
    // until some other job triggered a notification.
    conn.execute("LISTEN pending_job", &[])
        .unwrap_or_else(|err| panic!("Error subscribing to pending_job: {:?}", err));
    loop {
        process_jobs(&conn);
        wait_for_notification(&conn);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment