Created
October 2, 2020 18:14
-
-
Save LasTshaMAN/69a2f1c64d61c1174e20c00ed0569a3f to your computer and use it in GitHub Desktop.
Sliding window rate limiter
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// slotCnt is the number of slots in the limiter's ring buffer: one slot per
// millisecond of a one-second window.
const slotCnt = int(time.Second) / int(time.Millisecond)
type RateLimiter struct { | |
// rpsBufferMs is a ring buffer that contains a slot per millisecond. | |
rpsBufferMs [slotCnt]int | |
lastRpsValue int | |
lastSlot int | |
lastTime time.Time | |
maxAllowedRps int | |
} | |
func NewRateLimiter(maxAllowedRps int) RateLimiter { | |
return RateLimiter{ | |
maxAllowedRps: maxAllowedRps, | |
} | |
} | |
func (l *RateLimiter) CanTransact() bool { | |
nowTime := time.Now() | |
defer func() { | |
l.lastTime = nowTime | |
}() | |
currentSlot := findCurrentSlot(nowTime) | |
if nowTime.Sub(l.lastTime) >= 1 * time.Second { | |
// All the data in the buffer is irrelevant at this point, so clean up the buffer for re-use. | |
l.rpsBufferMs = [slotCnt]int{} | |
l.rpsBufferMs[currentSlot] = 1 | |
l.lastSlot = currentSlot | |
l.lastRpsValue = 0 | |
return true | |
} | |
// Special case when Transact is called multiple times during the same millisecond. | |
if currentSlot == l.lastSlot { | |
return l.lastRpsValue <= l.maxAllowedRps | |
} | |
if l.lastSlot < currentSlot { | |
// Reclaim rps from stale slots. | |
for i := l.lastSlot; i <= currentSlot; i++ { | |
l.lastRpsValue -= l.rpsBufferMs[i] | |
l.rpsBufferMs[i] = 0 | |
} | |
} else { | |
// Reclaim rps from stale slots. | |
for i := l.lastSlot; i <= slotCnt + currentSlot; i++ { | |
l.lastRpsValue -= l.rpsBufferMs[i % slotCnt] | |
l.rpsBufferMs[i % slotCnt] = 0 | |
} | |
} | |
l.lastSlot = currentSlot | |
return l.lastRpsValue <= l.maxAllowedRps | |
} | |
func (l *RateLimiter) Transact() { | |
nowTime := time.Now() | |
defer func() { | |
l.lastTime = nowTime | |
}() | |
currentSlot := findCurrentSlot(nowTime) | |
if nowTime.Sub(l.lastTime) >= 1 * time.Second { | |
// All the data in the buffer is irrelevant at this point, so clean up the buffer for re-use. | |
l.rpsBufferMs = [slotCnt]int{} | |
l.rpsBufferMs[currentSlot] = 1 | |
// Increasing rps is semantically the same is executing transaction here. | |
l.lastRpsValue = 1 | |
l.lastSlot = currentSlot | |
return | |
} | |
// Special case when Transact is called multiple times during the same millisecond. | |
if currentSlot == l.lastSlot { | |
l.rpsBufferMs[currentSlot]++ | |
// Increasing rps is semantically the same is executing transaction here. | |
l.lastRpsValue++ | |
// l.lastSlot is already the same as currentSlot. | |
return | |
} | |
if l.lastSlot < currentSlot { | |
// Reclaim rps from stale slots. | |
for i := l.lastSlot + 1; i <= currentSlot; i++ { | |
l.lastRpsValue -= l.rpsBufferMs[i] | |
l.rpsBufferMs[i] = 0 | |
} | |
} else { | |
// Reclaim rps from stale slots. | |
for i := l.lastSlot + 1; i <= slotCnt + currentSlot; i++ { | |
l.lastRpsValue -= l.rpsBufferMs[i % slotCnt] | |
l.rpsBufferMs[i % slotCnt] = 0 | |
} | |
} | |
l.rpsBufferMs[currentSlot]++ | |
// Increasing rps is semantically the same is executing transaction here. | |
l.lastRpsValue++ | |
l.lastSlot = currentSlot | |
} | |
// findCurrentSlot returns the index of the millisecond slot that now falls
// into, i.e. the millisecond offset of now within its second (0 through
// slotCnt-1 for a one-second window of millisecond slots).
//
// The slot is derived from the now argument rather than from a fresh clock
// reading, so it always agrees with the timestamp the caller uses for its
// elapsed-time check. Time.Nanosecond is never negative, so the result is a
// valid index even for instants before the Unix epoch.
func findCurrentSlot(now time.Time) int {
	return now.Nanosecond() / int(time.Millisecond)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment