Last active
July 4, 2016 15:36
-
-
Save nitsanw/72df7c3fea2be7353f3b316cf5236c91 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* Trimmed down and reformatted for a blog post; see the Cassandra repository for the actual code. */
/** | |
* Provides a 'next operation time' for rate limited operation streams. The rate limiter is thread safe and is to be | |
* shared by all consumer threads. | |
*/ | |
private static class UniformRateLimiter { | |
long start = Long.MIN_VALUE; | |
final long intervalNs; | |
final AtomicLong opIndex = new AtomicLong(); | |
UniformRateLimiter(int opsPerSec) { | |
intervalNs = 1000000000 / opsPerSec; // ooooh but rounding!?!? fuck it | |
} | |
// this is called when all load gen threads are ready to go | |
void start() { | |
start = System.nanoTime(); | |
} | |
/* @return intended start time in ns for the operation */ | |
long acquire(int partitionCount) { | |
long currOpIndex = opIndex.getAndAdd(partitionCount); | |
return start + currOpIndex * intervalNs; | |
} | |
} | |
/* Provides a blocking stream of operations per consumer thread */ | |
private static class StreamOfOperations { | |
private final OpDistribution operations; | |
private final UniformRateLimiter rateLimiter; | |
private final WorkManager workManager; | |
/* @return next op/null if no more ops coming. Blocks until next op is available. */ | |
Operation nextOp() { | |
Operation op = operations.next(); | |
final int partitionCount = op.ready(workManager); | |
if (partitionCount == 0) | |
return null; | |
if (rateLimiter != null) { // we've got a schedule! | |
long intendedTime = rateLimiter.acquire(partitionCount); | |
op.intendedStartNs(intendedTime); // set that fucker! | |
// park until time has come (subject to sleepNanos resolution) | |
long now; | |
while ((now = System.nanoTime()) < intendedTime) | |
LockSupport.parkNanos(intendedTime - now); // wake up randomly or when you tell me ;-) | |
} | |
return op; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment