Created
July 17, 2015 18:03
-
-
Save randomstatistic/b8b05efcc156a349b0d8 to your computer and use it in GitHub Desktop.
Block simultaneous future creation beyond a threshold
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.{TimeUnit, LinkedBlockingQueue} | |
import scala.annotation.tailrec | |
import scala.collection.JavaConverters._ | |
import scala.concurrent.Future | |
/** | |
* Thread-safe lock on Future generation. The put() method accepts futures without blocking so long as there are less than | |
* $size futures that have been added via put() that are still alive. If more than $size futures are still running, | |
* calling put() *blocks* the calling thread until some of the current futures finish. | |
* It's highly recommended that futures provided to put() have some kind of timeout. | |
*/ | |
class FutureBuffer[T](size: Int) { | |
private val bq = new LinkedBlockingQueue[Future[T]](size) | |
// A snapshot of the futures currently running | |
def inFlight() = bq.iterator().asScala | |
// Blocks iff there are $size futures currently in flight | |
@tailrec | |
final def put(f: Future[T], delayMs: Int = 0): Boolean = { | |
val accepted = | |
if (delayMs == 0) { | |
bq.offer(f) // this offer API is less work | |
} else { | |
bq.offer(f, delayMs, TimeUnit.MILLISECONDS) | |
} | |
accepted match { | |
case true => | |
true | |
case false => | |
clean() // try to make room | |
put(f, 50) // try again with an offer delay so we don't spin cycle too fast | |
} | |
} | |
private def clean() = { | |
inFlight().foreach(f => | |
if (f.isCompleted) bq.remove(f) | |
) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment