Skip to content

Instantly share code, notes, and snippets.

View pchlupacek's full-sized avatar

Pavel Chlupacek pchlupacek

View GitHub Profile
@pchlupacek
pchlupacek / fs2Conversion.scala
Last active April 22, 2017 04:57
converting fs2 to scalaz.stream and vice versa
package spinoco.scalaz.stream
import fs2.Handle
import fs2._
import scalaz.concurrent.{Actor, Task}
import scalaz.stream.{Cause, Process, wye}
import scala.language.higherKinds
import scalaz.{-\/, \/, \/-}
/**
* Asynchronous execution of this Process. Note that this method is not resource safe unless
* the callback is called with the _left_ side completed. In that case it is guaranteed that all
* cleanups have been successfully completed.
* User of this method is responsible for any cleanup actions to be performed by running the
* next Process obtained on right side of callback.
*
* This method returns a function that, when applied, causes the running computation to be interrupted.
* That is useful if the process contains any asynchronous code that may be left with incomplete callbacks.
* If the evaluation of the process is interrupted, then the interruption is only active if the callback
/**
* Transforms a process to an InputStream. Note that `close` here will cause the process to be killed
* in case it has not finished yet. Otherwise cleanups may be called before `close` is executed on
* the InputStream. The process will be run after the first `read` invocation on the InputStream.
* The resulting input stream is NOT thread safe.
* @param source
* @return
*/
def toInputStream(source:TSource[ByteVector])(implicit S:Strategy):InputStream = {
val killSignal = async.signalOf(false)
@pchlupacek
pchlupacek / ExperimentSpec.scala
Last active August 29, 2015 14:01
issues with object
package scalaz.stream2
import Process._
import org.scalacheck.Prop._
import org.scalacheck.Properties
import scalaz.concurrent.{Strategy, Task}
import scalaz.{\/, stream2, \/-, -\/}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.SyncVar
import scala.concurrent.duration._
/** Feed a sequence of inputs to a `Process1`. */
def feed[I, O](i: Seq[I])(p: Process1[I, O]): Process1[I, O] = {
@tailrec
def go(
in: Seq[I], out: Vector[O]
, cur: Process1[I, O]
, stack: Vector[Throwable => Trampoline[Process1[I, O]]]
): Process1[I, O] = {
if (in.nonEmpty) {
@pchlupacek
pchlupacek / gist:9298516
Created March 1, 2014 22:32
Process2 with Append
package scalaz.stream
import scalaz._
import scalaz.concurrent.Task
import scala.annotation.tailrec
/**
* Created by pach on 01/03/14.
*/
package com.spinoco
import scalaz.stream.{Exchange, process1, Process}
import scalaz.stream.Process._
import scalaz.concurrent.Task
import scalaz.{-\/, \/-, \/}
/**
* Created by pach on 18/01/14.
*/
@pchlupacek
pchlupacek / gist:7826399
Last active December 30, 2015 11:59
StateTopic
/**
* Topic that wraps a state processor to produce
* - a discrete stream of states that were created by publishing `A`
* - a continuous stream of states that were created by publishing `A`
* - a subscription to states and updates (S,A)
*/
class StateTopic[S, A](stateProcessor: Process1[A, (S, A)], strategy: Strategy) {
val topic: Topic[(A or (S, A))] = async.topic[(A or (S, A))](strategy).journal(
collect[(A or (S, A)), A]({ case a: A => a }) |>
@pchlupacek
pchlupacek / gist:7813959
Last active December 30, 2015 10:09
file parse example with scalaz-stream
import scalaz._
import scalaz.stream.Process._
import scalaz.stream.Process
import scalaz.stream.io
import scalaz.stream.process1
import scala.util.matching.Regex
def matchRegex(pattern:Regex) : Process1[String,String] =
receive1[String,String]({
package scalaz.stream
import scalaz.stream.Process._
import scalaz.stream.These.{This, That}
/**
 * Root of the `ReceiveThese` hierarchy, covariant in both type parameters.
 * Being `sealed`, every direct subtype must be declared in this same source file.
 */
sealed trait ReceiveThese[+A,+B]
/** A [[ReceiveThese]] case that carries a `These[A,B]` value in its field `t`. */
case class Receive[+A,+B](t:These[A,B]) extends ReceiveThese[A,B]