Skip to content

Instantly share code, notes, and snippets.

@sqs
Created August 11, 2014 22:15
Show Gist options
  • Save sqs/f392bfe077bd3cfdf5fc to your computer and use it in GitHub Desktop.
Save sqs/f392bfe077bd3cfdf5fc to your computer and use it in GitHub Desktop.
Simple PostgreSQL queue in Go
package db
import (
"strings"
"time"
"github.com/sqs/modl"
)
// SimpleQueue is a PostgreSQL-backed queue with atomic enqueue and dequeue, no
// job history, and a configurable visibility timeout.
//
// Each (id, task) pair occupies at most one row of the queue table (the pair is
// the table's primary key); the row's enqueued_at, started_at, and ended_at
// timestamps record where the item is in its lifecycle.
type SimpleQueue struct {
// TableName is the name of the PostgreSQL table backing the queue. It is
// substituted verbatim into the SQL statements, so it must be a trusted,
// valid SQL identifier.
TableName string
// VisibilityTimeout is the duration to hide items that have been dequeued
// but not ended. After this duration, dequeued (and not-yet-ended) items
// are effectively placed back on the queue.
//
// This should be set to a value longer than the expected duration of
// performing any given task, or else duplicate work will be performed.
//
// The value is sent to PostgreSQL as a whole number of milliseconds, so any
// sub-millisecond remainder is dropped.
VisibilityTimeout time.Duration
}
// QueueItem is an item in a SimpleQueue. The ID field corresponds to values in
// the queue table's id column, and the Task field corresponds to values in
// the queue table's task column. Together they identify one row of the table.
type QueueItem struct {
ID int
Task string
}
// AddSimpleQueueTable registers the schema statements for q's table: it appends
// a CREATE TABLE IF NOT EXISTS statement to CreateSQL and a matching
// DROP TABLE IF EXISTS statement to DropSQL. It does not execute any SQL
// itself.
//
// q.TableName is concatenated directly into both statements, so it must be a
// trusted, valid SQL identifier.
func AddSimpleQueueTable(q *SimpleQueue) {
	table := q.TableName

	// The primary key on (id, task) is what limits each item to a single row.
	createStmt := `
CREATE TABLE IF NOT EXISTS ` + table + ` (
id integer NOT NULL,
task character varying(255) NOT NULL,
enqueued_at timestamp with time zone,
started_at timestamp with time zone,
ended_at timestamp with time zone,
CONSTRAINT ` + table + `_pkey PRIMARY KEY (id, task)
);`
	dropStmt := `DROP TABLE IF EXISTS ` + table + `;`

	CreateSQL = append(CreateSQL, createStmt)
	DropSQL = append(DropSQL, dropStmt)
}
// Enqueue marks the item identified by (id, task) as needing to run.
//
// If a row for the item already exists, its enqueued_at is reset to the current
// time only when enqueued_at is earlier than ended_at (the item has ended since
// it was last enqueued); otherwise the existing enqueued_at is kept, so
// enqueueing an item that is still pending or running changes nothing. If no
// row exists, one is inserted with enqueued_at set to the current time and
// started_at and ended_at left NULL.
//
// The table's primary key on (id, task) rules out duplicate rows for the same
// item.
// NOTE(review): when two callers enqueue the same not-yet-existing item at the
// same moment, both may attempt the INSERT and one may get a unique-violation
// error back - confirm that callers tolerate this.
func (q *SimpleQueue) Enqueue(dbh modl.SqlExecutor, id int, task string) error {
	// One statement performs either the UPDATE or the INSERT, never both: the
	// INSERT only fires when the UPDATE matched no row.
	const upsertTemplate = `
WITH
update_result AS (
UPDATE {{TableName}} SET
enqueued_at = CASE
WHEN enqueued_at < ended_at THEN clock_timestamp()
ELSE enqueued_at
END
WHERE id = $1 AND task = $2
RETURNING 1
),
insert_data AS (
SELECT $1 AS id, $2 AS task,
clock_timestamp() AS enqueued_at,
NULL::timestamp without time zone AS started_at,
NULL::timestamp without time zone AS ended_at
)
INSERT INTO {{TableName}}(id, task, enqueued_at, started_at, ended_at)
SELECT * FROM insert_data
WHERE NOT EXISTS (SELECT NULL FROM update_result)`

	_, err := dbh.Exec(q.interpolate(upsertTemplate), id, task)
	return err
}
// DequeueNext claims up to max items from the queue and returns them, oldest
// enqueued_at first. Claiming an item sets its started_at to the current time.
//
// An item is eligible when any of the following holds:
//   - it has never been started (started_at is NULL);
//   - it was enqueued again after it was last started;
//   - it was started more than VisibilityTimeout ago and has not ended since
//     it was last enqueued (the worker is presumed to have failed).
//
// The last rule is the only retry mechanism: a caller that dequeues an item and
// fails before calling Ended will see the item handed out again once the
// visibility timeout has elapsed.
//
// Candidate rows are selected FOR UPDATE, which is intended to keep concurrent
// callers from receiving the same item. A call is not guaranteed to return
// every eligible item, so callers should poll this function repeatedly (with
// intermittent sleeps).
//
// The timeout is sent as whole milliseconds and cast to a PostgreSQL int (32
// bits) in the query, so timeouts beyond roughly 24 days cannot be represented.
//
// On error the returned slice is nil.
func (q *SimpleQueue) DequeueNext(dbh modl.SqlExecutor, max int) ([]*QueueItem, error) {
	sql := q.interpolate(`
WITH
to_dequeue AS (
SELECT id AS id, task AS task FROM {{TableName}}
WHERE started_at IS NULL OR enqueued_at > started_at OR
-- visibility timeout:
((ended_at IS NULL OR ended_at < enqueued_at) AND (started_at < clock_timestamp() - ($2::int * interval '1 millisecond')))
ORDER BY enqueued_at ASC NULLS FIRST
LIMIT $1
FOR UPDATE
)
UPDATE {{TableName}}
SET started_at = clock_timestamp()
FROM to_dequeue
WHERE {{TableName}}.id = to_dequeue.id AND {{TableName}}.task = to_dequeue.task
RETURNING {{TableName}}.id AS id, {{TableName}}.task AS task`)

	// Hand the driver a plain integer millisecond count rather than a
	// time.Duration-typed value, so the argument's type and unit are explicit.
	timeoutMillis := int64(q.VisibilityTimeout / time.Millisecond)

	var res []*QueueItem
	if err := dbh.Select(&res, sql, max, timeoutMillis); err != nil {
		return nil, err
	}
	return res, nil
}
// Ended records that work on the item identified by (id, task) has finished by
// setting its ended_at column to the current timestamp.
//
// The number of affected rows is not inspected, so calling Ended for an item
// that has no row is not reported as an error by this method.
func (q *SimpleQueue) Ended(dbh modl.SqlExecutor, id int, task string) error {
	const markEndedTemplate = `
UPDATE {{TableName}}
SET ended_at = clock_timestamp()
WHERE id = $1 AND task = $2`

	_, err := dbh.Exec(q.interpolate(markEndedTemplate), id, task)
	return err
}
// interpolate returns sql with every occurrence of the {{TableName}}
// placeholder replaced by q.TableName. The table name is inserted as-is, with
// no quoting or escaping.
func (q *SimpleQueue) interpolate(sql string) string {
	const placeholder = "{{TableName}}"
	return strings.Replace(sql, placeholder, q.TableName, -1)
}
package db
import (
"reflect"
"testing"
"time"
"github.com/sqs/modl"
)
// fakeID is the item ID used for every queue item in these tests; items are
// told apart by their task name.
const fakeID = 123
// fakeQueue is the queue under test. TestSimpleQueue sleeps for its
// VisibilityTimeout, so the timeout is kept short.
var fakeQueue = SimpleQueue{
TableName: "test_fake_queue",
VisibilityTimeout: 300 * time.Millisecond,
}
// createFakeQueueTable prepares a fresh table for fakeQueue: it drops any
// existing test_fake_queue table, registers fakeQueue's schema through
// AddSimpleQueueTable, and then executes every statement currently in
// CreateSQL using tx. Any failure aborts the test immediately.
func createFakeQueueTable(t *testing.T, tx modl.SqlExecutor) {
	if _, err := tx.Exec(`DROP TABLE IF EXISTS test_fake_queue;`); err != nil {
		t.Fatal(err)
	}

	AddSimpleQueueTable(&fakeQueue)

	for _, stmt := range CreateSQL {
		if _, err := tx.Exec(stmt); err != nil {
			t.Fatal(err)
		}
	}
}
// TestSimpleQueue walks fakeQueue through its whole lifecycle inside a single
// transaction that is rolled back at the end: dequeueing from an empty queue,
// enqueueing (including duplicates), dequeueing in order, ending items,
// re-enqueueing before and after an item has ended, redelivery after the
// visibility timeout, and preservation of the earliest enqueued_at when an item
// is enqueued twice.
func TestSimpleQueue(t *testing.T) {
	Connect()

	// A failed Begin would leave tx unusable; stop here with a clear message
	// instead of failing later on the first use of tx.
	tx, err := DB.Begin()
	if err != nil {
		t.Fatal("Begin:", err)
	}
	defer tx.Rollback()

	createFakeQueueTable(t, tx)

	items, err := fakeQueue.DequeueNext(tx, 1)
	if err != nil {
		t.Fatal("DequeueNext:", err)
	}
	if len(items) != 0 {
		t.Errorf("want items empty (nothing enqueued yet), got %+v", items)
	}

	err = fakeQueue.Enqueue(tx, fakeID, "task0")
	if err != nil {
		t.Fatal("Enqueue:", err)
	}

	err = fakeQueue.Enqueue(tx, fakeID, "task1")
	if err != nil {
		t.Fatal("Enqueue (1st time with same task):", err)
	}

	err = fakeQueue.Enqueue(tx, fakeID, "task1")
	if err != nil {
		t.Fatal("Enqueue (2nd time with same task):", err)
	}

	items, err = fakeQueue.DequeueNext(tx, 1)
	if err != nil {
		t.Fatal("DequeueNext:", err)
	}
	if want := []*QueueItem{{fakeID, "task0"}}; !reflect.DeepEqual(want, items) {
		t.Errorf("want items == %v, got %v", want, items)
	}

	items, err = fakeQueue.DequeueNext(tx, 1)
	if err != nil {
		t.Fatal("DequeueNext:", err)
	}
	if want := []*QueueItem{{fakeID, "task1"}}; !reflect.DeepEqual(want, items) {
		t.Errorf("want items == %v, got %v", want, items)
	}

	items, err = fakeQueue.DequeueNext(tx, 1)
	if err != nil {
		t.Fatal("DequeueNext:", err)
	}
	if len(items) != 0 {
		t.Errorf("want items empty (already dequeued), got %+v", items)
	}

	err = fakeQueue.Ended(tx, fakeID, "task0")
	if err != nil {
		t.Fatal("Ended:", err)
	}

	// Test that enqueueing task1 before it has been ended does not reenqueue
	// it.
	err = fakeQueue.Enqueue(tx, fakeID, "task1")
	if err != nil {
		t.Fatal("Enqueue:", err)
	}
	items, err = fakeQueue.DequeueNext(tx, 1)
	if err != nil {
		t.Fatal("DequeueNext:", err)
	}
	if len(items) != 0 {
		t.Errorf("want items empty (task1 is not ended yet), got %+v", items)
	}

	// Now end task1 and test that it is re-enqueued.
	err = fakeQueue.Ended(tx, fakeID, "task1")
	if err != nil {
		t.Fatal("Ended:", err)
	}
	err = fakeQueue.Enqueue(tx, fakeID, "task1")
	if err != nil {
		t.Fatal("Enqueue:", err)
	}
	items, err = fakeQueue.DequeueNext(tx, 1)
	if err != nil {
		t.Fatal("DequeueNext:", err)
	}
	if want := []*QueueItem{{fakeID, "task1"}}; !reflect.DeepEqual(want, items) {
		t.Errorf("want items == %v, got %v", want, items)
	}

	// Test that after the visibility timeout, task1 can be dequeued (even
	// though it hasn't been ended yet).
	time.Sleep(fakeQueue.VisibilityTimeout)
	items, err = fakeQueue.DequeueNext(tx, 1)
	if err != nil {
		t.Fatal("DequeueNext:", err)
	}
	if want := []*QueueItem{{fakeID, "task1"}}; !reflect.DeepEqual(want, items) {
		t.Errorf("want items == %v, got %v", want, items)
	}
	err = fakeQueue.Ended(tx, fakeID, "task1")
	if err != nil {
		t.Fatal("Ended:", err)
	}

	// Test that enqueueing an item twice leaves its enqueued_at value at the
	// earlier value.
	err = fakeQueue.Enqueue(tx, fakeID, "task0")
	if err != nil {
		t.Fatal("Enqueue:", err)
	}
	err = fakeQueue.Enqueue(tx, fakeID, "task1")
	if err != nil {
		t.Fatal("Enqueue:", err)
	}
	err = fakeQueue.Enqueue(tx, fakeID, "task0")
	if err != nil {
		t.Fatal("Enqueue:", err)
	}
	items, err = fakeQueue.DequeueNext(tx, 1)
	if err != nil {
		t.Fatal("DequeueNext:", err)
	}
	if want := []*QueueItem{{fakeID, "task0"}}; !reflect.DeepEqual(want, items) {
		t.Errorf("want items == %v, got %v", want, items)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment