Skip to content

Instantly share code, notes, and snippets.

@nitsanw
Last active July 4, 2016 15:36
Show Gist options
  • Save nitsanw/72df7c3fea2be7353f3b316cf5236c91 to your computer and use it in GitHub Desktop.
Save nitsanw/72df7c3fea2be7353f3b316cf5236c91 to your computer and use it in GitHub Desktop.
/* trimmed down and reformatted for blog, see cassandra repo for actual code */
/**
 * Provides a 'next operation time' for rate limited operation streams. One instance is meant to be shared by
 * all consumer threads: the operation counter is an {@link AtomicLong} and the schedule origin is volatile.
 *
 * The schedule is uniform: the operation with (partition weighted) index k is intended to start at
 * start + k / opsPerSec seconds. Each intended time is computed from the index directly rather than by
 * multiplying a pre-truncated interval, so the schedule does not drift for rates that do not divide one second
 * evenly.
 */
private static class UniformRateLimiter {
    private static final long NANOS_PER_SECOND = 1_000_000_000L;

    // Schedule origin. Written by start() and read by every consumer in acquire(); volatile so that the write
    // is visible to the consumers without relying on external synchronization. Until start() is called it is
    // Long.MIN_VALUE, so every intended time handed out lies far in the past.
    volatile long start = Long.MIN_VALUE;
    // Nominal spacing between operations, truncated to whole nanoseconds. Kept for callers that read it;
    // acquire() does not use it, precisely to avoid accumulating the truncation error.
    final long intervalNs;
    final AtomicLong opIndex = new AtomicLong();
    private final long opsPerSec;

    /**
     * @param opsPerSec target rate in operations per second, must be positive
     * @throws IllegalArgumentException if opsPerSec is zero or negative
     */
    UniformRateLimiter(int opsPerSec) {
        if (opsPerSec <= 0) {
            throw new IllegalArgumentException("opsPerSec must be positive, got " + opsPerSec);
        }
        this.opsPerSec = opsPerSec;
        intervalNs = NANOS_PER_SECOND / opsPerSec;
    }

    // this is called when all load gen threads are ready to go
    void start() {
        start = System.nanoTime();
    }

    /**
     * Claims the next partitionCount slots of the schedule.
     *
     * @param partitionCount number of schedule slots the operation consumes
     * @return intended start time in ns (System.nanoTime() time base) for the operation
     */
    long acquire(int partitionCount) {
        final long currOpIndex = opIndex.getAndAdd(partitionCount);
        // Split into whole seconds plus a remainder so the multiplication by 1e9 cannot overflow:
        // the remainder is < opsPerSec <= Integer.MAX_VALUE, and 2^31 * 1e9 still fits in a long.
        final long wholeSeconds = currOpIndex / opsPerSec;
        final long slotsIntoSecond = currOpIndex % opsPerSec;
        return start + wholeSeconds * NANOS_PER_SECOND + slotsIntoSecond * NANOS_PER_SECOND / opsPerSec;
    }
}
/* Per consumer thread source of operations; hands each one out only once its scheduled time has arrived. */
private static class StreamOfOperations {
    private final OpDistribution operations;
    private final UniformRateLimiter rateLimiter;
    private final WorkManager workManager;

    /*
     * @return the next operation, or null when ready() reports zero partitions (no more ops coming).
     * When a rate limiter is configured, the call blocks until the operation's intended start time.
     */
    Operation nextOp() {
        final Operation next = operations.next();
        final int partitions = next.ready(workManager);
        if (partitions == 0) {
            return null;
        }
        if (rateLimiter == null) {
            // no schedule to honour: hand the operation out immediately
            return next;
        }

        // Claim a slot on the shared schedule and record it on the operation.
        final long scheduledNs = rateLimiter.acquire(partitions);
        next.intendedStartNs(scheduledNs);

        // parkNanos may return before the requested time has elapsed, so re-read the clock and
        // park again for the remainder until the scheduled time has been reached.
        for (long nowNs = System.nanoTime(); nowNs < scheduledNs; nowNs = System.nanoTime()) {
            LockSupport.parkNanos(scheduledNs - nowNs);
        }
        return next;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment