-
-
Save kubukoz/03039cd2fd03f252f9d5a7df4fefbc85 to your computer and use it in GitHub Desktop.
/* | |
Copyright 2021 Jakub Kozłowski | |
Licensed under the Apache License, Version 2.0 (the "License"); | |
you may not use this file except in compliance with the License. | |
You may obtain a copy of the License at | |
http://www.apache.org/licenses/LICENSE-2.0 | |
Unless required by applicable law or agreed to in writing, software | |
distributed under the License is distributed on an "AS IS" BASIS, | |
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
See the License for the specific language governing permissions and | |
limitations under the License. | |
*/ | |
import cats.effect.MonadThrow | |
import cats.implicits._ | |
import fs2.Pull | |
import fs2.Stream | |
//Example usage: onEmptyOrNonEmpty(stream)(Ok(_))(NotFound()).flatten | |
def onEmptyOrNonEmpty[F[_]: MonadThrow, A, B]( | |
stream: Stream[F, A] | |
)( | |
onNonEmpty: Stream[F, A] => B | |
)( | |
onEmpty: B | |
)( | |
implicit SC: fs2.Compiler[F, F] | |
): F[B] = stream | |
.pull | |
.peek1 | |
.flatMap { | |
_.traverse_ { case (h, t) => | |
Pull.output1(onNonEmpty((Stream.emit(h) ++ t))) | |
} | |
} | |
.stream | |
.compile | |
.last | |
.map(_.getOrElse(onEmpty)) |
Why exactly the initial version needed the extendScopeTo
? Looks like peek1
(which is implemented in terms of uncons1
) is not doing any of this.
It's very hard to find any resources about fs2 scopes and how they work so will appreciate if you have some resources to share on this topic :)
@msosnicki uh, that's a great catch. Apparently peek1 doesn't do what we need here (I admit, I haven't ran that later code).
Basically, the idea is that if you just pull.uncons1
and compile the stream that results from that (which will contain the rest of the stream in the value - this is our B
that we built from onNonEmpty
), the resources of the stream will be closed when that effect completes - which may (and probably will) be before you even stream anything to the caller.
Extending the scope lets us explicitly say "don't close this stream yet" and pass the lifetime of all these resources to the "inner" stream (the one returned in the result: B
). At least that's how I understand it (the original version worked like that).
Now that I think of it, there might be some gotchas related to cancellation/failure - if the outer stream is cancelled/fails after the scope is extended (although it shouldn't be able to fail after a successful uncons1
, so cancellation is more likely), there's a chance the resources won't get closed... I'll do due research and testing and maybe post something longer about this.
Ok, that makes sense! So if I understand correctly, it is required if for whatever reason the inner stream compilation happens after the peek1.stream
compilation. Here it's the case because in http4s Response
this stream is retained and compiled later on. But if I wanted to use the same combinator with a B
that compiles and consumes the inner stream internally, I think it wouldn't be required, as it wouldn't leak outside.
Anyway, looking forward to some longer post about it :) Thanks!
Might take some time :D
I think if we didn't have a compile
that essentially runs before the response body, we wouldn't need to touch scopes, yeah. B
can't consume the stream internally now, because it's no longer in F[B]
(of course you can pass an effect, but the result will be F[F[...]]
so it's a separate "compilation" and needs scope extension)
@kubukoz I'm sure people will copy this, might make sense to throw a license file up, so them doing so is alright.
Good idea, thanks :)
Note: as @ChristopherDavenport reminded me, there's a
.pull.peek1
which handles the scope extension already, so we can simplify that part:Bonus point: We don't really need to stick to
Response[F]
:and then we can push the
flatten
ing of theF[B]
part to the user, because why not (they'll just need to.flatten
on the whole result):Final result: