Skip to content

Instantly share code, notes, and snippets.

@akutz
Created April 4, 2016 23:30
Show Gist options
  • Save akutz/970b9dbd9ab1e72e7cf69e2ab86be5dc to your computer and use it in GitHub Desktop.
Save akutz/970b9dbd9ab1e72e7cf69e2ab86be5dc to your computer and use it in GitHub Desktop.
Concurrent Queue in Go
package httputils
import (
"sync"
"github.com/emccode/libstorage/api/types"
)
// NewTaskQueue returns a new types.TaskQueue.
func NewTaskQueue() types.TaskQueue {
return &taskQueue{}
}
type taskQueueItem struct {
task *types.Task
next *taskQueueItem
}
type taskQueue struct {
sync.RWMutex
count int
head *taskQueueItem
tail *taskQueueItem
}
func (q *taskQueue) Enqueue(t *types.Task) {
q.Lock()
defer q.Unlock()
defer func() { q.count++ }()
i := &taskQueueItem{t, nil}
if q.head == nil {
q.head = i
q.tail = i
return
}
q.tail.next = i
q.tail = i
}
func (q *taskQueue) Dequeue() *types.Task {
q.Lock()
defer q.Unlock()
if q.count == 0 {
return nil
}
i := q.head
q.head = i.next
if q.head == nil {
q.tail = nil
}
i.next = nil
q.count--
return i.task
}
func (q *taskQueue) Len() int {
q.RLock()
defer q.RUnlock()
return q.count
}
func (q *taskQueue) Tasks() <-chan *types.Task {
q.RLock()
defer q.RUnlock()
c := make(chan *types.Task)
go func() {
i := q.head
for x := 0; x < q.count; x++ {
c <- i.task
i = i.next
}
close(c)
}()
return c
}
package httputils
import (
"fmt"
"testing"
"github.com/stretchr/testify/assert"
"github.com/emccode/libstorage/api/types"
)
func TestTaskQueue(t *testing.T) {
const maxTasks = 100
tasks := make([]*types.Task, maxTasks)
for x := 0; x < maxTasks; x++ {
tasks[x] = &types.Task{ID: fmt.Sprintf("task-%d", x)}
}
q := NewTaskQueue()
eqdqOne := make(chan int)
waitOne := make(chan int)
// test enqueue
for x := 0; x < maxTasks; x++ {
go func(y int) {
<-eqdqOne
q.Enqueue(tasks[y])
waitOne <- 1
}(x)
}
eqdqOne <- 1
<-waitOne
assert.Equal(t, 1, q.Len())
eqdqOne <- 1
<-waitOne
assert.Equal(t, 2, q.Len())
go func() {
for x := 2; x < maxTasks; x++ {
eqdqOne <- 1
}
}()
for x := 2; x < maxTasks; x++ {
<-waitOne
}
assert.Equal(t, maxTasks, q.Len())
// test channel
lt := 0
for range q.Tasks() {
lt++
}
assert.Equal(t, maxTasks, lt)
// test dequeue
for x := 0; x < maxTasks; x++ {
go func() {
<-eqdqOne
q.Dequeue()
waitOne <- 1
}()
}
eqdqOne <- 1
<-waitOne
assert.Equal(t, 99, q.Len())
eqdqOne <- 1
<-waitOne
assert.Equal(t, 98, q.Len())
go func() {
for x := 2; x < maxTasks; x++ {
eqdqOne <- 1
}
}()
for x := 2; x < maxTasks; x++ {
<-waitOne
}
assert.Equal(t, 0, q.Len())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment