package spinoco.scalaz.stream
import fs2.Handle
import fs2._
import scalaz.concurrent.{Actor, Task}
import scalaz.stream.{Cause, Process, wye}
import scala.language.higherKinds
import scalaz.{-\/, \/, \/-}
/**
 * Runs this Process asynchronously. Note that this method is not resource safe unless the
 * callback is called with the _left_ side completed. In that case it is guaranteed that all cleanups
 * have been successfully completed.
 * The user of this method is responsible for any cleanup actions, which are performed by running the
 * next Process obtained on the right side of the callback.
 *
 * This method returns a function that, when applied, causes the running computation to be interrupted.
 * This is useful if the process contains any asynchronous code that may be left with incomplete callbacks.
 * If the evaluation of the process is interrupted, then the interruption is only active if the callback
/**
 * Transforms a process into an `InputStream`. Note that calling `close` kills the process if
 * it has not finished yet. Otherwise cleanups may be called before `close` is executed on the InputStream.
 * The process starts running on the first `read` invocation on the InputStream.
 * The resulting input stream is NOT thread safe.
 * @param source the source of `ByteVector` chunks to read from
 * @return an `InputStream` backed by `source`
 */
def toInputStream(source:TSource[ByteVector])(implicit S:Strategy):InputStream = {
  val killSignal = async.signalOf(false)
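The behaviour described in the comment above (start the source on the first `read`, stop pulling after `close`) can be illustrated without scalaz-stream. The following is only a minimal sketch under stated assumptions: `ChunkInputStream` is a hypothetical name, and a by-name `Iterator[Array[Byte]]` stands in for the `TSource[ByteVector]` of the snippet. It is not the gist's implementation, and like the original it is not thread safe.

```scala
import java.io.InputStream

// Hypothetical stand-in for `toInputStream`: chunks come from a by-name
// Iterator rather than from a scalaz-stream process.
final class ChunkInputStream(mkChunks: => Iterator[Array[Byte]]) extends InputStream {
  // The source is only started on the first `read`.
  private lazy val chunks: Iterator[Array[Byte]] = mkChunks
  private var current: Array[Byte] = Array.emptyByteArray
  private var pos = 0
  private var closed = false

  override def read(): Int = {
    // Simplification: a closed stream reports end-of-stream instead of throwing.
    if (closed) return -1
    // Skip exhausted or empty chunks until there is data or the source ends.
    while (pos >= current.length) {
      if (!chunks.hasNext) return -1
      current = chunks.next()
      pos = 0
    }
    val b = current(pos) & 0xff // return the byte as an unsigned value
    pos += 1
    b
  }

  // Closing stops further pulls. A real implementation would also signal the
  // running process to terminate so that its cleanups run.
  override def close(): Unit = { closed = true }
}
```

In the snippet above, `killSignal` presumably plays the role of the `closed` flag here, with the difference that setting it would also terminate the running process.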
package scalaz.stream2
import Process._
import org.scalacheck.Prop._
import org.scalacheck.Properties
import scalaz.concurrent.{Strategy, Task}
import scalaz.{\/, stream2, \/-, -\/}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.SyncVar
import scala.concurrent.duration._
/** Feed a sequence of inputs to a `Process1`. */
def feed[I, O](i: Seq[I])(p: Process1[I, O]): Process1[I, O] = {
  @tailrec
  def go(
    in: Seq[I], out: Vector[O]
    , cur: Process1[I, O]
    , stack: Vector[Throwable => Trampoline[Process1[I, O]]]
  ): Process1[I, O] = {
    if (in.nonEmpty) {
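Since the body of `feed` is cut off above, the general idea of feeding inputs to a one-input process can be sketched on a much simpler model. Everything below is an assumption made for illustration: `P1`, `Emit`, `Await`, `Halt`, `lift` and `FeedSketch` are hypothetical names, not the scalaz-stream types, and the `stack` of error handlers from the original signature is left out entirely.

```scala
import scala.annotation.tailrec

object FeedSketch {
  // Simplified one-input process: emit values, wait for an input, or stop.
  sealed trait P1[I, O]
  final case class Emit[I, O](out: Seq[O], next: P1[I, O]) extends P1[I, O]
  final case class Await[I, O](recv: I => P1[I, O]) extends P1[I, O]
  final case class Halt[I, O]() extends P1[I, O]

  // Feeds `in` to `p`, collecting what is emitted along the way. The result
  // emits the collected output first and then continues as the process that
  // was reached once the input ran out.
  def feed[I, O](in: Seq[I])(p: P1[I, O]): P1[I, O] = {
    @tailrec
    def go(in: Seq[I], out: Vector[O], cur: P1[I, O]): P1[I, O] =
      if (in.isEmpty) emitAll(out, cur)
      else cur match {
        case Emit(h, t)  => go(in, out ++ h, t)           // collect output, keep the input
        case Await(recv) => go(in.tail, out, recv(in.head)) // consume one input
        case Halt()      => emitAll(out, cur)             // halted: remaining input is dropped
      }
    go(in, Vector.empty, p)
  }

  private def emitAll[I, O](out: Vector[O], next: P1[I, O]): P1[I, O] =
    if (out.isEmpty) next else Emit(out, next)

  // Hypothetical helper: a process that applies `f` to every input.
  def lift[I, O](f: I => O): P1[I, O] =
    Await[I, O](i => Emit[I, O](Seq(f(i)), lift(f)))
}
```

Feeding `Seq(1, 2, 3)` to `FeedSketch.lift[Int, Int](_ * 2)` yields a process with the doubled values pending as output, followed by a process that awaits further input.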
package scalaz.stream
import scalaz._
import scalaz.concurrent.Task
import scala.annotation.tailrec
/**
 * Created by pach on 01/03/14.
 */
package com.spinoco
import scalaz.stream.{Exchange, process1, Process}
import scalaz.stream.Process._
import scalaz.concurrent.Task
import scalaz.{-\/, \/-, \/}
/**
 * Created by pach on 18/01/14.
 */
/**
 * Topic that wraps a state processor to produce
 *  - a discrete stream of states that were created by publishing `A`
 *  - a continuous stream of states that were created by publishing `A`
 *  - a subscription to states and updates (S,A)
 */
class StateTopic[S, A](stateProcessor: Process1[A, (S, A)], strategy: Strategy) {
  val topic: Topic[(A or (S, A))] = async.topic[(A or (S, A))](strategy).journal(
    collect[(A or (S, A)), A]({ case a: A => a }) |>
import scalaz._
import scalaz.stream.Process._
import scalaz.stream.Process
import scalaz.stream.io
import scalaz.stream.process1
import scala.util.matching.Regex
def matchRegex(pattern:Regex) : Process1[String,String] =
  receive1[String,String]({
package scalaz.stream
import scalaz.stream.Process._
import scalaz.stream.These.{This, That}
sealed trait ReceiveThese[+A,+B]
case class Receive[+A,+B](t:These[A,B]) extends ReceiveThese[A,B]