Skip to content

Instantly share code, notes, and snippets.

@easel
Created September 16, 2015 23:40
Show Gist options
  • Save easel/440618498f892e35fbbd to your computer and use it in GitHub Desktop.
Save easel/440618498f892e35fbbd to your computer and use it in GitHub Desktop.
buffering and interleaving results
def streamByUserId(
userId: UserId,
meta: QueryMeta
): Future[StreamQueryResult[Contact]] = {
createFilteredStreamResult(queryByUserId(userId, Some(meta)), meta).map { result =>
var buffer: Seq[((ContactDTO, ContactPointDTO), ContactTypeDTO)] = Seq.empty
val reduced: Source[Contact, _] = result.stream.map { x =>
buffer.headOption match {
case Some(y) if y._1._1 == x._1._1 =>
buffer = buffer :+ x
None
case Some(y) =>
val contact = combine(buffer)
buffer = Seq(x)
contact
case None =>
buffer = Seq(x)
None
}
}.filter(_.isDefined).map(_.get)
val remainder: Source[Contact, _] = Source(() => combine(buffer).toIterator)
val combined = reduced ++ remainder
StreamQueryResult[Contact](result.totalCount, combined)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment