Skip to content

Instantly share code, notes, and snippets.

@emrekasg
Created January 10, 2024 14:56
Show Gist options
  • Save emrekasg/3a9282c262339c00b74bbd7894f37058 to your computer and use it in GitHub Desktop.
Save emrekasg/3a9282c262339c00b74bbd7894f37058 to your computer and use it in GitHub Desktop.
tickerWorkers.go
package workers
import (
"context"
"fmt"
"sync"
"time"
)
// WorkerElement describes a worker by name and duration.
//
// It is the element type of the package-level workersList slice; nothing in
// the visible code constructs or reads a WorkerElement.
type WorkerElement struct {
// Name identifies the worker; serialized under the JSON key "name".
Name string `json:"name"`
// Duration is a time span associated with the worker; serialized under the
// JSON key "duration". Presumably the period of a periodic worker — TODO confirm.
Duration time.Duration `json:"duration"`
}
// Package-level worker registry.
var (
// workers maps a worker name to its running WorkerContext. Entries are added
// by startWorker and removed by Stop.
workers = make(map[string]*WorkerContext)
// workersList is declared but not read or written in the visible code.
workersList []WorkerElement
// mu is locked around every access to the workers map in the visible code.
mu sync.Mutex
)
// WorkerContext holds the state of a single background worker: the function
// it runs, the context handed to that function, and the primitives used to
// shut it down and wait for it to finish.
type WorkerContext struct {
// ctx is passed to workerFn when the worker goroutine is launched.
ctx context.Context
// cancel is invoked by stop to signal the worker to exit.
cancel context.CancelFunc
// workerFn is the loop executed on the worker goroutine; it is assigned by
// start or startPeriodic and run by startWorker.
workerFn func(ctx context.Context)
// wg tracks the worker goroutine so stop can wait for it to return.
wg sync.WaitGroup
// shutdownOnce ensures the shutdown sequence in stop runs at most once.
shutdownOnce sync.Once
// shutdownChan is closed by stop; no receiver appears in the visible code.
shutdownChan chan struct{}
// batchDuration is the ticker interval used by startPeriodic.
batchDuration time.Duration
}
// Register starts handler as a background worker stored under name.
//
// The handler runs on its own goroutine and is invoked again as soon as it
// returns, until the worker is stopped via Stop(name). The context passed to
// handler is cancelled when the worker is stopped, so a long-running handler
// should watch ctx.Done() to return promptly.
//
// Registering a second worker under an existing name replaces the registry
// entry for that name.
func Register(name string, handler func(ctx context.Context)) {
	// The worker needs a cancellable context: stop() calls cancel and then
	// waits for the goroutine, which only exits once ctx is done. Leaving
	// cancel nil would make Stop panic on a nil function call.
	ctx, cancel := context.WithCancel(context.Background())
	worker := &WorkerContext{
		ctx:          ctx,
		cancel:       cancel,
		shutdownChan: make(chan struct{}),
	}
	worker.start(name, handler)
}
// RegisterPeriodically starts handler as a background worker stored under
// name and invokes it once per duration tick.
//
// The handler runs on the worker's own goroutine; the first invocation happens
// after the first tick, not immediately. The context passed to handler is
// cancelled when the worker is stopped via Stop(name).
//
// duration must be positive: the worker goroutine creates a time.Ticker with
// it, and time.NewTicker panics on a non-positive interval.
func RegisterPeriodically(name string, duration time.Duration, handler func(ctx context.Context)) {
	// The worker needs a cancellable context: stop() calls cancel and then
	// waits for the goroutine, which only exits once ctx is done. Leaving
	// cancel nil would make Stop panic on a nil function call.
	ctx, cancel := context.WithCancel(context.Background())
	worker := &WorkerContext{
		ctx:           ctx,
		cancel:        cancel,
		shutdownChan:  make(chan struct{}),
		batchDuration: duration,
	}
	worker.startPeriodic(name, handler)
}
// Stop shuts down the worker registered under name and removes it from the
// registry. It blocks until the worker's goroutine has returned.
//
// If no worker is registered under name, Stop only prints a "not found"
// message.
func Stop(name string) {
	// Only the registry lookup and removal happen under the lock. Waiting for
	// the worker while holding mu would block every other Register/Stop call
	// and would deadlock if the handler itself touches the registry while it
	// is being stopped.
	mu.Lock()
	worker, ok := workers[name]
	if ok {
		delete(workers, name)
	}
	mu.Unlock()

	if !ok {
		fmt.Println("Worker not found:", name)
		return
	}

	worker.stop()
	fmt.Println("Stopping worker:", name)
}
// Run blocks the calling goroutine forever.
//
// It does not start anything itself: in the visible code each worker's
// goroutine is launched at registration time (see startWorker). Run only
// keeps the caller from returning so those goroutines can keep running.
func Run() {
// An empty select has no cases that can ever proceed, so it never returns.
select {}
}
// start installs a worker loop that calls handler repeatedly, with no delay
// between calls, and then launches the worker under name.
//
// Cancellation is checked only after each handler call returns, so handler
// always runs at least once and the loop exits after the first call that
// finishes with the context already cancelled.
func (w *WorkerContext) start(name string, handler func(ctx context.Context)) {
	runLoop := func(ctx context.Context) {
		cancelled := ctx.Done()
		for {
			handler(ctx)

			select {
			case <-cancelled:
				return
			default:
				// Not cancelled yet: go straight into the next call.
			}
		}
	}

	w.workerFn = runLoop
	w.startWorker(name)
}
// startPeriodic installs a worker loop that calls handler on every tick of a
// ticker with interval w.batchDuration, and then launches the worker under
// name.
//
// The loop exits, and the ticker is stopped, once the context is cancelled.
func (w *WorkerContext) startPeriodic(name string, handler func(ctx context.Context)) {
	tickLoop := func(ctx context.Context) {
		// The ticker is created on the worker goroutine and released when the
		// loop returns.
		tick := time.NewTicker(w.batchDuration)
		defer tick.Stop()

		cancelled := ctx.Done()
		for {
			select {
			case <-cancelled:
				return
			case <-tick.C:
				handler(ctx)
			}
		}
	}

	w.workerFn = tickLoop
	w.startWorker(name)
}
// startWorker records w in the package registry under name and launches
// w.workerFn on a new goroutine tracked by w.wg.
//
// If an entry already exists under name it is overwritten in the registry;
// the goroutine of the previous worker is not stopped here.
func (w *WorkerContext) startWorker(name string) {
	// stop() calls w.cancel and then waits for the goroutine, and the worker
	// loops only exit when their context is done. Make sure the worker owns a
	// cancellable context even if the caller did not set one up; otherwise
	// stop() would panic on a nil cancel function.
	if w.cancel == nil {
		parent := w.ctx
		if parent == nil {
			parent = context.Background()
		}
		w.ctx, w.cancel = context.WithCancel(parent)
	}

	mu.Lock()
	workers[name] = w
	mu.Unlock()

	// Add must happen before the goroutine starts so that a concurrent
	// wg.Wait cannot observe a zero counter and return early.
	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		w.workerFn(w.ctx)
	}()
}
// stop runs the worker's shutdown sequence at most once: it closes
// shutdownChan, calls w.cancel, and then blocks until the worker goroutine
// has returned. Later calls are no-ops.
//
// w.cancel and w.shutdownChan must be set before stop is called.
func (w *WorkerContext) stop() {
	shutdown := func() {
		close(w.shutdownChan)
		w.cancel()
		// Wait for the goroutine started by startWorker to finish.
		w.wg.Wait()
	}
	w.shutdownOnce.Do(shutdown)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment