Created
January 10, 2024 15:56
-
-
Save emrekasg/b7fc3e82afe0bf5dc7dcbd75c7f18179 to your computer and use it in GitHub Desktop.
genericWorkers.go
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package workers | |
import ( | |
"context" | |
"sync" | |
"time" | |
) | |
// WorkerFunc is the callback a periodic worker invokes on each tick. It is
// called with the worker's context and the data value the worker was
// registered with.
type WorkerFunc[T any] func(ctx context.Context, data T)
type WorkerContext[T any] struct { | |
ctx context.Context | |
cancel context.CancelFunc | |
workerFn WorkerFunc[T] | |
wg sync.WaitGroup | |
batchDuration time.Duration | |
data T | |
} | |
// Package-level state shared by all workers.
var (
	// workers maps a worker's name to its *WorkerContext[T].
	workers sync.Map

	// stopCh is closed by StopAll to release callers blocked in Run. It is
	// initialised at declaration so no init function is needed.
	stopCh = make(chan struct{})
)
func RegisterPeriodically[T any](name string, duration time.Duration, data T, handler WorkerFunc[T]) { | |
ctx, cancel := context.WithCancel(context.Background()) | |
worker := &WorkerContext[T]{ | |
ctx: ctx, | |
cancel: cancel, | |
workerFn: handler, | |
batchDuration: duration, | |
data: data, | |
} | |
worker.wg.Add(1) | |
go worker.startPeriodicWorker() | |
workers.Store(name, worker) | |
} | |
func (w *WorkerContext[T]) startPeriodicWorker() { | |
defer w.wg.Done() | |
ticker := time.NewTicker(w.batchDuration) | |
defer ticker.Stop() | |
for { | |
select { | |
case <-w.ctx.Done(): | |
return | |
case <-ticker.C: | |
w.workerFn(w.ctx, w.data) | |
} | |
} | |
} | |
func (w *WorkerContext[T]) stop() { | |
w.cancel() | |
w.wg.Wait() | |
} | |
func Stop(name string) { | |
if val, ok := workers.Load(name); ok { | |
worker := val.(*WorkerContext[interface{}]) | |
worker.stop() | |
workers.Delete(name) | |
} | |
} | |
func StopAll() { | |
workers.Range(func(_, value interface{}) bool { | |
worker := value.(*WorkerContext[interface{}]) | |
worker.stop() | |
return true | |
}) | |
close(stopCh) // Signal to unblock Run | |
} | |
func Run() { | |
<-stopCh // Block until StopAll is called | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment