Skip to content

Instantly share code, notes, and snippets.

@scottcagno
Created January 23, 2023 07:59
Show Gist options
  • Save scottcagno/4a2dfadc45a03f8f7c2e73d11e3b3341 to your computer and use it in GitHub Desktop.
Save scottcagno/4a2dfadc45a03f8f7c2e73d11e3b3341 to your computer and use it in GitHub Desktop.
Poller.go
package main
import (
"fmt"
"log"
"sync"
"time"
)
// main is a small demonstration of Poller: it starts polling every five
// seconds, registers two tasks that print the current time, prints every
// completion/failure notification, then exercises ChangeInterval, Pause,
// Resume and Stop with sleeps in between.
func main() {
	poller := new(Poller)
	poller.Start(time.Second * 5)

	// Drain notifications from the very beginning. The notification channels
	// are created unbuffered by Start, so the polling goroutine blocks on
	// every send until somebody receives; starting the consumer before any
	// task exists removes the dependency on sleep timing.
	go func() {
		for {
			// No explicit break is needed: Go select cases do not fall through.
			select {
			case taskName := <-poller.TaskComplete():
				fmt.Println("Task completed:", taskName)
			case err := <-poller.TaskError():
				fmt.Println("Task failed:", err)
			}
		}
	}()

	// Both demo tasks do the same thing, so they share one function value.
	printTime := func() error {
		fmt.Println("Task executed at", time.Now())
		return nil
	}

	log.Println(">>> adding task...")
	poller.AddTask("print_time", printTime, 3, time.Second*10)

	// Wait first, then announce, so the log line matches the moment the
	// second task is actually registered.
	time.Sleep(3 * time.Second)
	log.Println(">>> adding another task...")
	poller.AddTask("print_time2", printTime, 3, time.Second*10)

	time.Sleep(time.Second * 20)
	//log.Println(">>> removing task...")
	//poller.RemoveTask("print_time")

	log.Println(">>> changing interval...")
	poller.ChangeInterval(time.Second * 1)

	log.Println(">>> pausing for 20 seconds...")
	poller.Pause()
	time.Sleep(time.Second * 20)

	log.Println(">>> resuming...")
	poller.Resume()
	time.Sleep(time.Second * 10)

	poller.Stop()
}
package main
import (
"fmt"
"log"
"sync"
"time"
)
// Task is one named unit of work tracked by a Poller, together with its
// scheduling and failure bookkeeping.
type Task struct {
// Name identifies the task; RemoveTask matches on it and RunTasks sends it
// on the completion channel after a successful run.
Name string
// Run is the work itself. A non-nil error is treated as a failed run.
Run func() error
// NextRun is the earliest time at which RunTasks will execute the task.
NextRun time.Time
// ConsecutiveF counts failed runs in a row; RunTasks resets it to 0 after
// a successful run.
ConsecutiveF int
// MaxRetries is the failure threshold: RunTasks removes the task once
// ConsecutiveF reaches this value.
MaxRetries int
// RetryDelay is how far RunTasks pushes NextRun into the future after a
// failed run that has not yet reached MaxRetries.
RetryDelay time.Duration
}
// Poller runs registered tasks from a background goroutine driven by a
// ticker and reports the outcome of every run on two channels.
// Only Tasks is guarded by a mutex; the other fields are read and written
// without synchronization.
type Poller struct {
// Ticker is the ticker created by Start; each tick triggers RunTasks.
Ticker *time.Ticker
// Interval is added to the current time by RunTasks to schedule a task's
// next run after a success. NOTE(review): none of the methods visible here
// assigns it, so it stays at its zero value unless the caller sets it.
Interval time.Duration
// Tasks holds the registered tasks; access is guarded by tasksMutex.
Tasks []*Task
// stop is closed by Stop to tell the polling goroutine to exit.
stop chan struct{}
// stopped is set to true by the polling goroutine once it has seen stop.
stopped bool
// paused makes the polling goroutine ignore ticks while it is true.
paused bool
// tasksMutex guards Tasks.
tasksMutex sync.RWMutex
// taskComplete carries the name of every task that ran successfully.
taskComplete chan string
// taskError carries the error of every task run that failed.
taskError chan error
}
// Start launches the polling goroutine. On every tick of a new ticker with
// the given interval the goroutine calls RunTasks, unless the poller is
// paused. Start always clears the paused flag.
//
// interval must be greater than zero; time.NewTicker panics otherwise.
//
// Start may be called again after Stop (as ChangeInterval does). The
// notification channels returned by TaskComplete and TaskError are created
// on the first call only and are kept across restarts, so receivers that are
// already blocked on them keep working.
func (p *Poller) Start(interval time.Duration) {
	// The goroutine below works exclusively on these local values. Reading
	// p.Ticker / p.stop inside the loop would let a goroutine from a previous
	// Start pick up (and later stop) the ticker of a newer run.
	ticker := time.NewTicker(interval)
	stop := make(chan struct{})

	p.Ticker = ticker
	p.stop = stop
	p.stopped = false
	p.paused = false

	if p.taskComplete == nil {
		p.taskComplete = make(chan string)
	}
	if p.taskError == nil {
		p.taskError = make(chan error)
	}

	go func() {
		defer ticker.Stop()
		for {
			select {
			case <-stop:
				// Only report "stopped" when this run is still the current
				// one; a superseded goroutine must not flag a newer run as
				// stopped.
				if p.stop == stop {
					p.stopped = true
				}
				return
			case <-ticker.C:
				if !p.paused {
					p.RunTasks()
				}
			}
		}
	}()
}
// Stop asks the polling goroutine to exit by closing the stop channel.
//
// It is a no-op when the poller was never started or when the current stop
// channel is already closed, so repeated calls do not panic. The channel
// itself is inspected instead of the stopped flag because that flag is only
// set later, by the polling goroutine, once it has noticed the close.
func (p *Poller) Stop() {
	if p.stop == nil {
		// Never started: closing a nil channel would panic.
		return
	}
	select {
	case <-p.stop:
		// Already closed by an earlier Stop.
	default:
		close(p.stop)
	}
}
// RunTasks executes every task whose NextRun is not after the current time.
//
// For each due task:
//   - on success ConsecutiveF is reset, NextRun becomes now+Interval and the
//     task name is sent on the completion channel;
//   - on failure the error is sent on the error channel and printed,
//     ConsecutiveF is incremented, and the task is either removed (once
//     ConsecutiveF reaches MaxRetries) or rescheduled at now+RetryDelay.
//
// Both channel sends block until a receiver takes the value. When no tasks
// are left afterwards the poller is stopped.
func (p *Poller) RunTasks() {
	now := time.Now()

	// Collect the due tasks under the read lock and run them from a private
	// slice. Task functions run without the lock, and a failing task may call
	// RemoveTask; iterating p.Tasks directly while it is being modified could
	// skip one task and run another twice.
	p.tasksMutex.RLock()
	due := make([]*Task, 0, len(p.Tasks))
	for _, task := range p.Tasks {
		if !task.NextRun.After(now) {
			due = append(due, task)
		}
	}
	p.tasksMutex.RUnlock()

	for _, task := range due {
		if err := task.Run(); err != nil {
			p.taskError <- err
			fmt.Printf("Error: %v", err)
			task.ConsecutiveF++
			if task.ConsecutiveF >= task.MaxRetries {
				p.RemoveTask(task.Name)
			} else {
				task.NextRun = now.Add(task.RetryDelay)
			}
			continue
		}
		task.ConsecutiveF = 0
		task.NextRun = now.Add(p.Interval)
		p.taskComplete <- task.Name
	}

	// Read the length under the lock; AddTask/RemoveTask may run concurrently.
	p.tasksMutex.RLock()
	remaining := len(p.Tasks)
	p.tasksMutex.RUnlock()
	if remaining == 0 {
		p.Stop()
	}
}
// AddTask registers a new task under the given name.
//
// task is the function to execute; maxRetries is the number of consecutive
// failures after which RunTasks removes the task; retryDelay is the wait
// before the next attempt after a failure. The task is due immediately: its
// NextRun is set to the current time and its failure counter starts at zero.
func (p *Poller) AddTask(name string, task func() error, maxRetries int, retryDelay time.Duration) {
	p.tasksMutex.Lock()
	defer p.tasksMutex.Unlock()

	entry := new(Task) // ConsecutiveF starts at its zero value
	entry.Name = name
	entry.Run = task
	entry.NextRun = time.Now()
	entry.MaxRetries = maxRetries
	entry.RetryDelay = retryDelay

	p.Tasks = append(p.Tasks, entry)
}
// RemoveTask unregisters the first task whose Name equals name. It does
// nothing when no such task exists.
//
// The remaining tasks are copied into a fresh slice instead of being shifted
// in place. A caller that captured the old p.Tasks slice earlier (for
// example a loop that is currently ranging over it) keeps seeing the list as
// it was, rather than a shifted array with a duplicated last element, and
// the removed task is not kept alive by a stale slot in the backing array.
func (p *Poller) RemoveTask(name string) {
	p.tasksMutex.Lock()
	defer p.tasksMutex.Unlock()

	for i, task := range p.Tasks {
		if task.Name != name {
			continue
		}
		remaining := make([]*Task, 0, len(p.Tasks)-1)
		remaining = append(remaining, p.Tasks[:i]...)
		remaining = append(remaining, p.Tasks[i+1:]...)
		p.Tasks = remaining
		return
	}
}
// ChangeInterval switches the poller to a new tick interval by stopping the
// current run and starting a new one.
//
// Start clears the paused flag, so the flag is saved beforehand and restored
// afterwards: changing the interval of a paused poller leaves it paused.
func (p *Poller) ChangeInterval(interval time.Duration) {
	wasPaused := p.paused
	p.Stop()
	p.Start(interval)
	p.paused = wasPaused
}
// Pause sets the paused flag. While it is set, the polling goroutine started
// by Start ignores ticks instead of calling RunTasks; the ticker itself keeps
// running. Undo with Resume.
func (p *Poller) Pause() {
	p.paused = true
}
// Resume clears the paused flag so that the polling goroutine calls RunTasks
// again on subsequent ticks.
func (p *Poller) Resume() {
	p.paused = false
}
// TaskComplete returns the channel on which RunTasks sends the name of each
// task that ran successfully. The channel is nil until Start has created it.
func (p *Poller) TaskComplete() chan string {
	return p.taskComplete
}
// TaskError returns the channel on which RunTasks sends the error of each
// failed task run. The channel is nil until Start has created it.
func (p *Poller) TaskError() chan error {
	return p.taskError
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment