Last active
August 29, 2015 14:18
-
-
Save jakejscott/c1163141a0c536bb8a96 to your computer and use it in GitHub Desktop.
postgres-queue
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Linq; | |
using System.Timers; | |
using Dapper; | |
using Npgsql; | |
namespace PostgresQueue | |
{ | |
/// <summary>
/// Console worker that drains the <c>pending_jobs</c> queue whenever PostgreSQL
/// sends a <c>pending_job</c> notification, with a timer-driven poll as a fallback
/// when no notification has arrived for <see cref="PollIntervalMs"/> milliseconds.
/// </summary>
public class Program
{
    // NOTE(review): server address and credentials are hard-coded; move them to configuration before real use.
    const string conninfo = "Server=10.53.10.197;Port=5432;User Id=postgres;Password=postgres;Database=queuedb;SyncNotification=true;";

    // Idle interval after which the queue is polled even without a notification.
    private const double PollIntervalMs = 10000;

    private static Timer timer;

    // ProcessJobs is reachable from Main, the timer callback and the notification
    // callback; the lock keeps two drains from running at the same time.
    private static readonly object processLock = new object();

    /// <summary>
    /// Opens the listening connection, registers for notifications, performs an
    /// initial drain of the queue and then waits until a line is read from stdin.
    /// </summary>
    public static void Main(string[] args)
    {
        using (var connection = new NpgsqlConnection(conninfo))
        {
            connection.Open();

            timer = new Timer(PollIntervalMs);
            timer.Elapsed += OnTimeout;
            timer.Enabled = false;

            // Subscribe and LISTEN *before* the first drain so that a job enqueued
            // while the drain is running still produces a notification we receive.
            connection.Notification += OnNotification;
            using (var command = new NpgsqlCommand("LISTEN pending_job", connection))
            {
                command.ExecuteNonQuery();
            }

            ProcessJobs();

            Console.WriteLine("listening for new notifications...");
            Console.ReadLine();
        }
    }

    /// <summary>Timer callback: no notification arrived during the poll interval.</summary>
    private static void OnTimeout(object sender, ElapsedEventArgs e)
    {
        // The message is derived from the interval so the two cannot drift apart.
        Console.WriteLine("received no work for {0} seconds, checking for new work", PollIntervalMs / 1000);
        ProcessJobs();
    }

    /// <summary>Notification callback for the <c>pending_job</c> channel.</summary>
    private static void OnNotification(object sender, NpgsqlNotificationEventArgs e)
    {
        Console.WriteLine("Received notification");
        ProcessJobs();
    }

    /// <summary>
    /// Fetches jobs one batch at a time on a dedicated connection until
    /// <c>public.pending_jobs</c> returns no rows. The poll timer is paused while
    /// draining and is always restarted afterwards, even if a query throws.
    /// </summary>
    private static void ProcessJobs()
    {
        lock (processLock)
        {
            timer.Stop();
            try
            {
                using (var connection = new NpgsqlConnection(conninfo))
                {
                    connection.Open();
                    while (true)
                    {
                        var jobs = connection.Query<Job>(sql: "SELECT * FROM public.pending_jobs(@BatchSize);", param: new { @BatchSize = 1 }).ToList();
                        if (jobs.Count == 0)
                        {
                            Console.WriteLine("No more work!");
                            break;
                        }

                        foreach (var job in jobs)
                        {
                            DoWork(job);
                        }
                    }
                }
            }
            finally
            {
                // Without this, one failed query would leave the fallback poll disabled for good.
                timer.Start();
            }
        }
    }

    /// <summary>Placeholder for the real job handler; currently only logs the job id.</summary>
    private static void DoWork(Job job)
    {
        Console.WriteLine("Starting work on {0}", job.Id);
    }
}
/// <summary>
/// Row shape that Dapper maps the result of <c>public.pending_jobs</c> onto.
/// </summary>
public class Job
{
    /// <summary>Job identifier; printed by the worker when it starts the job.</summary>
    public int Id { get; set; }

    /// <summary>Job name column.</summary>
    public string Name { get; set; }

    /// <summary>Job payload column, kept as raw text.</summary>
    public string Json { get; set; }

    /// <summary>Job state column. NOTE(review): the meaning of the values is defined in the database, not here.</summary>
    public int State { get; set; }
}
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var pg = require('pg').native; | |
// Connection URL handed to pg.Client.
// NOTE(review): credentials are hard-coded; move them to configuration before real use.
var conninfo = 'postgres://postgres:postgres@localhost/queuedb';
/**
 * Placeholder job handler: logs the job it was given.
 *
 * @param {Object} job - a row returned by public.pending_jobs.
 */
function doWork(job) {
    console.log("Starting work on job", job);
}
/**
 * Drains the queue: fetches one batch from public.pending_jobs, hands every row
 * to doWork, and repeats until a batch comes back empty.
 *
 * @param {Object} client - connected pg client used to run the query.
 * @param {function(Error, boolean)} done - called once with (err, null) on a
 *     query error, or with (null, true) when no more jobs are pending.
 */
function processJobs(client, done) {
    client.query('select * from public.pending_jobs($1);', [1], function(err, result) {
        if (err) {
            done(err, null);
            return;
        }
        if (result.rows.length === 0) {
            done(null, true);
            return;
        }
        // Hand each row to the worker individually (the previous loop passed the
        // whole rows array every time and leaked its counter as a global).
        result.rows.forEach(function(job) {
            doWork(job);
        });
        // More work may be pending; keep fetching until a batch comes back empty.
        processJobs(client, done);
    });
}
/**
 * Connects a fresh client, LISTENs on the pending_job channel and drains the
 * queue on every notification. If no notification arrives within the poll
 * interval, the queue is drained anyway, the client is closed and the whole
 * cycle restarts with a new connection.
 */
function waitForNotification() {
    // Idle time before the queue is polled without a notification.
    var pollIntervalMs = 10000;
    var client = new pg.Client(conninfo);
    var timeoutTimer;

    // (Re)arms the idle timer; any previously armed timer is cancelled first.
    function startTimeoutTimer() {
        clearTimeout(timeoutTimer);
        timeoutTimer = setTimeout(function() {
            // The message is derived from the interval so the two cannot drift apart.
            console.log('received no work for ' + (pollIntervalMs / 1000) + ' seconds, checking for new work');
            processJobs(client, function(err, result) {
                if (err) {
                    console.error('error getting job to process', err);
                }
                // Drop this connection and start over with a new one.
                client.end();
                waitForNotification();
            });
        }, pollIntervalMs);
    }

    client.connect(function(err) {
        if (err) {
            // The idle timer armed below still fires and restarts the cycle.
            return console.error('could not connect to postgres', err);
        }
        client.on('notification', function(msg) {
            console.log('received notification!');
            // Pause the idle timer while the queue is being drained.
            clearTimeout(timeoutTimer);
            processJobs(client, function(err, result) {
                if (err) {
                    console.error('error getting job to process', err);
                }
                startTimeoutTimer();
            });
        });
        client.query('LISTEN pending_job', function(err) {
            if (err) {
                return console.error('could not LISTEN on pending_job', err);
            }
            console.log('listening for new notifications...');
        });
    });

    startTimeoutTimer();
}

// Entry point: start the first listen/poll cycle.
waitForNotification();
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
extern crate rustc_serialize as serialize; | |
extern crate postgres; | |
//use std::thread; | |
use serialize::json; | |
use postgres::{Connection, SslMode}; | |
/// Connection URL used by `connect`.
///
/// NOTE(review): the published copy of this line had its `user@host` part
/// replaced by an e-mail-obfuscation placeholder; the password and host below
/// are reconstructed from the C# variant of this worker — confirm before use.
static CONN_STR: &'static str = "postgres://postgres:postgres@10.53.10.197/queuedb";
#[derive(Debug)] | |
struct Job { | |
id: i32, | |
name: String, | |
json: json::Json, | |
state: i32, | |
} | |
fn process_jobs(conn: &Connection) { | |
println!("processing jobs"); | |
let stmt = conn.prepare("SELECT id, name, json, state FROM public.pending_jobs($1)").unwrap(); | |
let rows = stmt.query(&[&1]).unwrap(); | |
for row in rows { | |
let job = Job { | |
id: row.get(0), | |
name: row.get(1), | |
json: row.get(2), | |
state: row.get(3), | |
}; | |
print!("Job:{} ", job.id); | |
} | |
println!("done processing jobs"); | |
} | |
fn wait_for_notification(conn: &Connection) { | |
conn.execute("LISTEN pending_job", &[]).unwrap(); | |
let mut notifications = conn.notifications(); | |
notifications.next_block().unwrap(); | |
} | |
fn connect() -> Connection { | |
return match Connection::connect(CONN_STR, &SslMode::None) { | |
Ok(conn) => conn, | |
Err(err) => panic!("Error connecting to the database: {:?}", err) | |
}; | |
} | |
fn main() { | |
let conn = connect(); | |
loop { | |
process_jobs(&conn); | |
wait_for_notification(&conn); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment