Created
November 15, 2024 19:01
-
-
Save WomB0ComB0/be28f89e7ece9e90d0d695fce850eb5f to your computer and use it in GitHub Desktop.
Go multi-threading template (will apply opinionated changes soon)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package concurrency | |
import ( | |
"context" | |
"fmt" | |
"log" | |
"sync" | |
"time" | |
"errors" | |
) | |
// Task is a single unit of work handed to the worker pool.
type Task struct {
	// ID identifies the task; workers copy it into the matching Result.
	ID string
	// Data is the payload to be processed.
	Data interface{}
	// CreatedAt records when the task was constructed.
	CreatedAt time.Time
}
// Result captures what happened when a Task was processed.
type Result struct {
	TaskID         string        // ID of the task this result belongs to
	Output         interface{}   // value produced by processing, if any
	Error          error         // non-nil when processing failed
	ProcessedAt    time.Time     // when processing finished
	ProcessingTime time.Duration // how long processing took
}
// WorkerPool manages a pool of workers for concurrent task processing | |
type WorkerPool struct { | |
numWorkers int | |
taskQueue chan Task | |
results chan Result | |
done chan struct{} | |
wg sync.WaitGroup | |
errorChan chan error | |
ctx context.Context | |
cancel context.CancelFunc | |
} | |
// NewWorkerPool creates a new worker pool with the specified number of workers | |
func NewWorkerPool(ctx context.Context, numWorkers int, queueSize int) *WorkerPool { | |
ctx, cancel := context.WithCancel(ctx) | |
return &WorkerPool{ | |
numWorkers: numWorkers, | |
taskQueue: make(chan Task, queueSize), | |
results: make(chan Result, queueSize), | |
done: make(chan struct{}), | |
errorChan: make(chan error, numWorkers), | |
ctx: ctx, | |
cancel: cancel, | |
} | |
} | |
// Start initializes and starts the worker pool | |
func (wp *WorkerPool) Start() { | |
// Start workers | |
for i := 0; i < wp.numWorkers; i++ { | |
wp.wg.Add(1) | |
go wp.worker(i) | |
} | |
// Start result collector | |
go wp.resultCollector() | |
} | |
// worker processes tasks from the task queue | |
func (wp *WorkerPool) worker(id int) { | |
defer wp.wg.Done() | |
log.Printf("Worker %d started", id) | |
for { | |
select { | |
case <-wp.ctx.Done(): | |
log.Printf("Worker %d shutting down", id) | |
return | |
case task, ok := <-wp.taskQueue: | |
if !ok { | |
return | |
} | |
startTime := time.Now() | |
// Process the task | |
output, err := wp.processTask(task) | |
// Create result | |
result := Result{ | |
TaskID: task.ID, | |
Output: output, | |
Error: err, | |
ProcessedAt: time.Now(), | |
ProcessingTime: time.Since(startTime), | |
} | |
// Send result | |
select { | |
case wp.results <- result: | |
case <-wp.ctx.Done(): | |
return | |
} | |
} | |
} | |
} | |
// processTask handles the actual task processing | |
// This should be modified according to your specific needs | |
func (wp *WorkerPool) processTask(task Task) (interface{}, error) { | |
// Example processing logic | |
switch v := task.Data.(type) { | |
case string: | |
return fmt.Sprintf("Processed string: %s", v), nil | |
case int: | |
return v * 2, nil | |
default: | |
return nil, fmt.Errorf("unsupported data type for task %s", task.ID) | |
} | |
} | |
// resultCollector collects and handles results from workers | |
func (wp *WorkerPool) resultCollector() { | |
for result := range wp.results { | |
if result.Error != nil { | |
log.Printf("Error processing task %s: %v", result.TaskID, result.Error) | |
select { | |
case wp.errorChan <- result.Error: | |
default: | |
log.Printf("Error channel full, dropped error from task %s", result.TaskID) | |
} | |
} else { | |
log.Printf("Task %s completed successfully in %v", result.TaskID, result.ProcessingTime) | |
} | |
} | |
} | |
// SubmitTask adds a new task to the worker pool | |
func (wp *WorkerPool) SubmitTask(task Task) error { | |
select { | |
case <-wp.ctx.Done(): | |
return errors.New("worker pool is shutting down") | |
case wp.taskQueue <- task: | |
return nil | |
} | |
} | |
// Shutdown gracefully shuts down the worker pool | |
func (wp *WorkerPool) Shutdown() { | |
wp.cancel() // Signal all workers to stop | |
close(wp.taskQueue) | |
wp.wg.Wait() | |
close(wp.results) | |
close(wp.errorChan) | |
close(wp.done) | |
} | |
// BatchProcessor handles concurrent processing of batch operations | |
type BatchProcessor struct { | |
batchSize int | |
timeout time.Duration | |
wp *WorkerPool | |
} | |
// NewBatchProcessor creates a new batch processor | |
func NewBatchProcessor(batchSize int, timeout time.Duration, wp *WorkerPool) *BatchProcessor { | |
return &BatchProcessor{ | |
batchSize: batchSize, | |
timeout: timeout, | |
wp: wp, | |
} | |
} | |
// ProcessBatch processes a batch of items concurrently | |
func (bp *BatchProcessor) ProcessBatch(items []interface{}) ([]Result, error) { | |
results := make([]Result, 0, len(items)) | |
resultChan := make(chan Result, len(items)) | |
errChan := make(chan error, 1) | |
var wg sync.WaitGroup | |
// Process items in batches | |
for i := 0; i < len(items); i += bp.batchSize { | |
end := i + bp.batchSize | |
if end > len(items) { | |
end = len(items) | |
} | |
batch := items[i:end] | |
wg.Add(1) | |
go func(batch []interface{}) { | |
defer wg.Done() | |
for _, item := range batch { | |
task := Task{ | |
ID: fmt.Sprintf("task-%d", time.Now().UnixNano()), | |
Data: item, | |
CreatedAt: time.Now(), | |
} | |
if err := bp.wp.SubmitTask(task); err != nil { | |
errChan <- err | |
return | |
} | |
} | |
}(batch) | |
} | |
// Wait for all batches to complete or timeout | |
done := make(chan struct{}) | |
go func() { | |
wg.Wait() | |
close(done) | |
}() | |
select { | |
case <-done: | |
return results, nil | |
case err := <-errChan: | |
return nil, err | |
case <-time.After(bp.timeout): | |
return nil, errors.New("batch processing timed out") | |
} | |
} | |
// Example usage | |
func Example() { | |
// Create context with timeout | |
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) | |
defer cancel() | |
// Create worker pool | |
wp := NewWorkerPool(ctx, 4, 100) | |
wp.Start() | |
defer wp.Shutdown() | |
// Create batch processor | |
bp := NewBatchProcessor(10, 30*time.Second, wp) | |
// Example items to process | |
items := []interface{}{ | |
"item1", | |
42, | |
"item3", | |
123, | |
} | |
// Process items in batch | |
results, err := bp.ProcessBatch(items) | |
if err != nil { | |
log.Printf("Batch processing error: %v", err) | |
return | |
} | |
// Handle results | |
for _, result := range results { | |
if result.Error != nil { | |
log.Printf("Error processing task %s: %v", result.TaskID, result.Error) | |
} else { | |
log.Printf("Task %s completed: %v", result.TaskID, result.Output) | |
} | |
} | |
} | |
// Example of rate-limited concurrent operations | |
func RateLimitedExample() { | |
// Create rate limiter channel | |
const rateLimit = 10 // operations per second | |
ticker := time.NewTicker(time.Second / rateLimit) | |
defer ticker.Stop() | |
// Create worker pool | |
ctx := context.Background() | |
wp := NewWorkerPool(ctx, 4, 100) | |
wp.Start() | |
defer wp.Shutdown() | |
// Process items with rate limiting | |
items := make([]string, 100) | |
for i := range items { | |
<-ticker.C // Wait for rate limiter | |
task := Task{ | |
ID: fmt.Sprintf("task-%d", i), | |
Data: fmt.Sprintf("item-%d", i), | |
CreatedAt: time.Now(), | |
} | |
if err := wp.SubmitTask(task); err != nil { | |
log.Printf("Error submitting task: %v", err) | |
continue | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment