Created
August 11, 2014 22:15
-
-
Save sqs/f392bfe077bd3cfdf5fc to your computer and use it in GitHub Desktop.
Simple PostgreSQL queue in Go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package db | |
import ( | |
"strings" | |
"time" | |
"github.com/sqs/modl" | |
) | |
type (
	// SimpleQueue is a PostgreSQL-backed queue with atomic enqueue and
	// dequeue and a configurable visibility timeout. It keeps a single row per
	// (id, task) pair and records no history of past runs.
	SimpleQueue struct {
		// TableName is the name of the table that backs the queue. It is
		// spliced into SQL text verbatim, so it must be a trusted identifier.
		TableName string

		// VisibilityTimeout is how long a dequeued item stays hidden while it
		// has not been ended. Once this much time has passed since the item
		// was dequeued, it becomes eligible for dequeueing again, which
		// effectively places it back on the queue.
		//
		// Choose a value longer than any task is expected to take; otherwise
		// the same work may be performed more than once. The zero value makes
		// dequeued-but-not-ended items visible again immediately.
		VisibilityTimeout time.Duration
	}

	// QueueItem is one entry of a SimpleQueue. ID and Task hold the values of
	// the backing table's id and task columns, which together identify a row.
	QueueItem struct {
		ID   int
		Task string
	}
)
// AddSimpleQueueTable adds the table for q to the global modl DbMap. | |
func AddSimpleQueueTable(q *SimpleQueue) { | |
CreateSQL = append(CreateSQL, ` | |
CREATE TABLE IF NOT EXISTS `+q.TableName+` ( | |
id integer NOT NULL, | |
task character varying(255) NOT NULL, | |
enqueued_at timestamp with time zone, | |
started_at timestamp with time zone, | |
ended_at timestamp with time zone, | |
CONSTRAINT `+q.TableName+`_pkey PRIMARY KEY (id, task) | |
);`) | |
DropSQL = append(DropSQL, `DROP TABLE IF EXISTS `+q.TableName+`;`) | |
} | |
// Enqueue updates the enqueued_at field to now in the queue table row (or | |
// creates a new row), to indicate that the job should run. It is safe to call | |
// this function on the same primary key value concurrently. | |
func (q *SimpleQueue) Enqueue(dbh modl.SqlExecutor, id int, task string) error { | |
// This SQL query will either INSERT a new row *xor* UPDATE the existing | |
// row. It will never create two rows with the same primary key value. | |
sql := q.interpolate(` | |
WITH | |
update_result AS ( | |
UPDATE {{TableName}} SET | |
enqueued_at = CASE | |
WHEN enqueued_at < ended_at THEN clock_timestamp() | |
ELSE enqueued_at | |
END | |
WHERE id = $1 AND task = $2 | |
RETURNING 1 | |
), | |
insert_data AS ( | |
SELECT $1 AS id, $2 AS task, | |
clock_timestamp() AS enqueued_at, | |
NULL::timestamp without time zone AS started_at, | |
NULL::timestamp without time zone AS ended_at | |
) | |
INSERT INTO {{TableName}}(id, task, enqueued_at, started_at, ended_at) | |
SELECT * FROM insert_data | |
WHERE NOT EXISTS (SELECT NULL FROM update_result)`) | |
_, err := dbh.Exec(sql, id, task) | |
return err | |
} | |
// DequeueNext reads the next items in the queue into res. The maximum number of | |
// items returned by each call is undefined, so callers should repeatedly poll | |
// (with intermittent sleeps) this function. It is safe to call this function | |
// concurrently; the resulting lists of items will not overlap (unless an update | |
// is requested in the interim). | |
// | |
// If a caller retrieves a list of items but fails before updating them, there | |
// is no mechanism for those items to be placed back on the queue. This is OK | |
// for now, as simple queue tasks are not as likely to fail as repo graphing, | |
// and they are not as crucial. | |
func (q *SimpleQueue) DequeueNext(dbh modl.SqlExecutor, max int) ([]*QueueItem, error) { | |
sql := q.interpolate(` | |
WITH | |
to_dequeue AS ( | |
SELECT id AS id, task AS task FROM {{TableName}} | |
WHERE started_at IS NULL OR enqueued_at > started_at OR | |
-- visibility timeout: | |
((ended_at IS NULL OR ended_at < enqueued_at) AND (started_at < clock_timestamp() - ($2::int * interval '1 millisecond'))) | |
ORDER BY enqueued_at ASC NULLS FIRST | |
LIMIT $1 | |
FOR UPDATE | |
) | |
UPDATE {{TableName}} | |
SET started_at = clock_timestamp() | |
FROM to_dequeue | |
WHERE {{TableName}}.id = to_dequeue.id AND {{TableName}}.task = to_dequeue.task | |
RETURNING {{TableName}}.id AS id, {{TableName}}.task AS task`) | |
var res []*QueueItem | |
err := dbh.Select(&res, sql, max, q.VisibilityTimeout/time.Millisecond) | |
if err != nil { | |
return nil, err | |
} | |
return res, nil | |
} | |
// Ended marks a queue item as ended by setting the ended_at column to the | |
// current timestamp. | |
func (q *SimpleQueue) Ended(dbh modl.SqlExecutor, id int, task string) error { | |
sql := q.interpolate(` | |
UPDATE {{TableName}} | |
SET ended_at = clock_timestamp() | |
WHERE id = $1 AND task = $2`) | |
_, err := dbh.Exec(sql, id, task) | |
return err | |
} | |
func (q *SimpleQueue) interpolate(sql string) string { | |
sql = strings.Replace(sql, "{{TableName}}", q.TableName, -1) | |
return sql | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package db | |
import ( | |
"reflect" | |
"testing" | |
"time" | |
"github.com/sqs/modl" | |
) | |
const fakeID = 123 | |
var fakeQueue = SimpleQueue{ | |
TableName: "test_fake_queue", | |
VisibilityTimeout: 300 * time.Millisecond, | |
} | |
func createFakeQueueTable(t *testing.T, tx modl.SqlExecutor) { | |
_, err := tx.Exec(`DROP TABLE IF EXISTS test_fake_queue;`) | |
if err != nil { | |
t.Fatal(err) | |
} | |
AddSimpleQueueTable(&fakeQueue) | |
for _, sql := range CreateSQL { | |
_, err := tx.Exec(sql) | |
if err != nil { | |
t.Fatal(err) | |
} | |
} | |
} | |
func TestSimpleQueue(t *testing.T) { | |
Connect() | |
tx, _ := DB.Begin() | |
defer tx.Rollback() | |
createFakeQueueTable(t, tx) | |
items, err := fakeQueue.DequeueNext(tx, 1) | |
if err != nil { | |
t.Fatal("DequeueNext:", err) | |
} | |
if len(items) != 0 { | |
t.Errorf("want items empty (nothing enqueued yet), got %+v", items) | |
} | |
err = fakeQueue.Enqueue(tx, fakeID, "task0") | |
if err != nil { | |
t.Fatal("Enqueue:", err) | |
} | |
err = fakeQueue.Enqueue(tx, fakeID, "task1") | |
if err != nil { | |
t.Fatal("Enqueue (1st time with same task):", err) | |
} | |
err = fakeQueue.Enqueue(tx, fakeID, "task1") | |
if err != nil { | |
t.Fatal("Enqueue (2nd time with same task):", err) | |
} | |
items, err = fakeQueue.DequeueNext(tx, 1) | |
if err != nil { | |
t.Fatal("DequeueNext:", err) | |
} | |
if want := []*QueueItem{{fakeID, "task0"}}; !reflect.DeepEqual(want, items) { | |
t.Errorf("want items == %v, got %v", want, items) | |
} | |
items, err = fakeQueue.DequeueNext(tx, 1) | |
if err != nil { | |
t.Fatal("DequeueNext:", err) | |
} | |
if want := []*QueueItem{{fakeID, "task1"}}; !reflect.DeepEqual(want, items) { | |
t.Errorf("want items == %v, got %v", want, items) | |
} | |
items, err = fakeQueue.DequeueNext(tx, 1) | |
if err != nil { | |
t.Fatal("DequeueNext:", err) | |
} | |
if len(items) != 0 { | |
t.Errorf("want items empty (already dequeued), got %+v", items) | |
} | |
err = fakeQueue.Ended(tx, fakeID, "task0") | |
if err != nil { | |
t.Fatal("Ended:", err) | |
} | |
// Test that enqueueing task1 before it has been ended does not reenqueue | |
// it. | |
err = fakeQueue.Enqueue(tx, fakeID, "task1") | |
if err != nil { | |
t.Fatal("Enqueue:", err) | |
} | |
items, err = fakeQueue.DequeueNext(tx, 1) | |
if err != nil { | |
t.Fatal("DequeueNext:", err) | |
} | |
if len(items) != 0 { | |
t.Errorf("want items empty (task1 is not ended yet), got %+v", items) | |
} | |
// Now end task1 and test that it is re-enqueued. | |
err = fakeQueue.Ended(tx, fakeID, "task1") | |
if err != nil { | |
t.Fatal("Ended:", err) | |
} | |
err = fakeQueue.Enqueue(tx, fakeID, "task1") | |
if err != nil { | |
t.Fatal("Enqueue:", err) | |
} | |
items, err = fakeQueue.DequeueNext(tx, 1) | |
if err != nil { | |
t.Fatal("DequeueNext:", err) | |
} | |
if want := []*QueueItem{{fakeID, "task1"}}; !reflect.DeepEqual(want, items) { | |
t.Errorf("want items == %v, got %v", want, items) | |
} | |
// Test that after the visibility timeout, task1 can be dequeued (even | |
// though it hasn't been ended yet). | |
time.Sleep(fakeQueue.VisibilityTimeout) | |
items, err = fakeQueue.DequeueNext(tx, 1) | |
if err != nil { | |
t.Fatal("DequeueNext:", err) | |
} | |
if want := []*QueueItem{{fakeID, "task1"}}; !reflect.DeepEqual(want, items) { | |
t.Errorf("want items == %v, got %v", want, items) | |
} | |
err = fakeQueue.Ended(tx, fakeID, "task1") | |
if err != nil { | |
t.Fatal("Ended:", err) | |
} | |
// Test that enqueueing an item twice leaves its enqueued_at value at the | |
// earlier value. | |
err = fakeQueue.Enqueue(tx, fakeID, "task0") | |
if err != nil { | |
t.Fatal("Enqueue:", err) | |
} | |
err = fakeQueue.Enqueue(tx, fakeID, "task1") | |
if err != nil { | |
t.Fatal("Enqueue:", err) | |
} | |
err = fakeQueue.Enqueue(tx, fakeID, "task0") | |
if err != nil { | |
t.Fatal("Enqueue:", err) | |
} | |
items, err = fakeQueue.DequeueNext(tx, 1) | |
if err != nil { | |
t.Fatal("DequeueNext:", err) | |
} | |
if want := []*QueueItem{{fakeID, "task0"}}; !reflect.DeepEqual(want, items) { | |
t.Errorf("want items == %v, got %v", want, items) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment