package main

import (
	"fmt"
	"math"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"

	"go.uber.org/ratelimit"
)

const dateFormatTemplate = "01-02-2006 15:04:05"

var delayList = [...]int{100, 200, 500, 1000}
// RLCtr pairs a per-second limiter with a running count of requests made in
// the current minute (history) and the per-minute ceiling it must stay under.
type RLCtr struct {
	// add other fields here as needed to identify or configure a worker task type
	rl        ratelimit.Limiter
	history   uint32
	targetRPM uint32
}
func main() {
	var wg sync.WaitGroup
	queue := make(chan string, 25)

	wg.Add(2)
	go producer(&wg, queue)
	go consumer(&wg, queue)
	// neither goroutine ever calls Done in this demo (done is never signaled
	// and queue is never closed), so Wait blocks and the pipeline runs until interrupted
	wg.Wait()
}
func producer(wg *sync.WaitGroup, queue chan string) {
	ticker := time.NewTicker(50 * time.Millisecond)
	done := make(chan bool) // never signaled in this demo, so the producer keeps emitting payloads
	fmt.Println("Started producer")
	go func() {
		for {
			select {
			case <-done:
				wg.Done()
				return
			case t := <-ticker.C:
				queue <- "Custom Payload: " + t.Format(dateFormatTemplate)
				if len(queue) > 0 && len(queue)%10 == 0 {
					fmt.Printf("Buffer = %d/%d\n", len(queue), cap(queue))
				}
			}
		}
	}()
}
func consumer(wg *sync.WaitGroup, queue chan string) {
	const targetRPM float64 = 100
	// round up so the per-second limiter never gets a zero rate
	targetRPS := math.Ceil(targetRPM / 60)
	rlc := &RLCtr{
		rl:        ratelimit.New(int(targetRPS)), // param is requests per second
		history:   0,
		targetRPM: uint32(targetRPM),
	}
	go func() {
		defer wg.Done()
		makeReqs(rlc, queue, targetRPS)
	}()
}
func makeReqs(rlc *RLCtr, queue chan string, targetRPS float64) {
	rlc.start(targetRPS)
	for range queue {
		// in case we hit the per-minute rate limit too soon, wait around until history is reset
		for atomic.LoadUint32(&rlc.history) >= rlc.targetRPM {
			fmt.Printf(
				"Reached RPM, waiting for history refresh: {history: %d, allowedRPM: %d}\n",
				atomic.LoadUint32(&rlc.history), rlc.targetRPM,
			)
			time.Sleep(1 * time.Second)
		}
		now := rlc.rl.Take() // blocks until the per-second limiter allows the next request
		fmt.Println(now.Format(dateFormatTemplate))
		atomic.AddUint32(&rlc.history, 1) // record the request in this minute's history
		go work()
	}
}
func work() {
	// do any network call / other job here
	delay := delayList[rand.Intn(len(delayList))]
	time.Sleep(time.Duration(delay) * time.Millisecond)
}
func (rlc *RLCtr) start(targetRPS float64) {
	window := 5
	// optional logger ticker, just for visibility into throughput
	go func() {
		ticker := time.NewTicker(time.Duration(window) * time.Second)
		prev := atomic.LoadUint32(&rlc.history)
		for range ticker.C {
			current := atomic.LoadUint32(&rlc.history)
			var counter uint32
			if current > prev {
				counter = current - prev
			} else {
				// history was reset (or unchanged) during this window
				counter = 0
			}
			fmt.Printf(
				"Completed %d tasks in the last window (%d seconds), targetRPS = %f, currentRPS = %d, completedTaskCount = %d\n",
				counter, window, targetRPS, counter/uint32(window), current)
			prev = current
		}
	}()
	// essential: this resets history every minute, which is what keeps us within the RPM limit
	go func() {
		monitor := time.NewTicker(1 * time.Minute)
		for range monitor.C {
			fmt.Printf(
				"Period Stats: noOfReqSent = %d, rateLimit(per min) = %d\n",
				atomic.LoadUint32(&rlc.history), rlc.targetRPM,
			)
			atomic.StoreUint32(&rlc.history, 0)
		}
	}()
}