Created
January 25, 2017 08:23
-
-
Save thockin/1c05beb4075025798e9d242e082e4852 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"sync" | |
"time" | |
"k8s.io/client-go/pkg/util/flowcontrol" | |
"github.com/golang/glog" | |
) | |
// PeriodicRunner runs a function on a regular period. Callers can manually | |
// trigger runs, which will reset the period. Manual runs are rate-limited. | |
type PeriodicRunner struct { | |
minInterval time.Duration // the min time between runs, modulo bursts | |
maxInterval time.Duration // the max time between runs | |
mu sync.Mutex // guards runs of fn and all mutations | |
fn func() // function to run | |
lastRun time.Time // time since last run | |
periodicTimer *time.Timer // timer for periodic runs | |
limiter rateLimiter // rate limiter for on-demand runs | |
} | |
// designed so that flowcontrol.RateLimiter satisfies | |
type rateLimiter interface { | |
TryAccept() bool | |
Stop() | |
} | |
type nullLimiter struct{} | |
func (nullLimiter) TryAccept() bool { | |
return true | |
} | |
func (nullLimiter) Stop() {} | |
// NewPeriodicRunner creates a new PeriodicRunner. | |
func NewPeriodicRunner(fn func(), minInterval time.Duration, maxInterval time.Duration, burst int) *PeriodicRunner { | |
pr := &PeriodicRunner{ | |
fn: fn, | |
minInterval: minInterval, | |
maxInterval: maxInterval, | |
} | |
if minInterval == 0 { | |
pr.limiter = nullLimiter{} | |
} else { | |
// minInterval is a duration, typically in seconds but could be fractional | |
rps := float32(time.Second) / float32(minInterval) | |
// allow burst updates in short succession | |
pr.limiter = flowcontrol.NewTokenBucketRateLimiter(rps, burst) | |
} | |
return pr | |
} | |
// Run the periodic timer. This is expected to be called as a goroutine. | |
func (pr *PeriodicRunner) Loop(stop <-chan struct{}) { | |
pr.periodicTimer = time.NewTimer(pr.maxInterval) | |
for { | |
select { | |
case <-stop: | |
pr.stop() | |
return | |
case <-pr.periodicTimer.C: | |
pr.tick() | |
} | |
} | |
} | |
// assumes the lock is held | |
func (pr *PeriodicRunner) stop() { | |
pr.mu.Lock() | |
defer pr.mu.Unlock() | |
pr.limiter.Stop() | |
pr.periodicTimer.Stop() | |
} | |
// assumes the lock is held | |
func (pr *PeriodicRunner) tick() { | |
pr.mu.Lock() | |
defer pr.mu.Unlock() | |
pr.run() | |
pr.periodicTimer.Reset(pr.maxInterval) | |
} | |
// Run the function as soon as possible. If this is called while Loop is not | |
// running, the call may be deferred indefinitely. | |
func (pr *PeriodicRunner) Run() { | |
pr.mu.Lock() | |
defer pr.mu.Unlock() | |
pr.run() | |
} | |
// assumes the lock is held | |
func (pr *PeriodicRunner) run() { | |
if pr.limiter.TryAccept() { | |
pr.fn() | |
pr.lastRun = time.Now() | |
return | |
} | |
// It can't run right now, figure out when it can run next. | |
elapsed := time.Since(pr.lastRun) // how long since last run | |
asap := pr.minInterval - elapsed // time to next possible run | |
next := pr.maxInterval - elapsed // time to next periodic run | |
if next <= asap { | |
// just let the periodic timer catch it | |
glog.V(3).Infof("running too often: eta %v", next) | |
return | |
} | |
// set the on-demand timer for ASAP. This APPEARS to be safe to do while Loop() is reading the timer. | |
pr.periodicTimer.Stop() | |
pr.periodicTimer.Reset(asap) | |
glog.V(3).Infof("running too often: eta %v", asap) | |
} | |
func main() { | |
pr := NewPeriodicRunner(func() { glog.Errorf("RUN") }, 1*time.Second, 3*time.Second, 1) | |
stop := make(chan struct{}) | |
go pr.Loop(stop) | |
for { | |
x := "" | |
fmt.Scanln(&x) | |
pr.Run() | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// This version uses 2 timers and is (I think) most strictly correct. | |
package main | |
import ( | |
"fmt" | |
"sync" | |
"time" | |
"k8s.io/client-go/pkg/util/flowcontrol" | |
"github.com/golang/glog" | |
) | |
// PeriodicRunner runs a function on a regular period. Callers can manually | |
// trigger runs, which will reset the period. Manual runs are rate-limited. | |
type PeriodicRunner struct { | |
minInterval time.Duration // the min time between runs, modulo bursts | |
maxInterval time.Duration // the max time between runs | |
mu sync.Mutex // guards runs of fn and all mutations | |
fn func() // function to run | |
lastRun time.Time // time since last run | |
periodicTimer *time.Timer // timer for periodic runs | |
limiter rateLimiter // rate limiter for on-demand runs | |
pending bool // is there an on-demand run in flight? | |
demandTimer *time.Timer // timer for throttled on-demand runs | |
} | |
// designed so that flowcontrol.RateLimiter satisfies | |
type rateLimiter interface { | |
TryAccept() bool | |
Stop() | |
} | |
type nullLimiter struct{} | |
func (nullLimiter) TryAccept() bool { | |
return true | |
} | |
func (nullLimiter) Stop() {} | |
// NewPeriodicRunner creates a new PeriodicRunner. | |
func NewPeriodicRunner(fn func(), minInterval time.Duration, maxInterval time.Duration, burst int) *PeriodicRunner { | |
pr := &PeriodicRunner{ | |
fn: fn, | |
minInterval: minInterval, | |
maxInterval: maxInterval, | |
} | |
if minInterval == 0 { | |
pr.limiter = nullLimiter{} | |
} else { | |
// minInterval is a duration, typically in seconds but could be fractional | |
rps := float32(time.Second) / float32(minInterval) | |
// allow burst updates in short succession | |
pr.limiter = flowcontrol.NewTokenBucketRateLimiter(rps, burst) | |
} | |
return pr | |
} | |
// Run the periodic timer. This is expected to be called as a goroutine. | |
func (pr *PeriodicRunner) Loop(stop <-chan struct{}) { | |
pr.periodicTimer = time.NewTimer(pr.maxInterval) | |
for { | |
select { | |
case <-stop: | |
pr.stop() | |
return | |
case <-pr.periodicTimer.C: | |
pr.tick() | |
} | |
} | |
} | |
// assumes the lock is held | |
func (pr *PeriodicRunner) stop() { | |
pr.mu.Lock() | |
defer pr.mu.Unlock() | |
pr.limiter.Stop() | |
pr.periodicTimer.Stop() | |
if pr.demandTimer != nil { | |
pr.demandTimer.Stop() | |
} | |
} | |
// assumes the lock is held | |
func (pr *PeriodicRunner) tick() { | |
pr.mu.Lock() | |
defer pr.mu.Unlock() | |
pr.run() | |
pr.periodicTimer.Reset(pr.maxInterval) | |
} | |
// Run the function as soon as possible. If this is called while Loop is not | |
// running, the call may be deferred indefinitely. | |
func (pr *PeriodicRunner) Run() { | |
pr.mu.Lock() | |
defer pr.mu.Unlock() | |
pr.run() | |
} | |
// assumes the lock is held | |
func (pr *PeriodicRunner) run() { | |
if pr.limiter.TryAccept() { | |
pr.fn() | |
pr.lastRun = time.Now() | |
return | |
} | |
// It can't run right now, figure out when it can run next. | |
elapsed := time.Since(pr.lastRun) // how long since last run | |
asap := pr.minInterval - elapsed // time to next possible run | |
next := pr.maxInterval - elapsed // time to next periodic run | |
if pr.pending { | |
// there's already a timer pending | |
glog.V(3).Infof("running too often: eta %v", asap) | |
return | |
} | |
if next <= asap { | |
// just let the periodic timer catch it | |
glog.V(3).Infof("running too often: eta %v", next) | |
return | |
} | |
// set the on-demand timer for ASAP. | |
pr.onDemand(asap) | |
glog.V(3).Infof("running too often: eta %v", asap) | |
} | |
// assumes the lock is held | |
func (pr *PeriodicRunner) onDemand(asap time.Duration) { | |
pr.pending = true | |
pr.demandTimer = time.AfterFunc(asap, func() { | |
pr.mu.Lock() | |
defer pr.mu.Unlock() | |
pr.pending = false | |
pr.run() | |
}) | |
} | |
func main() { | |
pr := NewPeriodicRunner(func() { glog.Errorf("RUN") }, 1*time.Second, 3*time.Second, 2) | |
stop := make(chan struct{}) | |
go pr.Loop(stop) | |
for { | |
x := "" | |
fmt.Scanln(&x) | |
pr.Run() | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment