Created
August 5, 2023 02:15
-
-
Save leandronsp/c0a7daaa27f75dd65d9fa803db8489ad to your computer and use it in GitHub Desktop.
A dead simple background processor in Go, using a double-ended queue based on a doubly linked list
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// go mod init queues | |
// go mod tidy | |
// go get github.com/stretchr/testify/assert | |
// go mod tidy | |
// go test background_job_test.go | |
package queues | |
import ( | |
"fmt" | |
"time" | |
"sync" | |
"testing" | |
"errors" | |
"github.com/stretchr/testify/assert" | |
) | |
// LinkedList Deque | |
type Node struct { | |
Value int | |
Next *Node | |
Prev *Node | |
} | |
type Deque struct { | |
Head *Node | |
Tail *Node | |
} | |
func (deque *Deque) RPush(value int) { | |
node := Node{Value: value} | |
if deque.Tail == nil { | |
deque.Tail = &node | |
deque.Head = &node | |
return | |
} | |
node.Prev = deque.Tail | |
deque.Tail.Next = &node | |
deque.Tail = &node | |
} | |
func (deque *Deque) LPush(value int) { | |
node := Node{Value: value} | |
if deque.Head == nil { | |
deque.Head = &node | |
deque.Tail = &node | |
return | |
} | |
node.Next = deque.Head | |
deque.Head.Prev = &node | |
deque.Head = &node | |
} | |
func (deque *Deque) RPop() *int { | |
if deque.Tail == nil { | |
return nil | |
} | |
node := deque.Tail | |
deque.Tail = deque.Tail.Prev | |
if deque.Tail == nil { | |
deque.Head = nil | |
} else { | |
deque.Tail.Next = nil | |
} | |
return &node.Value | |
} | |
func (deque *Deque) LPop() *int { | |
if deque.Head == nil { | |
return nil | |
} | |
node := deque.Head | |
deque.Head = deque.Head.Next | |
if deque.Head == nil { | |
deque.Tail = nil | |
} else { | |
deque.Head.Prev = nil | |
} | |
return &node.Value | |
} | |
func (deque *Deque) RPopLPush(other *Deque) *int { | |
value := deque.RPop() | |
if value == nil { | |
return nil | |
} | |
other.LPush(*value) | |
return value | |
} | |
func (deque *Deque) ToSlice() []int { | |
slice := []int{} | |
for node := deque.Head; node != nil; node = node.Next { | |
slice = append(slice, node.Value) | |
} | |
return slice | |
} | |
// Blocking Deque | |
type BlockingDeque struct { | |
Store *Deque | |
Mutex sync.Mutex | |
Emitter *sync.Cond | |
} | |
func (deque *BlockingDeque) RPush(value int) { | |
deque.Mutex.Lock() | |
defer deque.Mutex.Unlock() | |
deque.Store.RPush(value) | |
deque.Emitter.Signal() | |
} | |
func (deque *BlockingDeque) LPush(value int) { | |
deque.Mutex.Lock() | |
defer deque.Mutex.Unlock() | |
deque.Store.LPush(value) | |
deque.Emitter.Signal() | |
} | |
func (deque *BlockingDeque) BRPop() *int { | |
deque.Mutex.Lock() | |
defer deque.Mutex.Unlock() | |
for deque.Store.Head == nil { | |
deque.Emitter.Wait() | |
} | |
return deque.Store.RPop() | |
} | |
func (deque *BlockingDeque) BLPop() *int { | |
deque.Mutex.Lock() | |
defer deque.Mutex.Unlock() | |
for deque.Store.Head == nil { | |
deque.Emitter.Wait() | |
} | |
return deque.Store.LPop() | |
} | |
func (deque *BlockingDeque) BRPopLPush(other *BlockingDeque) *int { | |
deque.Mutex.Lock() | |
defer deque.Mutex.Unlock() | |
for deque.Store.Head == nil { | |
deque.Emitter.Wait() | |
} | |
value := deque.Store.RPopLPush(other.Store) | |
deque.Emitter.Signal() | |
return value | |
} | |
func (deque *BlockingDeque) LPop() *int { | |
return deque.Store.LPop() | |
} | |
func (deque *BlockingDeque) RPop() *int { | |
return deque.Store.RPop() | |
} | |
func (deque *BlockingDeque) RPopLPush(other *BlockingDeque) { | |
deque.Store.RPopLPush(other.Store) | |
} | |
func (deque *BlockingDeque) ToSlice() []int { | |
return deque.Store.ToSlice() | |
} | |
// Worker | |
type Worker struct { | |
TaskQueue *BlockingDeque | |
ProcessingQueue *BlockingDeque | |
Dlq *BlockingDeque | |
Retries map[int]int | |
} | |
func NewWorker() *Worker { | |
taskQueue := BlockingDeque{Store: &Deque{}} | |
taskQueue.Mutex = sync.Mutex{} | |
taskQueue.Emitter = sync.NewCond(&taskQueue.Mutex) | |
processingQueue := BlockingDeque{Store: &Deque{}} | |
processingQueue.Mutex = sync.Mutex{} | |
processingQueue.Emitter = sync.NewCond(&processingQueue.Mutex) | |
dlq := BlockingDeque{Store: &Deque{}} | |
dlq.Mutex = sync.Mutex{} | |
dlq.Emitter = sync.NewCond(&dlq.Mutex) | |
return &Worker{ | |
TaskQueue: &taskQueue, | |
ProcessingQueue: &processingQueue, | |
Dlq: &dlq, | |
Retries: map[int]int{}, | |
} | |
} | |
func (worker *Worker) Start() { | |
go func() { | |
fmt.Println("Worker started. Waiting for tasks...") | |
for { | |
task := worker.TaskQueue.BRPopLPush(worker.ProcessingQueue) | |
err := worker.Process(*task) | |
if err != nil { | |
fmt.Printf("Task %d failed.\n", *task) | |
worker.Retries[*task] = worker.Retries[*task] + 1 | |
if worker.Retries[*task] > 3 { | |
fmt.Printf("Task %d failed too many times. Sending to DLQ.\n", *task) | |
worker.ProcessingQueue.RPopLPush(worker.Dlq) | |
} else { | |
fmt.Printf("Task %d failed. Retrying...\n", *task) | |
worker.ProcessingQueue.RPopLPush(worker.TaskQueue) | |
} | |
} | |
} | |
}() | |
} | |
func (worker *Worker) Process(task int) error { | |
fmt.Printf("Processing task %d...\n", task) | |
if task == 42 { | |
return errors.New("Task failed") | |
} | |
sleepDuration := task * int(time.Second) | |
time.Sleep(time.Duration(sleepDuration)) | |
worker.ProcessingQueue.LPop() | |
fmt.Printf("Task %d processed.\n", task) | |
return nil | |
} | |
// Testing | |
func TestBackgroundJob(t *testing.T) { | |
worker := NewWorker() | |
worker.TaskQueue.RPush(1) | |
worker.Start() | |
for len(worker.ProcessingQueue.ToSlice()) == 0 { | |
time.Sleep(10 * time.Millisecond) | |
} | |
assert.Equal(t, 1, len(worker.ProcessingQueue.ToSlice())) | |
assert.Equal(t, 0, len(worker.TaskQueue.ToSlice())) | |
for len(worker.ProcessingQueue.ToSlice()) > 0 { | |
time.Sleep(10 * time.Millisecond) | |
} | |
assert.Equal(t, 0, len(worker.ProcessingQueue.ToSlice())) | |
assert.Equal(t, 0, len(worker.TaskQueue.ToSlice())) | |
worker.TaskQueue.LPush(42) | |
for len(worker.Dlq.ToSlice()) == 0 { | |
time.Sleep(10 * time.Millisecond) | |
} | |
assert.Equal(t, 0, len(worker.ProcessingQueue.ToSlice())) | |
assert.Equal(t, 0, len(worker.TaskQueue.ToSlice())) | |
assert.Equal(t, 1, len(worker.Dlq.ToSlice())) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment