Skip to content

Instantly share code, notes, and snippets.

@leifwickland
Created November 17, 2015 16:21
Show Gist options
  • Save leifwickland/285a6950293a49d07a36 to your computer and use it in GitHub Desktop.
Save leifwickland/285a6950293a49d07a36 to your computer and use it in GitHub Desktop.
scalaz.stream.chunkUpToNBy2
import scalaz.stream.{ Process, Process1 }
object SnapshotRowsToDruidQuery {
  /**
   * Break the stream into chunks of up to `n` elements each, starting a new chunk
   * whenever `f(previous, current)` returns false for two consecutive elements.
   *
   * A chunk is emitted when any of these happens:
   *  - it reaches `n` elements;
   *  - `f` returns false for the next element;
   *  - the input ends.
   *
   * When a chunk is emitted because it is full, the next element always begins a
   * new chunk; `f` is not consulted across that size boundary. Empty input produces
   * no chunks, and every emitted chunk is non-empty.
   *
   * @param n maximum number of elements per chunk; must be > 0
   * @param f predicate applied to (previous element, current element); `true` keeps
   *          the current element in the same chunk as the previous one
   * @tparam I element type
   * @throws IllegalArgumentException if `n <= 0` (checked when this method is called)
   */
  def chunkUpToNBy2[I](n: Int, f: (I, I) => Boolean): Process1[I, Vector[I]] = {
    require(n > 0, s"chunk size, `n`, must be > 0. It was $n.")

    // Begin a fresh chunk whose only element (and "previous" element) is `startingWith`.
    def newChunk(startingWith: I): Process1[I, Vector[I]] = {
      go(Vector(startingWith), startingWith)
    }

    // Invariant: `acc` is never empty. It is created by `newChunk` with one element
    // and only ever grows, so it can always be emitted as-is.
    def go(acc: Vector[I], previous: I): Process1[I, Vector[I]] = {
      if (acc.length >= n) {
        // Chunk is full: emit it, then start a new chunk from the next element, if any.
        Process.emit(acc).append(
          Process.receive1Or[I, Vector[I]](Process.empty)(newChunk)
        )
      } else {
        // On end of input, flush the partial chunk.
        Process.receive1Or[I, Vector[I]](Process.emit(acc)) { i =>
          if (f(previous, i)) go(acc :+ i, i)
          else Process.emit(acc).append(newChunk(i))
        }
      }
    }

    // Empty input halts here without emitting anything.
    Process.receive1(newChunk)
  }
}
import scalaz.stream.Process
class SnapshotRowsToDruidQueryTest extends WordSpecBase {
  "chunkUpToNBy2" should {
    "handle empty input" in {
      chunk[String](1, Nil, constTrue) shouldBe Nil
    }
    "handle a single element" in {
      chunk[Int](3, Seq(42), constTrue) shouldBe Seq(Seq(42))
    }
    "emit every element on its own when n is 1" in {
      chunk[Int](1, 1 to 3, constTrue) shouldBe Seq(Seq(1), Seq(2), Seq(3))
    }
    "split on size" in {
      chunk(2, 1 to 5, constTrue) shouldBe Seq(Seq(1, 2), Seq(3, 4), Seq(5))
    }
    "split on function" in {
      chunk[Int](3, 1 to 5, (a, b) => b % 2 == 0) shouldBe Seq(Seq(1, 2), Seq(3, 4), Seq(5))
    }
    "split between every pair when the predicate always rejects" in {
      chunk[Int](3, 1 to 3, (a, b) => false) shouldBe Seq(Seq(1), Seq(2), Seq(3))
    }
    "split on size and function" in {
      val actual = chunk[Int](4, 10 to 20, (a, b) => !(a % 5 == 0 && a % 3 == 0))
      actual shouldBe Seq(Seq(10, 11, 12, 13), Seq(14, 15), Seq(16, 17, 18, 19), Seq(20))
    }
    "allow the predicate to view the previous and current" in {
      val actual = chunk[Int](2, Seq(1, 1, 1, 2, 2, 3, 2, 3, 3, 3), (a, b) => a == b)
      actual shouldBe Seq(Seq(1, 1), Seq(1), Seq(2, 2), Seq(3), Seq(2), Seq(3, 3), Seq(3))
    }
    "reject a non-positive chunk size" in {
      // `require` runs eagerly when the Process1 is built, before any input is piped.
      intercept[IllegalArgumentException] {
        SnapshotRowsToDruidQuery.chunkUpToNBy2[Int](0, constTrue)
      }
      intercept[IllegalArgumentException] {
        SnapshotRowsToDruidQuery.chunkUpToNBy2[Int](-1, constTrue)
      }
    }
  }

  /** Run `chunkUpToNBy2(n, f)` over the in-memory sequence `s` and collect every emitted chunk. */
  def chunk[I](n: Int, s: Seq[I], f: (I, I) => Boolean): IndexedSeq[Vector[I]] = {
    Process.emitAll(s).pipe(SnapshotRowsToDruidQuery.chunkUpToNBy2(n, f)).toSource.runLog.run
  }

  /** Predicate that never splits, so only the size limit determines chunk boundaries. */
  def constTrue[I](a: I, b: I): Boolean = true
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment