@kanterov
Last active August 29, 2015 14:26
scalaz-stream pipeIn
import scalaz.concurrent.Task
import scalaz.stream._

// "release" is never printed unless `.pipeIn(process1.id[A])` is removed.
// Internally, `pipeIn` uses `takeWhile`, which is built on `pipe`; `pipe` alone breaks it too.
def printSink[A] = io.stdOutLines.contramap[A](_.toString).pipeIn(process1.id[A])

// An even simpler case fails as well:
// sink.lift[Task, A](x => Task.delay(())).pipe(process1.id[A => Task[Unit]])
def disposableThing: Process[Task, Unit] = {
  val acquire = Task.delay { println("acquire") }
  val release = Task.delay { println("release") }
  io.resource(acquire)(x => release)(Task.now)
}

val poison = async.signalOf[Boolean](false)

new Thread {
  override def run(): Unit = {
    Thread.sleep(3000)
    poison.set(true).run
  }
}.start()

poison.discrete.wye(
  for {
    thing <- disposableThing
    y <- Process.emit(1).observe(printSink)
  } yield y
)(wye.interrupt).run.run