Skip to content

Instantly share code, notes, and snippets.

@naquad
Created March 18, 2014 16:09
Show Gist options
  • Save naquad/9623237 to your computer and use it in GitHub Desktop.
Save naquad/9623237 to your computer and use it in GitHub Desktop.
/**
* Package implements a queue structure with task state tracking.
* Task can have 4 states:
* * Confirmed. When task was processed successfully it is confirmed.
* Confirm() function removes task from queue.
*
* * Discarded. Removed from queue, discarded flag set.
* Pretty much the same as Confirmed,
* just with differrent name to indicate cancellation
* of processing
*
* * Busy. Task is being processed right now.
*
* * Ready. When task is ready for processing.
*
* Tasks can be retried.
* This means that task will be queued for processing again
* and task tries counter will be increased.
*/
package main
import (
"sync"
)
// Task states
type ConfirmableTaskState int
const (
TaskReady ConfirmableTaskState = 0
TaskBusy ConfirmableTaskState = 1
TaskConfirmed ConfirmableTaskState = 2
TaskDiscarded ConfirmableTaskState = 4
TaskProcessedMask ConfirmableTaskState = 6
)
// Task queue node. Task queue is basically a cyclic deque (double linked list)
type ConfirmableTask struct {
next *ConfirmableTask
prev *ConfirmableTask
state ConfirmableTaskState
tries int
queue *ConfirmableTaskQueue
Value interface{}
}
func newConfirmableTask(q *ConfirmableTaskQueue, v interface{}) *ConfirmableTask {
return &ConfirmableTask{
state: TaskReady,
queue: q,
Value: v,
}
}
// shortcuts for Queue methods
func (t *ConfirmableTask) Confirm(){
t.queue.Confirm(t)
}
func (t *ConfirmableTask) Discard(){
t.queue.Discard(t)
}
func (t *ConfirmableTask) Retry(){
t.queue.Retry(t)
}
// accessors. because variables here affect the way node is treated
// user can't write to them directly
func (t *ConfirmableTask) IsReady() bool {
return t.state == TaskReady
}
func (t *ConfirmableTask) IsBusy() bool {
return t.state == TaskBusy
}
func (t *ConfirmableTask) IsConfirmed() bool {
return t.state == TaskConfirmed
}
func (t *ConfirmableTask) IsDiscarded() bool {
return t.state == TaskDiscarded
}
func (t *ConfirmableTask) IsProcessed() bool {
return (t.state & TaskProcessedMask) != 0
}
func (t *ConfirmableTask) Tries() int {
return t.tries
}
func (t *ConfirmableTask) Queue() *ConfirmableTaskQueue {
return t.queue
}
// Confirmable Task Queue.
// A usual cyclic double linked list.
// Except that it doesn't maintain insertion order, but processing order.
// This means that head pointer points no to the first-inserted
// task, but to the next executed. In case if there are no
// ready tasks will point to task after last Pop()ped
type ConfirmableTaskQueue struct {
head *ConfirmableTask
size int
lock *sync.Mutex
cond *sync.Cond
}
func NewConfirmableTaskQueue() *ConfirmableTaskQueue {
lock := &sync.Mutex{}
return &ConfirmableTaskQueue{
lock: lock,
cond: sync.NewCond(lock),
}
}
// insert task into the queue
func (q *ConfirmableTaskQueue) addTask(task *ConfirmableTask) {
if q.head == nil {
q.head = task
task.next = task
task.prev = task
} else {
task.next = q.head
task.prev = q.head.prev
q.head.prev.next = task
q.head.prev = task
// make sure head points to next ready task
if q.head.IsBusy() {
q.head = task
}
}
q.size += 1
q.cond.Signal()
}
func (q *ConfirmableTaskQueue) Push(v interface{}) (task *ConfirmableTask) {
q.lock.Lock()
task = newConfirmableTask(q, v)
q.addTask(task)
q.lock.Unlock()
return
}
func (q *ConfirmableTaskQueue) Pop() (task *ConfirmableTask) {
q.lock.Lock()
for q.head == nil || q.head.IsBusy() { // if no tasks or all busy then wait
q.cond.Wait()
}
task = q.head
task.tries += 1
task.state = TaskBusy
q.head = task.next
// to preserve processing order we need to find next task.
// because we already know about 2 tasks we'll do it only
// when there's more than 2 tasks
if q.head.IsBusy() && q.size > 2 {
for slider := q.head.next; slider != task; slider = slider.next {
if slider.IsReady() {
q.head = slider
break
}
}
}
// because pop doesn't actually change queue contents
// no signal is emitted
q.lock.Unlock()
return
}
func (q *ConfirmableTaskQueue) removeWithState(task *ConfirmableTask, newState ConfirmableTaskState) {
q.lock.Lock()
if !task.IsProcessed() {
if q.size == 1 {
q.head = nil
} else {
task.prev.next = task.next
task.next.prev = task.prev
if q.head == task {
q.head = task.next
}
}
q.size -= 1
q.cond.Signal()
}
task.state = newState
q.lock.Unlock()
}
func (q *ConfirmableTaskQueue) Confirm(task *ConfirmableTask) {
q.removeWithState(task, TaskConfirmed)
}
func (q *ConfirmableTaskQueue) Discard(task *ConfirmableTask) {
q.removeWithState(task, TaskDiscarded)
}
func (q *ConfirmableTaskQueue) Retry(task *ConfirmableTask) {
q.lock.Lock()
if task.IsProcessed() {
task.tries = 0
q.addTask(task)
} else if task.IsBusy() {
if q.head.IsBusy() {
q.head = task
}
q.cond.Signal()
}
task.state = TaskReady
q.lock.Unlock()
}
func (q *ConfirmableTaskQueue) Size() (size int) {
q.lock.Lock()
size = q.size
q.lock.Unlock()
return
}
func (q *ConfirmableTaskQueue) Empty() bool {
return q.Size() == 0
}
// waits for all tasks to be confirmed / discarded
func (q *ConfirmableTaskQueue) Wait() {
q.lock.Lock()
for q.size != 0 {
q.cond.Wait()
}
q.lock.Unlock()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment