Created
January 10, 2024 14:56
-
-
Save emrekasg/3a9282c262339c00b74bbd7894f37058 to your computer and use it in GitHub Desktop.
tickerWorkers.go
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package workers | |
import ( | |
"context" | |
"fmt" | |
"sync" | |
"time" | |
) | |
// WorkerElement represents information about a worker. | |
type WorkerElement struct { | |
Name string `json:"name"` | |
Duration time.Duration `json:"duration"` | |
} | |
var ( | |
workers = make(map[string]*WorkerContext) | |
workersList []WorkerElement | |
mu sync.Mutex | |
) | |
type WorkerContext struct { | |
ctx context.Context | |
cancel context.CancelFunc | |
workerFn func(ctx context.Context) | |
wg sync.WaitGroup | |
shutdownOnce sync.Once | |
shutdownChan chan struct{} | |
batchDuration time.Duration | |
} | |
// Register adds a worker function. | |
func Register(name string, handler func(ctx context.Context)) { | |
worker := &WorkerContext{ | |
ctx: context.Background(), | |
shutdownChan: make(chan struct{}), | |
} | |
worker.start(name, handler) | |
} | |
// RegisterPeriodically adds a worker function that runs periodically. | |
func RegisterPeriodically(name string, duration time.Duration, handler func(ctx context.Context)) { | |
worker := &WorkerContext{ | |
ctx: context.Background(), | |
shutdownChan: make(chan struct{}), | |
batchDuration: duration, | |
} | |
worker.startPeriodic(name, handler) | |
} | |
// Stop stops a worker by name. | |
func Stop(name string) { | |
mu.Lock() | |
defer mu.Unlock() | |
if worker, ok := workers[name]; ok { | |
worker.stop() | |
delete(workers, name) | |
fmt.Println("Stopping worker:", name) | |
} else { | |
fmt.Println("Worker not found:", name) | |
} | |
} | |
// Run starts all registered workers and runs indefinitely. | |
func Run() { | |
select {} | |
} | |
func (w *WorkerContext) start(name string, handler func(ctx context.Context)) { | |
w.workerFn = func(ctx context.Context) { | |
for { | |
handler(ctx) | |
select { | |
case <-ctx.Done(): | |
return | |
default: | |
} | |
} | |
} | |
w.startWorker(name) | |
} | |
func (w *WorkerContext) startPeriodic(name string, handler func(ctx context.Context)) { | |
w.workerFn = func(ctx context.Context) { | |
ticker := time.NewTicker(w.batchDuration) | |
defer ticker.Stop() | |
for { | |
select { | |
case <-ticker.C: | |
handler(ctx) | |
case <-ctx.Done(): | |
return | |
} | |
} | |
} | |
w.startWorker(name) | |
} | |
func (w *WorkerContext) startWorker(name string) { | |
mu.Lock() | |
workers[name] = w | |
mu.Unlock() | |
w.wg.Add(1) | |
go func() { | |
defer w.wg.Done() | |
w.workerFn(w.ctx) | |
}() | |
} | |
func (w *WorkerContext) stop() { | |
w.shutdownOnce.Do(func() { | |
close(w.shutdownChan) | |
w.cancel() | |
w.wg.Wait() | |
}) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment