/**
 * Transforms a process into an InputStream. Note that calling `close` kills the process if it
 * has not finished yet. Otherwise cleanups may be called before `close` is executed on the InputStream.
 * The process is run after the first `read` invocation on the InputStream.
 * The resulting input stream is NOT thread safe.
 * @param source
 * @return
 */
def toInputStream(source:TSource[ByteVector])(implicit S:Strategy):InputStream = {
  val killSignal = async.signalOf(false)
/**
 * Asynchronous execution of this Process. Note that this method is not resource safe unless
 * the callback is called with the _left_ side completed. In that case it is guaranteed that all cleanups
 * have been successfully completed.
 * The user of this method is responsible for any cleanup actions to be performed by running the
 * next Process obtained on the right side of the callback.
 *
 * This method returns a function that, when applied, causes the running computation to be interrupted.
 * That is useful if the process contains any asynchronous code that may be left with incomplete callbacks.
 * If the evaluation of the process is interrupted, then the interruption is only active if the callback
package spinoco.scalaz.stream
import fs2.Handle
import fs2._
import scalaz.concurrent.{Actor, Task}
import scalaz.stream.{Cause, Process, wye}
import scala.language.higherKinds
import scalaz.{-\/, \/, \/-}
/**
 * Runs the supplied process1 `process` in the scope of the resulting pipe.
 */
def liftProcess1[F[_],I,O](process:Process1[I,O]):Pipe[F,I,O] = {
  def go(p1:Process1[I,O]):Handle[F,I] => Pull[F,O,Unit] = {
    _.receive1 {
      case (evt,h) => p1.feed1(evt).unemit match {
        case (out, np1) => np1 match {
          case Halt(End | Kill) => Pull.output(Chunk.seq(out))
          case Halt(Error(rsn)) => Pull.output(Chunk.seq(out)) >> Pull.fail(rsn)
> jmh:run -i 3 -wi 3 -f1 -t1 -gc true -jvmArgs "-Xmx4G -XX:+UseG1GC" .RealTimeBenchmark
[info] Compiling 1 Scala source to /Users/pach/work/repo/n/scalaz-stream/benchmark/target/scala-2.12/classes...
Processing 262 classes from /Users/pach/work/repo/n/scalaz-stream/benchmark/target/scala-2.12/classes with "reflection" generator
Writing out Java source to /Users/pach/work/repo/n/scalaz-stream/benchmark/target/scala-2.12/src_managed/jmh and resources to /Users/pach/work/repo/n/scalaz-stream/benchmark/target/scala-2.12/resource_managed/jmh
[info] Compiling 4 Java sources to /Users/pach/work/repo/n/scalaz-stream/benchmark/target/scala-2.12/classes...
[info] Compiling 34 Java sources to /Users/pach/work/repo/n/scalaz-stream/benchmark/target/scala-2.12/classes...
[info] Running org.openjdk.jmh.Main -i 3 -wi 3 -f1 -t1 -gc true -jvmArgs -Xmx4G -XX:+UseG1GC .RealTimeBenchmark
[info] # JMH 1.14.1 (released 94 days ago)
[info] # VM version: JDK 1.8.0_40, VM 25.40-b25
[info] # VM invoker: /Library/Java/JavaVirtualMachin
package fs2.benchmark
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicInteger
import fs2.{Scope => _, _}
import org.openjdk.jmh.annotations._
import org.openjdk.jmh.infra.Blackhole
import scala.concurrent.duration._
pub enum Trampoline<A> {
    Return(A),
    // `dyn` is required for trait objects in current Rust editions.
    Suspend(Box<dyn Fn() -> Trampoline<A>>),
}
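
The enum above only describes the two states of a trampolined computation; something still has to drive it. The following is a minimal sketch of one way to do that, assuming a loop-based interpreter. The `run` method and the `sum_to` helper are hypothetical names introduced for illustration and are not part of the original gist; the enum is repeated so the block compiles on its own.

```rust
// Repeated from the snippet above so this sketch is self-contained.
pub enum Trampoline<A> {
    Return(A),
    Suspend(Box<dyn Fn() -> Trampoline<A>>),
}

impl<A> Trampoline<A> {
    /// Hypothetical interpreter: repeatedly evaluates suspended steps in a
    /// loop until a value is returned, so the call stack stays flat.
    pub fn run(self) -> A {
        let mut current = self;
        loop {
            match current {
                Trampoline::Return(a) => return a,
                Trampoline::Suspend(next) => current = next(),
            }
        }
    }
}

/// Hypothetical example: sums the integers 1..=n with an accumulator,
/// suspending one step per recursive call instead of recursing directly.
fn sum_to(n: u64, acc: u64) -> Trampoline<u64> {
    if n == 0 {
        Trampoline::Return(acc)
    } else {
        Trampoline::Suspend(Box::new(move || sum_to(n - 1, acc + n)))
    }
}

fn main() {
    // One million steps; each step is a separate closure call from the loop.
    let total = sum_to(1_000_000, 0).run();
    println!("{}", total); // prints 500000500000
}
```

Because each `Suspend` returns immediately with a boxed closure, the depth of the computation is paid in heap allocations and loop iterations, not in stack frames.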
// Source of URIs; may be, for example, a file, another HTTP server, a database, ...
val sourceOfUris: Stream[F, Uri] = ???
// Maximum number of requests to process in parallel
val maxConcurrent: Int = ???
val responses: Stream[F, Stream[F, HttpResponse[F]]] =
  // Uses the same effect type F as the declared result type (the original used Task here).
  http.client[F]().flatMap { client =>
    sourceOfUris map { uri => client.request(HttpRequest.get[F](uri)) }
  }
val refSync = async.syncRefOf[IO, Long](0L).unsafeRunSync()
val refAsync = async.refOf[IO, Long](0L).unsafeRunSync()
val count = 10000000
def time[A](s: String)(f: IO[A]): A = {
  val start = System.currentTimeMillis()
  val a = f.unsafeRunSync()
  val took = System.currentTimeMillis() - start
  println(s"Execution of : $s took ${took.toFloat/1000} s")
/*
 * Copyright (c) 2017-2018 The Typelevel Cats-effect Project Developers
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software