Skip to content

Instantly share code, notes, and snippets.

@emrekasg
Created January 10, 2024 15:56
Show Gist options
  • Save emrekasg/b7fc3e82afe0bf5dc7dcbd75c7f18179 to your computer and use it in GitHub Desktop.
Save emrekasg/b7fc3e82afe0bf5dc7dcbd75c7f18179 to your computer and use it in GitHub Desktop.
genericWorkers.go
package workers
import (
"context"
"sync"
"time"
)
type WorkerFunc[T any] func(ctx context.Context, data T)
type WorkerContext[T any] struct {
ctx context.Context
cancel context.CancelFunc
workerFn WorkerFunc[T]
wg sync.WaitGroup
batchDuration time.Duration
data T
}
var (
workers sync.Map
stopCh chan struct{} // Channel to signal stopping of Run
)
func init() {
stopCh = make(chan struct{})
}
func RegisterPeriodically[T any](name string, duration time.Duration, data T, handler WorkerFunc[T]) {
ctx, cancel := context.WithCancel(context.Background())
worker := &WorkerContext[T]{
ctx: ctx,
cancel: cancel,
workerFn: handler,
batchDuration: duration,
data: data,
}
worker.wg.Add(1)
go worker.startPeriodicWorker()
workers.Store(name, worker)
}
func (w *WorkerContext[T]) startPeriodicWorker() {
defer w.wg.Done()
ticker := time.NewTicker(w.batchDuration)
defer ticker.Stop()
for {
select {
case <-w.ctx.Done():
return
case <-ticker.C:
w.workerFn(w.ctx, w.data)
}
}
}
func (w *WorkerContext[T]) stop() {
w.cancel()
w.wg.Wait()
}
func Stop(name string) {
if val, ok := workers.Load(name); ok {
worker := val.(*WorkerContext[interface{}])
worker.stop()
workers.Delete(name)
}
}
func StopAll() {
workers.Range(func(_, value interface{}) bool {
worker := value.(*WorkerContext[interface{}])
worker.stop()
return true
})
close(stopCh) // Signal to unblock Run
}
func Run() {
<-stopCh // Block until StopAll is called
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment