@WomB0ComB0
Created November 15, 2024 19:01
Go multi-threading template (will apply opinionated changes soon)
package concurrency

import (
	"context"
	"errors"
	"fmt"
	"log"
	"sync"
	"time"
)
// Task represents a unit of work to be processed
type Task struct {
	ID        string
	Data      interface{}
	CreatedAt time.Time
}
// Result represents the outcome of a task
type Result struct {
	TaskID         string
	Output         interface{}
	Error          error
	ProcessedAt    time.Time
	ProcessingTime time.Duration
}
// WorkerPool manages a pool of workers for concurrent task processing
type WorkerPool struct {
	numWorkers int
	taskQueue  chan Task
	results    chan Result
	done       chan struct{}
	wg         sync.WaitGroup
	errorChan  chan error
	ctx        context.Context
	cancel     context.CancelFunc
}
// NewWorkerPool creates a new worker pool with the specified number of workers
func NewWorkerPool(ctx context.Context, numWorkers int, queueSize int) *WorkerPool {
	ctx, cancel := context.WithCancel(ctx)
	return &WorkerPool{
		numWorkers: numWorkers,
		taskQueue:  make(chan Task, queueSize),
		results:    make(chan Result, queueSize),
		done:       make(chan struct{}),
		errorChan:  make(chan error, numWorkers),
		ctx:        ctx,
		cancel:     cancel,
	}
}
// Start initializes and starts the worker pool
func (wp *WorkerPool) Start() {
	// Start workers
	for i := 0; i < wp.numWorkers; i++ {
		wp.wg.Add(1)
		go wp.worker(i)
	}
	// Start result collector
	go wp.resultCollector()
}
// worker processes tasks from the task queue
func (wp *WorkerPool) worker(id int) {
	defer wp.wg.Done()
	log.Printf("Worker %d started", id)
	for {
		select {
		case <-wp.ctx.Done():
			log.Printf("Worker %d shutting down", id)
			return
		case task, ok := <-wp.taskQueue:
			if !ok {
				return
			}
			startTime := time.Now()
			// Process the task
			output, err := wp.processTask(task)
			// Create result
			result := Result{
				TaskID:         task.ID,
				Output:         output,
				Error:          err,
				ProcessedAt:    time.Now(),
				ProcessingTime: time.Since(startTime),
			}
			// Send result
			select {
			case wp.results <- result:
			case <-wp.ctx.Done():
				return
			}
		}
	}
}
// processTask handles the actual task processing
// This should be modified according to your specific needs
func (wp *WorkerPool) processTask(task Task) (interface{}, error) {
	// Example processing logic
	switch v := task.Data.(type) {
	case string:
		return fmt.Sprintf("Processed string: %s", v), nil
	case int:
		return v * 2, nil
	default:
		return nil, fmt.Errorf("unsupported data type for task %s", task.ID)
	}
}
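
// As a sketch of how processTask could be adapted (not part of the original
// gist): a hypothetical payload type would get its own case in the type switch
// above, for example:
//
//	type ResizeRequest struct{ Width, Height int }
//
//	case ResizeRequest:
//		return fmt.Sprintf("resized to %dx%d", v.Width, v.Height), nil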
// resultCollector collects and handles results from workers
func (wp *WorkerPool) resultCollector() {
	for result := range wp.results {
		if result.Error != nil {
			log.Printf("Error processing task %s: %v", result.TaskID, result.Error)
			select {
			case wp.errorChan <- result.Error:
			default:
				log.Printf("Error channel full, dropped error from task %s", result.TaskID)
			}
		} else {
			log.Printf("Task %s completed successfully in %v", result.TaskID, result.ProcessingTime)
		}
	}
}
// SubmitTask adds a new task to the worker pool
func (wp *WorkerPool) SubmitTask(task Task) error {
	select {
	case <-wp.ctx.Done():
		return errors.New("worker pool is shutting down")
	case wp.taskQueue <- task:
		return nil
	}
}
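
// SubmitTaskWithTimeout is a sketch (not part of the original gist) of a
// submission helper that bounds how long the caller waits for queue space when
// taskQueue is full, instead of blocking until the pool shuts down.
func (wp *WorkerPool) SubmitTaskWithTimeout(task Task, timeout time.Duration) error {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case <-wp.ctx.Done():
		return errors.New("worker pool is shutting down")
	case wp.taskQueue <- task:
		return nil
	case <-timer.C:
		return fmt.Errorf("timed out submitting task %s after %v", task.ID, timeout)
	}
}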
// Shutdown gracefully shuts down the worker pool.
// Callers must stop submitting tasks before calling Shutdown: a SubmitTask
// racing with the close of taskQueue below could panic on send to a closed channel.
func (wp *WorkerPool) Shutdown() {
	wp.cancel()          // Signal all workers to stop
	close(wp.taskQueue)  // No more tasks will be accepted
	wp.wg.Wait()         // Wait for in-flight workers to exit
	close(wp.results)    // Lets resultCollector's range loop terminate
	close(wp.errorChan)
	close(wp.done)
}
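
// Errors is a hypothetical accessor (not part of the original gist) that lets
// callers drain the errors resultCollector pushes onto errorChan; without a
// reader, that channel fills up and further errors are dropped.
func (wp *WorkerPool) Errors() <-chan error {
	return wp.errorChan
}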
// BatchProcessor handles concurrent processing of batch operations
type BatchProcessor struct {
	batchSize int
	timeout   time.Duration
	wp        *WorkerPool
}
// NewBatchProcessor creates a new batch processor
func NewBatchProcessor(batchSize int, timeout time.Duration, wp *WorkerPool) *BatchProcessor {
	return &BatchProcessor{
		batchSize: batchSize,
		timeout:   timeout,
		wp:        wp,
	}
}
// ProcessBatch splits items into batches and submits them to the worker pool
// concurrently. Per-task results are consumed and logged by the pool's
// resultCollector, so the returned slice is currently a placeholder and stays
// empty; wire up result forwarding if callers need the individual Results.
func (bp *BatchProcessor) ProcessBatch(items []interface{}) ([]Result, error) {
	results := make([]Result, 0, len(items))
	errChan := make(chan error, 1)
	var wg sync.WaitGroup
	// Process items in batches
	for i := 0; i < len(items); i += bp.batchSize {
		end := i + bp.batchSize
		if end > len(items) {
			end = len(items)
		}
		batch := items[i:end]
		wg.Add(1)
		go func(batch []interface{}) {
			defer wg.Done()
			for _, item := range batch {
				task := Task{
					ID:        fmt.Sprintf("task-%d", time.Now().UnixNano()),
					Data:      item,
					CreatedAt: time.Now(),
				}
				if err := bp.wp.SubmitTask(task); err != nil {
					// Report the first submission error without blocking;
					// errChan has capacity 1, later errors are dropped.
					select {
					case errChan <- err:
					default:
					}
					return
				}
			}
		}(batch)
	}
	// Wait for all batches to be submitted, or time out
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return results, nil
	case err := <-errChan:
		return nil, err
	case <-time.After(bp.timeout):
		return nil, errors.New("batch processing timed out")
	}
}
// Example usage
func Example() {
	// Create context with timeout
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	// Create worker pool
	wp := NewWorkerPool(ctx, 4, 100)
	wp.Start()
	defer wp.Shutdown()
	// Create batch processor
	bp := NewBatchProcessor(10, 30*time.Second, wp)
	// Example items to process
	items := []interface{}{
		"item1",
		42,
		"item3",
		123,
	}
	// Process items in batch
	results, err := bp.ProcessBatch(items)
	if err != nil {
		log.Printf("Batch processing error: %v", err)
		return
	}
	// Handle results. Note: until ProcessBatch forwards per-task Results, this
	// slice is empty and per-task outcomes appear in resultCollector's logs.
	for _, result := range results {
		if result.Error != nil {
			log.Printf("Error processing task %s: %v", result.TaskID, result.Error)
		} else {
			log.Printf("Task %s completed: %v", result.TaskID, result.Output)
		}
	}
}
// Example of rate-limited concurrent operations
func RateLimitedExample() {
	// Create rate limiter ticker
	const rateLimit = 10 // operations per second
	ticker := time.NewTicker(time.Second / rateLimit)
	defer ticker.Stop()
	// Create worker pool
	ctx := context.Background()
	wp := NewWorkerPool(ctx, 4, 100)
	wp.Start()
	defer wp.Shutdown()
	// Process items with rate limiting
	items := make([]string, 100)
	for i := range items {
		<-ticker.C // Wait for rate limiter
		task := Task{
			ID:        fmt.Sprintf("task-%d", i),
			Data:      fmt.Sprintf("item-%d", i),
			CreatedAt: time.Now(),
		}
		if err := wp.SubmitTask(task); err != nil {
			log.Printf("Error submitting task: %v", err)
			continue
		}
	}
}
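
// A sketch (not part of the original gist) of the same pattern built on
// golang.org/x/time/rate instead of a time.Ticker. It is left commented out
// because it assumes the import block is extended with "golang.org/x/time/rate":
//
//	func RateLimitedWithLimiter(ctx context.Context, wp *WorkerPool) {
//		// Allow 10 operations per second with a burst of 1.
//		limiter := rate.NewLimiter(rate.Limit(10), 1)
//		for i := 0; i < 100; i++ {
//			// Wait blocks until the limiter permits another event or ctx is cancelled.
//			if err := limiter.Wait(ctx); err != nil {
//				log.Printf("rate limiter wait cancelled: %v", err)
//				return
//			}
//			task := Task{
//				ID:        fmt.Sprintf("task-%d", i),
//				Data:      fmt.Sprintf("item-%d", i),
//				CreatedAt: time.Now(),
//			}
//			if err := wp.SubmitTask(task); err != nil {
//				log.Printf("Error submitting task: %v", err)
//				return
//			}
//		}
//	}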