Skip to content

Instantly share code, notes, and snippets.

@thockin
Created January 25, 2017 08:23
Show Gist options
  • Save thockin/1c05beb4075025798e9d242e082e4852 to your computer and use it in GitHub Desktop.
Save thockin/1c05beb4075025798e9d242e082e4852 to your computer and use it in GitHub Desktop.
package main
import (
"fmt"
"sync"
"time"
"k8s.io/client-go/pkg/util/flowcontrol"
"github.com/golang/glog"
)
// PeriodicRunner runs a function on a regular period. Callers can manually
// trigger runs, which will reset the period. Manual runs are rate-limited.
type PeriodicRunner struct {
minInterval time.Duration // the min time between runs, modulo bursts
maxInterval time.Duration // the max time between runs
mu sync.Mutex // guards runs of fn and all mutations
fn func() // function to run
lastRun time.Time // time since last run
periodicTimer *time.Timer // timer for periodic runs
limiter rateLimiter // rate limiter for on-demand runs
}
// designed so that flowcontrol.RateLimiter satisfies
type rateLimiter interface {
TryAccept() bool
Stop()
}
type nullLimiter struct{}
func (nullLimiter) TryAccept() bool {
return true
}
func (nullLimiter) Stop() {}
// NewPeriodicRunner creates a new PeriodicRunner.
func NewPeriodicRunner(fn func(), minInterval time.Duration, maxInterval time.Duration, burst int) *PeriodicRunner {
pr := &PeriodicRunner{
fn: fn,
minInterval: minInterval,
maxInterval: maxInterval,
}
if minInterval == 0 {
pr.limiter = nullLimiter{}
} else {
// minInterval is a duration, typically in seconds but could be fractional
rps := float32(time.Second) / float32(minInterval)
// allow burst updates in short succession
pr.limiter = flowcontrol.NewTokenBucketRateLimiter(rps, burst)
}
return pr
}
// Run the periodic timer. This is expected to be called as a goroutine.
func (pr *PeriodicRunner) Loop(stop <-chan struct{}) {
pr.periodicTimer = time.NewTimer(pr.maxInterval)
for {
select {
case <-stop:
pr.stop()
return
case <-pr.periodicTimer.C:
pr.tick()
}
}
}
// assumes the lock is held
func (pr *PeriodicRunner) stop() {
pr.mu.Lock()
defer pr.mu.Unlock()
pr.limiter.Stop()
pr.periodicTimer.Stop()
}
// assumes the lock is held
func (pr *PeriodicRunner) tick() {
pr.mu.Lock()
defer pr.mu.Unlock()
pr.run()
pr.periodicTimer.Reset(pr.maxInterval)
}
// Run the function as soon as possible. If this is called while Loop is not
// running, the call may be deferred indefinitely.
func (pr *PeriodicRunner) Run() {
pr.mu.Lock()
defer pr.mu.Unlock()
pr.run()
}
// assumes the lock is held
func (pr *PeriodicRunner) run() {
if pr.limiter.TryAccept() {
pr.fn()
pr.lastRun = time.Now()
return
}
// It can't run right now, figure out when it can run next.
elapsed := time.Since(pr.lastRun) // how long since last run
asap := pr.minInterval - elapsed // time to next possible run
next := pr.maxInterval - elapsed // time to next periodic run
if next <= asap {
// just let the periodic timer catch it
glog.V(3).Infof("running too often: eta %v", next)
return
}
// set the on-demand timer for ASAP. This APPEARS to be safe to do while Loop() is reading the timer.
pr.periodicTimer.Stop()
pr.periodicTimer.Reset(asap)
glog.V(3).Infof("running too often: eta %v", asap)
}
func main() {
pr := NewPeriodicRunner(func() { glog.Errorf("RUN") }, 1*time.Second, 3*time.Second, 1)
stop := make(chan struct{})
go pr.Loop(stop)
for {
x := ""
fmt.Scanln(&x)
pr.Run()
}
}
// This version uses 2 timers and is (I think) most strictly correct.
package main
import (
"fmt"
"sync"
"time"
"k8s.io/client-go/pkg/util/flowcontrol"
"github.com/golang/glog"
)
// PeriodicRunner runs a function on a regular period. Callers can manually
// trigger runs, which will reset the period. Manual runs are rate-limited.
type PeriodicRunner struct {
minInterval time.Duration // the min time between runs, modulo bursts
maxInterval time.Duration // the max time between runs
mu sync.Mutex // guards runs of fn and all mutations
fn func() // function to run
lastRun time.Time // time since last run
periodicTimer *time.Timer // timer for periodic runs
limiter rateLimiter // rate limiter for on-demand runs
pending bool // is there an on-demand run in flight?
demandTimer *time.Timer // timer for throttled on-demand runs
}
// designed so that flowcontrol.RateLimiter satisfies
type rateLimiter interface {
TryAccept() bool
Stop()
}
type nullLimiter struct{}
func (nullLimiter) TryAccept() bool {
return true
}
func (nullLimiter) Stop() {}
// NewPeriodicRunner creates a new PeriodicRunner.
func NewPeriodicRunner(fn func(), minInterval time.Duration, maxInterval time.Duration, burst int) *PeriodicRunner {
pr := &PeriodicRunner{
fn: fn,
minInterval: minInterval,
maxInterval: maxInterval,
}
if minInterval == 0 {
pr.limiter = nullLimiter{}
} else {
// minInterval is a duration, typically in seconds but could be fractional
rps := float32(time.Second) / float32(minInterval)
// allow burst updates in short succession
pr.limiter = flowcontrol.NewTokenBucketRateLimiter(rps, burst)
}
return pr
}
// Run the periodic timer. This is expected to be called as a goroutine.
func (pr *PeriodicRunner) Loop(stop <-chan struct{}) {
pr.periodicTimer = time.NewTimer(pr.maxInterval)
for {
select {
case <-stop:
pr.stop()
return
case <-pr.periodicTimer.C:
pr.tick()
}
}
}
// assumes the lock is held
func (pr *PeriodicRunner) stop() {
pr.mu.Lock()
defer pr.mu.Unlock()
pr.limiter.Stop()
pr.periodicTimer.Stop()
if pr.demandTimer != nil {
pr.demandTimer.Stop()
}
}
// assumes the lock is held
func (pr *PeriodicRunner) tick() {
pr.mu.Lock()
defer pr.mu.Unlock()
pr.run()
pr.periodicTimer.Reset(pr.maxInterval)
}
// Run the function as soon as possible. If this is called while Loop is not
// running, the call may be deferred indefinitely.
func (pr *PeriodicRunner) Run() {
pr.mu.Lock()
defer pr.mu.Unlock()
pr.run()
}
// assumes the lock is held
func (pr *PeriodicRunner) run() {
if pr.limiter.TryAccept() {
pr.fn()
pr.lastRun = time.Now()
return
}
// It can't run right now, figure out when it can run next.
elapsed := time.Since(pr.lastRun) // how long since last run
asap := pr.minInterval - elapsed // time to next possible run
next := pr.maxInterval - elapsed // time to next periodic run
if pr.pending {
// there's already a timer pending
glog.V(3).Infof("running too often: eta %v", asap)
return
}
if next <= asap {
// just let the periodic timer catch it
glog.V(3).Infof("running too often: eta %v", next)
return
}
// set the on-demand timer for ASAP.
pr.onDemand(asap)
glog.V(3).Infof("running too often: eta %v", asap)
}
// assumes the lock is held
func (pr *PeriodicRunner) onDemand(asap time.Duration) {
pr.pending = true
pr.demandTimer = time.AfterFunc(asap, func() {
pr.mu.Lock()
defer pr.mu.Unlock()
pr.pending = false
pr.run()
})
}
func main() {
pr := NewPeriodicRunner(func() { glog.Errorf("RUN") }, 1*time.Second, 3*time.Second, 2)
stop := make(chan struct{})
go pr.Loop(stop)
for {
x := ""
fmt.Scanln(&x)
pr.Run()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment