@valencik
Last active March 1, 2023 01:40
<li class="indented0" name="fs2.Stream.InvariantOps#observe" group="Ungrouped" fullcomment="yes" data-isabs="false" visbl="pub">
<a id="observe(p:fs2.Pipe[F,O,Nothing])(implicitF:cats.effect.Concurrent[F]):fs2.Stream[F,O]" class="anchorToMember"></a><a id="observe(Pipe[F,O,Nothing])(Concurrent[F]):Stream[F,O]" class="anchorToMember"></a>
<span class="permalink">
<a href="../fs2/Stream$$InvariantOps.html#observe(p:fs2.Pipe[F,O,Nothing])(implicitF:cats.effect.Concurrent[F]):fs2.Stream[F,O]" title="Permalink"><i class="material-icons"></i></a>
</span>
<span class="modifier_kind"><span class="modifier"></span> <span class="kind">def</span></span>
<span class="symbol">
<span class="name">observe</span>
<span class="params">
(
<span name="p">
p: <a href="index.html#Pipe[F[_],-I,+O]=fs2.Stream[F,I]=>fs2.Stream[F,O]" name="fs2.Pipe" id="fs2.Pipe" class="extmbr">Pipe</a>[<span name="fs2.Stream.InvariantOps.F" class="extype">F</span>,
<span name="fs2.Stream.InvariantOps.O" class="extype">O</span>, <a href="https://www.scala-lang.org/api/2.13.10/scala/Nothing.html#scala.Nothing" name="scala.Nothing" id="scala.Nothing" class="extype">Nothing</a>]
</span>
)
</span>
<span class="params">
(<span class="implicit">implicit </span><span name="F">F: <span name="cats.effect.Concurrent" class="extype">Concurrent</span>[<span name="fs2.Stream.InvariantOps.F" class="extype">F</span>]</span>)
</span>
<span class="result">
: <a href="Stream.html" name="fs2.Stream" id="fs2.Stream" class="extype">Stream</a>[<span name="fs2.Stream.InvariantOps.F" class="extype">F</span>, <span name="fs2.Stream.InvariantOps.O" class="extype">O</span>]
</span>
</span>
<p class="shortcomment cmt">Synchronously sends values through <code>p</code>.</p>
<div class="fullcomment">
<div class="comment cmt">
<p>Synchronously sends values through <code>p</code>.</p>
<p>If <code>p</code> fails, the resulting stream fails. If <code>p</code> halts, evaluation halts too.</p>
<p>
Note that <code>observe</code> only outputs full chunks of <code>O</code> that are known to have been successfully processed by <code>p</code>. If <code>p</code> terminates or fails partway through a chunk, that chunk will not be
available in the resulting stream.
</p>
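<p>The failure behaviour described above can be sketched as follows. This is a minimal illustration, not part of the library documentation: the object name <code>ObserveFailure</code>, the exception <code>boom</code>, and the failing sink are hypothetical, and the sketch assumes fs2 3.x with cats-effect 3 on the classpath.</p>
<pre>import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream

object ObserveFailure {
  // Hypothetical error used only for this illustration.
  val boom = new RuntimeException("boom")

  // The observing pipe runs an effect per element and emits nothing.
  // Here the effect always raises, so the pipe fails on the first element it sees.
  val result: Either[Throwable, Vector[Int]] =
    Stream(1, 2, 3)
      .covary[IO]
      .observe(in =&gt; in.foreach(_ =&gt; IO.raiseError[Unit](boom)))
      .compile
      .toVector
      .attempt        // capture the failure as a Left instead of throwing
      .unsafeRunSync()
}</pre>
<p>Because the pipe fails before it finishes processing the only chunk, <code>result</code> is expected to be a <code>Left</code> carrying the error rather than a vector of elements.</p>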
<p>Note that if your pipe can be represented by an <code>O =&gt; F[Unit]</code>, <code>evalTap</code> will provide much greater performance.</p>
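<p>To make the comparison concrete, here is a small sketch of the same side effect expressed both ways. The object name <code>ObserveVsEvalTap</code> and the helper <code>log</code> are hypothetical names introduced for illustration, and the sketch assumes fs2 3.x with cats-effect 3.</p>
<pre>import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream

object ObserveVsEvalTap {
  // Hypothetical side effect of shape O =&gt; F[Unit]: print each element.
  def log(i: Int): IO[Unit] = IO.println(s"saw $i")

  // Using observe: the pipe evaluates log for each element and emits nothing.
  val viaObserve: Vector[Int] =
    Stream(1, 2, 3).covary[IO].observe(_.foreach(log)).compile.toVector.unsafeRunSync()

  // Using evalTap: the same effect, without the concurrent machinery of observe.
  val viaEvalTap: Vector[Int] =
    Stream(1, 2, 3).covary[IO].evalTap(log).compile.toVector.unsafeRunSync()
}</pre>
<p>Both values are <code>Vector(1, 2, 3)</code>. The <code>evalTap</code> form needs no <code>Concurrent</code> instance, which is one reason to prefer it when the pipe is just a per-element effect.</p>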
</div>
<dl class="attributes block">
<div class="block">
Example:
<ol>
<li class="cmt">
<p></p>
<pre>scala&gt; <span class="kw">import</span> fs2.Stream
scala&gt; <span class="kw">import</span> cats.effect.IO, cats.effect.unsafe.implicits.global
scala&gt; Stream(<span class="num">1</span>, <span class="num">2</span>, <span class="num">3</span>).covary[IO].observe(_.printlns).map(_ + <span class="num">1</span>).compile.toVector.unsafeRunSync()
res0: Vector[<span class="std">Int</span>] = Vector(<span class="num">2</span>, <span class="num">3</span>, <span class="num">4</span>)</pre>
</li>
</ol>
</div>
</dl>
</div>
</li>