Created
March 18, 2014 16:09
-
-
Save naquad/9623237 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Package implements a queue structure with task state tracking.
 * A task can be in one of 4 states:
 *  * Confirmed. When a task was processed successfully it is confirmed.
 *    Confirm() removes the task from the queue.
 *
 *  * Discarded. Removed from the queue, discarded flag set.
 *    Pretty much the same as Confirmed,
 *    just with a different name to indicate cancellation
 *    of processing.
 *
 *  * Busy. The task is being processed right now.
 *
 *  * Ready. The task is ready for processing.
 *
 * Tasks can be retried.
 * This means that the task will be queued for processing again
 * and its tries counter will be increased the next time it is popped.
 */
package main | |
import ( | |
"sync" | |
) | |
// ConfirmableTaskState describes where a task currently is in its
// life cycle. Confirmed and Discarded occupy distinct bits so that both
// can be matched with TaskProcessedMask.
type ConfirmableTaskState int

const (
	// TaskReady: the task is waiting to be handed out.
	TaskReady ConfirmableTaskState = 0
	// TaskBusy: the task has been handed out and is being processed.
	TaskBusy ConfirmableTaskState = 1 << 0
	// TaskConfirmed: the task was processed successfully.
	TaskConfirmed ConfirmableTaskState = 1 << 1
	// TaskDiscarded: processing of the task was cancelled.
	TaskDiscarded ConfirmableTaskState = 1 << 2
	// TaskProcessedMask matches both TaskConfirmed and TaskDiscarded.
	TaskProcessedMask ConfirmableTaskState = TaskConfirmed | TaskDiscarded
)
// Task queue node. Task queue is basically a cyclic deque (double linked list) | |
type ConfirmableTask struct { | |
next *ConfirmableTask | |
prev *ConfirmableTask | |
state ConfirmableTaskState | |
tries int | |
queue *ConfirmableTaskQueue | |
Value interface{} | |
} | |
func newConfirmableTask(q *ConfirmableTaskQueue, v interface{}) *ConfirmableTask { | |
return &ConfirmableTask{ | |
state: TaskReady, | |
queue: q, | |
Value: v, | |
} | |
} | |
// shortcuts for Queue methods | |
func (t *ConfirmableTask) Confirm(){ | |
t.queue.Confirm(t) | |
} | |
func (t *ConfirmableTask) Discard(){ | |
t.queue.Discard(t) | |
} | |
func (t *ConfirmableTask) Retry(){ | |
t.queue.Retry(t) | |
} | |
// accessors. because variables here affect the way node is treated | |
// user can't write to them directly | |
func (t *ConfirmableTask) IsReady() bool { | |
return t.state == TaskReady | |
} | |
func (t *ConfirmableTask) IsBusy() bool { | |
return t.state == TaskBusy | |
} | |
func (t *ConfirmableTask) IsConfirmed() bool { | |
return t.state == TaskConfirmed | |
} | |
func (t *ConfirmableTask) IsDiscarded() bool { | |
return t.state == TaskDiscarded | |
} | |
func (t *ConfirmableTask) IsProcessed() bool { | |
return (t.state & TaskProcessedMask) != 0 | |
} | |
func (t *ConfirmableTask) Tries() int { | |
return t.tries | |
} | |
func (t *ConfirmableTask) Queue() *ConfirmableTaskQueue { | |
return t.queue | |
} | |
// Confirmable Task Queue. | |
// A usual cyclic double linked list. | |
// Except that it doesn't maintain insertion order, but processing order. | |
// This means that head pointer points no to the first-inserted | |
// task, but to the next executed. In case if there are no | |
// ready tasks will point to task after last Pop()ped | |
type ConfirmableTaskQueue struct { | |
head *ConfirmableTask | |
size int | |
lock *sync.Mutex | |
cond *sync.Cond | |
} | |
func NewConfirmableTaskQueue() *ConfirmableTaskQueue { | |
lock := &sync.Mutex{} | |
return &ConfirmableTaskQueue{ | |
lock: lock, | |
cond: sync.NewCond(lock), | |
} | |
} | |
// insert task into the queue | |
func (q *ConfirmableTaskQueue) addTask(task *ConfirmableTask) { | |
if q.head == nil { | |
q.head = task | |
task.next = task | |
task.prev = task | |
} else { | |
task.next = q.head | |
task.prev = q.head.prev | |
q.head.prev.next = task | |
q.head.prev = task | |
// make sure head points to next ready task | |
if q.head.IsBusy() { | |
q.head = task | |
} | |
} | |
q.size += 1 | |
q.cond.Signal() | |
} | |
func (q *ConfirmableTaskQueue) Push(v interface{}) (task *ConfirmableTask) { | |
q.lock.Lock() | |
task = newConfirmableTask(q, v) | |
q.addTask(task) | |
q.lock.Unlock() | |
return | |
} | |
func (q *ConfirmableTaskQueue) Pop() (task *ConfirmableTask) { | |
q.lock.Lock() | |
for q.head == nil || q.head.IsBusy() { // if no tasks or all busy then wait | |
q.cond.Wait() | |
} | |
task = q.head | |
task.tries += 1 | |
task.state = TaskBusy | |
q.head = task.next | |
// to preserve processing order we need to find next task. | |
// because we already know about 2 tasks we'll do it only | |
// when there's more than 2 tasks | |
if q.head.IsBusy() && q.size > 2 { | |
for slider := q.head.next; slider != task; slider = slider.next { | |
if slider.IsReady() { | |
q.head = slider | |
break | |
} | |
} | |
} | |
// because pop doesn't actually change queue contents | |
// no signal is emitted | |
q.lock.Unlock() | |
return | |
} | |
func (q *ConfirmableTaskQueue) removeWithState(task *ConfirmableTask, newState ConfirmableTaskState) { | |
q.lock.Lock() | |
if !task.IsProcessed() { | |
if q.size == 1 { | |
q.head = nil | |
} else { | |
task.prev.next = task.next | |
task.next.prev = task.prev | |
if q.head == task { | |
q.head = task.next | |
} | |
} | |
q.size -= 1 | |
q.cond.Signal() | |
} | |
task.state = newState | |
q.lock.Unlock() | |
} | |
func (q *ConfirmableTaskQueue) Confirm(task *ConfirmableTask) { | |
q.removeWithState(task, TaskConfirmed) | |
} | |
func (q *ConfirmableTaskQueue) Discard(task *ConfirmableTask) { | |
q.removeWithState(task, TaskDiscarded) | |
} | |
func (q *ConfirmableTaskQueue) Retry(task *ConfirmableTask) { | |
q.lock.Lock() | |
if task.IsProcessed() { | |
task.tries = 0 | |
q.addTask(task) | |
} else if task.IsBusy() { | |
if q.head.IsBusy() { | |
q.head = task | |
} | |
q.cond.Signal() | |
} | |
task.state = TaskReady | |
q.lock.Unlock() | |
} | |
func (q *ConfirmableTaskQueue) Size() (size int) { | |
q.lock.Lock() | |
size = q.size | |
q.lock.Unlock() | |
return | |
} | |
func (q *ConfirmableTaskQueue) Empty() bool { | |
return q.Size() == 0 | |
} | |
// waits for all tasks to be confirmed / discarded | |
func (q *ConfirmableTaskQueue) Wait() { | |
q.lock.Lock() | |
for q.size != 0 { | |
q.cond.Wait() | |
} | |
q.lock.Unlock() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment