Last active
January 14, 2023 14:17
-
-
Save Pythonista7/dc41befdf8b233f1907c304031357869 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"math" | |
"math/rand" | |
"sync" | |
"sync/atomic" | |
"time" | |
"go.uber.org/ratelimit" | |
) | |
// dateFormatTemplate is the time layout (month-day-year, 24-hour clock) used
// with Time.Format when building payloads and printing timestamps.
const dateFormatTemplate = "01-02-2006 15:04:05"

// delayList is the set of delays work chooses from at random; work interprets
// each value as a number of milliseconds.
var delayList = [...]int{100, 200, 500, 1000}
// RLCtr bundles a rate limiter with a request counter so the consumer can
// pace individual requests and also cap how many are sent per period.
type RLCtr struct {
	// can add other fields as needed for identifying/used in a worker task type

	// rl paces individual requests; its Take method is called before each one.
	rl ratelimit.Limiter
	// history counts the requests sent in the current period. It is
	// incremented and reset from different goroutines through sync/atomic.
	history uint32
	// targetRPM is the request budget that history is compared against.
	targetRPM uint32
}
func main() { | |
var wg sync.WaitGroup | |
queue := make(chan (string), 25) | |
wg.Add(2) | |
go producer(&wg, queue) | |
go consumer(&wg, queue) | |
wg.Wait() | |
} | |
func producer(wg *sync.WaitGroup, queue chan (string)) { | |
ticker := time.NewTicker(50 * time.Millisecond) | |
done := make(chan bool) | |
fmt.Println("Started producer") | |
go func() { | |
for { | |
select { | |
case <-done: | |
wg.Done() | |
return | |
case t := <-ticker.C: | |
queue <- "Custom Payload: " + t.Format(dateFormatTemplate) | |
if len(queue) > 0 && len(queue)%10 == 0 { | |
fmt.Printf("Buffer = %d/%d\n", len(queue), cap(queue)) | |
} | |
} | |
} | |
}() | |
} | |
func consumer(wg *sync.WaitGroup, queue chan (string)) { | |
const targetRPM float64 = 100 | |
targetRPS := math.Ceil(float64(targetRPM / 60)) | |
rlc := &RLCtr{ | |
rl: ratelimit.New(int(targetRPS)), // param is rps | |
history: 0, | |
targetRPM: uint32(targetRPM), | |
} | |
go func() { | |
defer wg.Done() | |
makeReqs(rlc, queue, targetRPS) | |
}() | |
} | |
func makeReqs(rlc *RLCtr, queue chan (string), targetRPS float64) { | |
rlc.start(targetRPS) | |
for range queue { | |
// in case we hit the per min rate limit too soon , we wait around until we can resume | |
for rlc.history >= rlc.targetRPM { | |
fmt.Printf( | |
"Reached RPM waiting for history refresh: {history : %d , allowedRPM: %d }\n", | |
rlc.history, rlc.targetRPM, | |
) | |
time.Sleep(1 * time.Second) | |
} | |
now := rlc.rl.Take() | |
fmt.Println(now.Format(dateFormatTemplate)) | |
atomic.AddUint32(&rlc.history, 1) // add to history | |
go work() | |
} | |
} | |
func work() { | |
// do any network call / other job here | |
delay := delayList[rand.Intn(len(delayList))] | |
time.Sleep(time.Duration(delay) * time.Millisecond) | |
} | |
func (rlc *RLCtr) start(targetRPS float64) { | |
window := 5 | |
// optional logger ticker just for understanding | |
go func() { | |
ticker := time.NewTicker(time.Duration(window) * time.Second) | |
prev := rlc.history | |
for range ticker.C { | |
var counter uint32 | |
if rlc.history > prev { | |
counter = rlc.history - prev | |
} else { | |
counter = 0 | |
} | |
fmt.Printf( | |
"Completed %d tasks in the last window(%d seconds) , targetRPS = %f , currentRPS = %d , completedTaskCount = %d \n", | |
counter, window, targetRPS, counter/uint32(window), rlc.history) | |
prev = rlc.history | |
} | |
}() | |
// essential, this resets history and helps stay within RPMLimit | |
go func() { | |
monitor := time.NewTicker(1 * time.Minute) | |
for range monitor.C { | |
fmt.Printf( | |
"Period Stats: noOfReqSent = %d , rateLimit(per min) = %d \n", | |
rlc.history, rlc.targetRPM, | |
) | |
atomic.StoreUint32(&rlc.history, 0) | |
} | |
}() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment