Created
January 23, 2023 07:59
-
-
Save scottcagno/4a2dfadc45a03f8f7c2e73d11e3b3341 to your computer and use it in GitHub Desktop.
Poller.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"log" | |
"sync" | |
"time" | |
) | |
func main() { | |
poller := new(Poller) | |
poller.Start(time.Second * 5) | |
log.Println(">>> adding task...") | |
poller.AddTask("print_time", func() error { | |
fmt.Println("Task executed at", time.Now()) | |
return nil | |
}, 3, time.Second*10) | |
log.Println(">>> adding another task...") | |
time.Sleep(3 * time.Second) | |
poller.AddTask("print_time2", func() error { | |
fmt.Println("Task executed at", time.Now()) | |
return nil | |
}, 3, time.Second*10) | |
go func() { | |
for { | |
select { | |
case taskName := <-poller.TaskComplete(): | |
fmt.Println("Task completed:", taskName) | |
break | |
case err := <-poller.TaskError(): | |
fmt.Println("Task failed:", err) | |
break | |
} | |
} | |
}() | |
time.Sleep(time.Second * 20) | |
//log.Println(">>> removing task...") | |
//poller.RemoveTask("print_time") | |
log.Println(">>> changing interval...") | |
poller.ChangeInterval(time.Second * 1) | |
log.Println(">>> pausing for 20 seconds...") | |
poller.Pause() | |
time.Sleep(time.Second * 20) | |
log.Println(">>> resuming...") | |
poller.Resume() | |
time.Sleep(time.Second * 10) | |
poller.Stop() | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"log" | |
"sync" | |
"time" | |
) | |
type Task struct { | |
Name string | |
Run func() error | |
NextRun time.Time | |
ConsecutiveF int | |
MaxRetries int | |
RetryDelay time.Duration | |
} | |
type Poller struct { | |
Ticker *time.Ticker | |
Interval time.Duration | |
Tasks []*Task | |
stop chan struct{} | |
stopped bool | |
paused bool | |
tasksMutex sync.RWMutex | |
taskComplete chan string | |
taskError chan error | |
} | |
func (p *Poller) Start(interval time.Duration) { | |
p.Ticker = time.NewTicker(interval) | |
p.stop = make(chan struct{}) | |
p.stopped = false | |
p.paused = false | |
p.taskComplete = make(chan string) | |
p.taskError = make(chan error) | |
go func() { | |
for { | |
select { | |
case <-p.stop: | |
p.Ticker.Stop() | |
p.stopped = true | |
return | |
case <-p.Ticker.C: | |
if !p.paused { | |
p.RunTasks() | |
} | |
} | |
} | |
}() | |
} | |
func (p *Poller) Stop() { | |
if !p.stopped { | |
close(p.stop) | |
} | |
} | |
func (p *Poller) RunTasks() { | |
now := time.Now() | |
p.tasksMutex.RLock() | |
for _, task := range p.Tasks { | |
if task.NextRun.Before(now) || task.NextRun.Equal(now) { | |
p.tasksMutex.RUnlock() | |
if err := task.Run(); err != nil { | |
p.taskError <- err | |
fmt.Printf("Error: %v", err) | |
task.ConsecutiveF++ | |
if task.ConsecutiveF >= task.MaxRetries { | |
p.RemoveTask(task.Name) | |
} else { | |
task.NextRun = now.Add(task.RetryDelay) | |
} | |
p.tasksMutex.RLock() | |
continue | |
} | |
task.ConsecutiveF = 0 | |
task.NextRun = now.Add(p.Interval) | |
p.taskComplete <- task.Name | |
p.tasksMutex.RLock() | |
} | |
} | |
p.tasksMutex.RUnlock() | |
if len(p.Tasks) == 0 { | |
p.Stop() | |
} | |
} | |
func (p *Poller) AddTask(name string, task func() error, maxRetries int, retryDelay time.Duration) { | |
p.tasksMutex.Lock() | |
defer p.tasksMutex.Unlock() | |
p.Tasks = append(p.Tasks, &Task{ | |
Name: name, | |
Run: task, | |
NextRun: time.Now(), | |
ConsecutiveF: 0, | |
MaxRetries: maxRetries, | |
RetryDelay: retryDelay, | |
}) | |
} | |
func (p *Poller) RemoveTask(name string) { | |
p.tasksMutex.Lock() | |
defer p.tasksMutex.Unlock() | |
for i, task := range p.Tasks { | |
if task.Name == name { | |
p.Tasks = append(p.Tasks[:i], p.Tasks[i+1:]...) | |
break | |
} | |
} | |
} | |
func (p *Poller) ChangeInterval(interval time.Duration) { | |
p.Stop() | |
p.Start(interval) | |
} | |
func (p *Poller) Pause() { | |
p.paused = true | |
} | |
func (p *Poller) Resume() { | |
p.paused = false | |
} | |
func (p *Poller) TaskComplete() chan string { | |
return p.taskComplete | |
} | |
func (p *Poller) TaskError() chan error { | |
return p.taskError | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment