Last active
December 27, 2018 09:51
-
-
Save giftig/02f216b2341e95c2854242c0b363d218 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import scala.concurrent.{Await, ExecutionContext, Future} | |
import scala.concurrent.ExecutionContext.Implicits.global | |
import scala.concurrent.duration._ | |
import scala.util.Random | |
// Demo of how to page through a sorted source (e.g. a slick table) and group up matching rows,
// writing each batch to some sink before proceeding to the next one. Handles matching entries
// that straddle a page boundary, as well as groups containing more rows than the standard
// page size.
//
// N.B. to run, launch ammonite and $exec the script. Running directly with ammonite will cause
// problems due to a known bug in ammonite when running scripts with futures / awaits
// Stand-in for the paged source: 100000 random values in [0, 500), sorted so that equal values
// sit next to each other and can be grouped while paging.
val data: List[Int] = List.fill(100000)(Random.nextInt(500)).sorted
/**
 * Simulate fetching one page from the source.
 *
 * Logs the requested window, then returns up to `limit` elements of `data` starting at
 * `offset`, wrapped in an already-completed Future. The result is empty once `offset` is past
 * the end of `data`.
 *
 * @param offset number of leading elements of `data` to skip
 * @param limit maximum number of elements to return
 */
def batch(offset: Int, limit: Int): Future[List[Int]] = {
  println(s"Taking batch [$offset:${offset + limit}]")

  val remaining = data.drop(offset)
  val page = remaining.take(limit)
  Future.successful(page)
}
/**
 * Simulate writing a batch to a sink by logging its size and value range.
 *
 * An empty batch is reported without a range instead of throwing on `head` / `last`, so the
 * method never fails synchronously before a Future has been produced.
 *
 * @param b the entries to "write"
 * @return an already-completed Future
 */
def writeBatch(b: List[Int]): Future[Unit] = {
  b match {
    case Nil =>
      println("Writing batch of 0 entries...")
    case first :: _ =>
      println(s"Writing batch of ${b.length} entries from $first to ${b.last}...")
  }
  Future.unit
}
/**
 * Get a batch where we can guarantee we have all duplicates.
 *
 * Reads one page via `batch`. If the rows gathered so far contain more than one distinct value,
 * the trailing run of rows equal to the last value is trimmed off, since that group may
 * continue on the next page. Otherwise we keep reading further pages until we either run out
 * of data or find a different value.
 *
 * Assumes the source is sorted: `head == last` is treated as "every row is identical".
 *
 * @param offset position in the source at which to read the next page
 * @param batchSize page size passed to `batch` on every read
 * @param acc rows collected by earlier reads in this call chain (empty on the first call)
 * @return the rows of a complete batch; empty only when the source has no rows at `offset`
 */
def computeBatch(offset: Int, batchSize: Int = 1000, acc: List[Int] = Nil): Future[List[Int]] = {
  batch(offset, batchSize) flatMap {
    case Nil =>
      // Source exhausted: whatever was accumulated is the final (possibly empty) group
      Future.successful(acc)

    case entries =>
      val combined = acc ++ entries
      // List.last is O(n); evaluate it once rather than once per element inside the predicate
      val trailing = combined.last

      if (combined.head != trailing) {
        // Strip off the last group to guarantee we never emit a partially read group
        Future.successful(combined.takeWhile(_ != trailing))
      } else {
        // All identical; get another batch and try again
        computeBatch(offset + entries.length, batchSize, combined)
      }
  }
}
/**
 * Page through the whole source, writing each batch produced by `computeBatch` before
 * requesting the next one.
 *
 * @return a Future that completes once `computeBatch` yields an empty batch
 */
def writeBatches(): Future[Unit] = {
  // offset: position in the source of the next unread row
  // limit: page size handed to computeBatch for every read
  def nextBatch(offset: Int = 0, limit: Int = 1000): Future[Unit] = {
    computeBatch(offset, limit) flatMap {
      case Nil =>
        println("Found empty batch; completing")
        Future.unit
      case entries =>
        // Advance by what was actually emitted (trimmed rows are re-read next time) and carry
        // `limit` forward so a non-default page size is not reset to 1000 after the first batch
        writeBatch(entries) flatMap { _ => nextBatch(offset + entries.length, limit) }
    }
  }

  nextBatch()
}

// Kick off the run; the returned Future is not awaited here
writeBatches()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment