Created
November 17, 2015 16:21
-
-
Save leifwickland/285a6950293a49d07a36 to your computer and use it in GitHub Desktop.
scalaz.stream.chunkUpToNBy2
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import scalaz.stream.{ Process, Process1 } | |
object SnapshotRowsToDruidQuery {
  /**
   * Break the stream into chunks of at most `n` elements. Consecutive elements
   * stay in the same chunk only while `f` accepts the pair.
   *
   * A chunk is closed and emitted as soon as either of these happens:
   *  - It holds `n` elements. The next element then unconditionally starts a new
   *    chunk, and `f` is not consulted across that boundary.
   *  - `f(previous, current)` returns false. `current` then starts a new chunk.
   *
   * A partially filled chunk is emitted when the input ends. Empty input
   * produces no output, and every emitted chunk is non-empty.
   *
   * @param n maximum chunk size; must be > 0
   * @param f predicate called with the previous element and the current one;
   *          returning false splits the stream between them
   * @tparam I element type
   * @return a `Process1` that groups its input into `Vector` chunks
   * @throws IllegalArgumentException if `n <= 0`
   */
  def chunkUpToNBy2[I](n: Int, f: (I, I) => Boolean): Process1[I, Vector[I]] = {
    require(n > 0, s"chunk size, `n`, must be > 0. It was $n.")

    // Start a fresh chunk seeded with its first element.
    def newChunk(startingWith: I): Process1[I, Vector[I]] =
      go(Vector(startingWith), startingWith)

    // Invariant: `acc` is never empty, because it is always seeded by `newChunk`
    // and only grows. So on end of input it can be emitted unconditionally.
    def go(acc: Vector[I], previous: I): Process1[I, Vector[I]] =
      if (acc.length >= n) {
        // Chunk is full: emit it now and let the next element open a new one.
        Process.emit(acc).append(
          Process.receive1Or[I, Vector[I]](Process.empty)(newChunk)
        )
      } else {
        Process.receive1Or[I, Vector[I]](Process.emit(acc)) { i =>
          if (f(previous, i)) go(acc :+ i, i)
          else Process.emit(acc).append(newChunk(i))
        }
      }

    Process.receive1(newChunk)
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import scalaz.stream.Process | |
class SnapshotRowsToDruidQueryTest extends WordSpecBase {
  "chunkUpToNBy2" should {
    "handle empty input" in {
      chunk[String](1, Nil, constTrue) shouldBe Nil
    }
    "emit a lone element as its own chunk" in {
      chunk[Int](3, Seq(42), constTrue) shouldBe Seq(Seq(42))
    }
    "split on size" in {
      chunk(2, 1 to 5, constTrue) shouldBe Seq(Seq(1, 2), Seq(3, 4), Seq(5))
    }
    "split on function" in {
      chunk[Int](3, 1 to 5, (_, b) => b % 2 == 0) shouldBe Seq(Seq(1, 2), Seq(3, 4), Seq(5))
    }
    "split on size and function" in {
      val actual = chunk[Int](4, 10 to 20, (a, _) => !(a % 5 == 0 && a % 3 == 0))
      actual shouldBe Seq(Seq(10, 11, 12, 13), Seq(14, 15), Seq(16, 17, 18, 19), Seq(20))
    }
    "allow the predicate to view the previous and current" in {
      val actual = chunk[Int](2, Seq(1, 1, 1, 2, 2, 3, 2, 3, 3, 3), (a, b) => a == b)
      actual shouldBe Seq(Seq(1, 1), Seq(1), Seq(2, 2), Seq(3), Seq(2), Seq(3, 3), Seq(3))
    }
    "reject a non-positive chunk size" in {
      // `require` fires eagerly when the Process1 is constructed.
      intercept[IllegalArgumentException](chunk[Int](0, 1 to 3, constTrue))
      intercept[IllegalArgumentException](chunk[Int](-1, 1 to 3, constTrue))
    }
  }

  /** Run `s` through `chunkUpToNBy2(n, f)` and collect every emitted chunk. */
  def chunk[I](n: Int, s: Seq[I], f: (I, I) => Boolean): IndexedSeq[Vector[I]] = {
    Process.emitAll(s).pipe(SnapshotRowsToDruidQuery.chunkUpToNBy2(n, f)).toSource.runLog.run
  }

  /** Predicate that never splits, so only the size limit applies. */
  def constTrue[I](a: I, b: I): Boolean = true
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment