Skip to content

Instantly share code, notes, and snippets.

@pchiusano
Created January 21, 2016 01:20
Show Gist options
  • Save pchiusano/999c84f7d32e9db67ed6 to your computer and use it in GitHub Desktop.
Save pchiusano/999c84f7d32e9db67ed6 to your computer and use it in GitHub Desktop.
Implementation of liftFirst for FS2
def liftFirst[F[_],A,B,C](f: Process1[A,B]): Process1[(A,C),(B,C)] = {
def go(cs: Chunk[C], s: Stepper[A,B]): Handle[Pure,(A,C)] => Pull[Pure,(B,C),Unit] = h =>
s.step match {
case Stepper.Done => Pull.done
case Stepper.Fail(err) => Pull.fail(err)
case Stepper.Emits(chunk, next) =>
if (cs.isEmpty) go(cs, next)(h)
else {
val last = cs(cs.size - 1)
val extended =
if (cs.size < chunk.size) cs.toVector ++ Vector.fill(chunk.size - cs.size)(last)
else cs.toVector
val zipped = Chunk.indexedSeq { chunk.toVector zip extended }
Pull.output(zipped) >> go(Chunk.singleton(last), next)(h)
}
case Stepper.Await(recv) => Pull.suspend {
Pull.awaitOption(h).flatMap {
case None => go(Chunk.empty, recv(None))(Handle.empty)
case Some(s) => go(s.head.map(_._2), recv(Some(s.head.map(_._1))))(s.tail)
}
}
}
_ pull { h => go(Chunk.empty, process1.stepper(f))(h) }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment