Last active
July 10, 2017 23:11
-
-
Save NicolasRouquette/68e6dec26f5b2752612b42efc163d6fa to your computer and use it in GitHub Desktop.
http4s / fs2 version of Cafe examples
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package cafe | |
import java.lang.{Integer,System} | |
import fs2._ | |
import scala.{App,Long,StringContext} | |
object Main1 extends App {

  // Strategy backed by a fixed pool of 8 daemon threads named "worker";
  // it is picked up implicitly by the Task created below.
  implicit val S = fs2.Strategy.fromFixedDaemonPool(8, threadName = "worker")

  // Number of orders to take, parsed from the first command-line argument.
  val norders: Long = Integer.parseInt(args(0)).toLong

  val c = Cafe()
  System.out.println(s"Cafe ${c.name} opened for business")

  // Asking for a single order is wrapped in a Task because getOrder()
  // does not always return quickly.
  val askForOrder = Task(c.getOrder())

  // A stream that asks for an order again and again.
  val askForOrders = Stream.repeatEval(askForOrder)

  // Normally ordering would go on until the Cafe closes. Here the stream
  // is cut off by take() once `norders` orders have been produced.
  val orders = askForOrders.take(norders)

  // Run the stream, collect every order, then print them one per line.
  val orderingResult = orders.runLog.unsafeRun()
  for (order <- orderingResult) System.out.println(s"order: $order")
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package cafe | |
import java.lang.{Integer,System} | |
import fs2._ | |
import scala.{App,Long,StringContext} | |
object Main2 extends App {

  // Strategy backed by a fixed pool of 8 daemon threads named "worker";
  // it is picked up implicitly by the Task created below.
  implicit val S = fs2.Strategy.fromFixedDaemonPool(8, threadName = "worker")

  // Number of orders to take, parsed from the first command-line argument.
  val norders: Long = Integer.parseInt(args(0)).toLong

  val c = Cafe()
  System.out.println(s"Cafe ${c.name} opened for business")

  // Asking for a single order is wrapped in a Task because getOrder()
  // does not always return quickly.
  val askForOrder = Task(c.getOrder())

  // A stream that asks for an order again and again.
  val askForOrders = Stream.repeatEval(askForOrder)

  // Normally ordering would go on until the Cafe closes. Here the stream
  // is cut off by take() once `norders` orders have been produced.
  val orders = askForOrders.take(norders)

  // "Splitting" an order: keep only its items (the drinks).
  val drinksOnly = orders.map(order => order.items)

  // Run the stream, collect the item lists, then print them one per line.
  val orderingResult = drinksOnly.runLog.unsafeRun()
  for (order <- orderingResult) System.out.println(s"order: $order")
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package cafe | |
import java.lang.{Integer, System} | |
import fs2._ | |
import scalaz._ | |
import scala.collection.immutable.Seq | |
import scala.{App, Long, StringContext} | |
import scala.Predef.String | |
object Main3 extends App {

  // Strategy backed by a fixed pool of 8 daemon threads named "worker";
  // it is picked up implicitly by the Task created below.
  implicit val S = fs2.Strategy.fromFixedDaemonPool(8, threadName = "worker")

  // Number of orders to take, parsed from the first command-line argument.
  val norders: Long = Integer.parseInt(args(0)).toLong

  val c = Cafe()
  System.out.println(s"Cafe ${c.name} opened for business")

  // Asking for a single order is wrapped in a Task because getOrder()
  // does not always return quickly.
  val askForOrder: Task[Order] =
    Task { c.getOrder() }

  // A stream that asks for an order again and again.
  val askForOrders: Stream[Task, Order] =
    Stream.repeatEval(askForOrder)

  // Normally ordering would go on until the Cafe closes. Here the stream
  // is cut off by take() once `norders` orders have been produced.
  val orders: Stream[Task, Order] =
    askForOrders.take(norders)

  // "Splitting" an order: keep only its items (the drinks).
  val drinksOnly: Stream[Task, Seq[Item]] =
    orders.map(_.items)

  // Each order's items are emitted individually and tagged with a scalaz
  // disjunction (\/): iced items go left, all others go right.
  val coldOrHotIndividualDrink: Stream[Task, \/[Item, Item]] =
    drinksOnly.flatMap { items =>
      Stream.emits(items.map { item =>
        if (item.iced) -\/(item)
        else \/-(item)
      })
    }

  // Renders a tagged item as a "Left: ..." / "Right: ..." message.
  val printer: Pipe[Task, \/[Item, Item], String] = { in =>
    in.map {
      case -\/(i) => s"Left: $i"
      case \/-(i) => s"Right: $i"
    }
  }

  // Apply the pipe to the whole stream with through() instead of building
  // a one-element stream per item just to push it through the pipe.
  val items: Stream[Task, String] =
    coldOrHotIndividualDrink.through(printer)

  // Sink that prints every message. The println is suspended in a Task via
  // evalMap so the side effect is sequenced by the stream rather than
  // hidden inside a pure map.
  val waiter: Sink[Task, String] = { in =>
    in.evalMap(message => Task.delay(System.out.println(message)))
  }

  // Connect the messages to the waiter and run the stream for its effects.
  val orderingResult = items to waiter
  orderingResult.run.unsafeRun()
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package cafe | |
import java.lang.{Integer, System} | |
import fs2._ | |
import scalaz._ | |
import scala.collection.immutable.Seq | |
import scala.{App, Long, StringContext} | |
import scala.Predef.String | |
object Main4 extends App {

  // Strategy backed by a fixed pool of 8 daemon threads named "worker";
  // it is picked up implicitly by the Task created below.
  implicit val S = fs2.Strategy.fromFixedDaemonPool(8, threadName = "worker")

  // Number of orders to take, parsed from the first command-line argument.
  val norders: Long = Integer.parseInt(args(0)).toLong

  val c = Cafe()
  System.out.println(s"Cafe ${c.name} opened for business")

  // Asking for a single order is wrapped in a Task because getOrder()
  // does not always return quickly.
  val askForOrder: Task[Order] =
    Task { c.getOrder() }

  // A stream that asks for an order again and again.
  val askForOrders: Stream[Task, Order] =
    Stream.repeatEval(askForOrder)

  // Normally ordering would go on until the Cafe closes. Here the stream
  // is cut off by take() once `norders` orders have been produced.
  val orders: Stream[Task, Order] =
    askForOrders.take(norders)

  // "Splitting" an order: keep only its items (the drinks).
  val drinksOnly: Stream[Task, Seq[Item]] =
    orders.map(_.items)

  // Tags each individual item with a scalaz disjunction (\/):
  // iced items go left, all others go right.
  val coldOrHotIndividualDrink: Pipe[Task, Item, \/[Item, Item]] = { in =>
    in.map { item =>
      if (item.iced) -\/(item)
      else \/-(item)
    }
  }

  // Renders a tagged item as a "Left: ..." / "Right: ..." message.
  val printer: Pipe[Task, \/[Item, Item], String] = { in =>
    in.map {
      case -\/(i) => s"Left: $i"
      case \/-(i) => s"Right: $i"
    }
  }

  // Flatten every order into its individual items, tag them, and render
  // them. The printer pipe is applied with through() instead of building a
  // one-element stream per item just to push it through the pipe.
  val items: Stream[Task, String] =
    drinksOnly
      .flatMap(Stream.emits)
      .through(coldOrHotIndividualDrink)
      .through(printer)

  // Sink that prints every message. The println is suspended in a Task via
  // evalMap so the side effect is sequenced by the stream rather than
  // hidden inside a pure map.
  val waiter: Sink[Task, String] = { in =>
    in.evalMap(message => Task.delay(System.out.println(message)))
  }

  // Connect the messages to the waiter and run the stream for its effects.
  val orderingResult = items to waiter
  orderingResult.run.unsafeRun()
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package cafe | |
import java.lang.{Integer, System} | |
import fs2._ | |
import scalaz._ | |
import scala.collection.immutable.Seq | |
import scala.{App, Long, StringContext} | |
import scala.Predef.String | |
object Main5 extends App {

  // Strategy backed by a fixed pool of 8 daemon threads named "worker";
  // it is picked up implicitly by the Task created below.
  implicit val S = fs2.Strategy.fromFixedDaemonPool(8, threadName = "worker")

  // Number of orders to take, parsed from the first command-line argument.
  val norders: Long = Integer.parseInt(args(0)).toLong

  val c = Cafe()
  System.out.println(s"Cafe ${c.name} opened for business")

  // Asking for a single order is wrapped in a Task because getOrder()
  // does not always return quickly.
  val askForOrder: Task[Order] =
    Task { c.getOrder() }

  // A stream that asks for an order again and again.
  val askForOrders: Stream[Task, Order] =
    Stream.repeatEval(askForOrder)

  // Normally ordering would go on until the Cafe closes. Here the stream
  // is cut off by take() once `norders` orders have been produced.
  val orders: Stream[Task, Order] =
    askForOrders.take(norders)

  // "Splitting" an order: keep only its items (the drinks).
  val drinksOnly: Pipe[Task, Order, Seq[Item]] = { in =>
    in.map(o => o.items)
  }

  // Tags each individual item with a scalaz disjunction (\/):
  // iced items go left, all others go right.
  val coldOrHotIndividualDrink: Pipe[Task, Item, \/[Item, Item]] = { in =>
    in.map { item =>
      if (item.iced) -\/(item)
      else \/-(item)
    }
  }

  // Renders a tagged item as a "Left: ..." / "Right: ..." message.
  val printer: Pipe[Task, \/[Item, Item], String] = { in =>
    in.map {
      case -\/(i) => s"Left: $i"
      case \/-(i) => s"Right: $i"
    }
  }

  // The whole pipeline: orders -> item lists -> individual items ->
  // tagged items -> messages. The printer pipe is applied with through()
  // instead of building a one-element stream per item.
  val items: Stream[Task, String] =
    orders
      .through(drinksOnly)
      .flatMap(Stream.emits)
      .through(coldOrHotIndividualDrink)
      .through(printer)

  // Sink that prints every message. The println is suspended in a Task via
  // evalMap so the side effect is sequenced by the stream rather than
  // hidden inside a pure map.
  val waiter: Sink[Task, String] = { in =>
    in.evalMap(message => Task.delay(System.out.println(message)))
  }

  // Connect the messages to the waiter and run the stream for its effects.
  val orderingResult = items to waiter
  orderingResult.run.unsafeRun()
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package cafe | |
import java.lang.{Integer, System} | |
import fs2._ | |
import scalaz._ | |
import scala.collection.immutable.Seq | |
import scala.{App, Long, StringContext} | |
import scala.Predef.String | |
object Main6 extends App {

  // Strategy backed by a fixed pool of 8 daemon threads named "worker";
  // it is picked up implicitly by the Task created below.
  implicit val S = fs2.Strategy.fromFixedDaemonPool(8, threadName = "worker")

  // Number of orders to take, parsed from the first command-line argument.
  val norders: Long = Integer.parseInt(args(0)).toLong

  val c = Cafe()
  System.out.println(s"Cafe ${c.name} opened for business")

  // Asking for a single order is wrapped in a Task because getOrder()
  // does not always return quickly.
  val askForOrder: Task[Order] =
    Task { c.getOrder() }

  // A stream that asks for an order again and again.
  val askForOrders: Stream[Task, Order] =
    Stream.repeatEval(askForOrder)

  // Normally ordering would go on until the Cafe closes. Here the stream
  // is cut off by take() once `norders` orders have been produced.
  val orders: Stream[Task, Order] =
    askForOrders.take(norders)

  // "Splitting" an order: keep only its items (the drinks).
  val drinksOnly: Pipe[Task, Order, Seq[Item]] = { in =>
    in.map(o => o.items)
  }

  // Tags each individual item with a scalaz disjunction (\/):
  // iced items go left, all others go right.
  val coldOrHotIndividualDrink: Pipe[Task, Item, \/[Item, Item]] = { in =>
    in.map { item =>
      if (item.iced) -\/(item)
      else \/-(item)
    }
  }

  // Turns tagged items into drinks using the given barista: left (iced)
  // items are prepared cold, right items are prepared hot.
  def createDrinks(barista: Barista): Pipe[Task, \/[Item, Item], Drink] = { in =>
    in.map {
      case -\/(item) => barista.prepareColdDrink(item)
      case \/-(item) => barista.prepareHotDrink(item)
    }
  }

  // Renders a drink as text via its toString.
  val printer: Pipe[Task, Drink, String] = { in =>
    in.map(_.toString)
  }

  // The whole pipeline: orders -> item lists -> individual items ->
  // tagged items -> drinks -> messages. The printer pipe is applied with
  // through() instead of building a one-element stream per drink.
  val items: Stream[Task, String] =
    orders
      .through(drinksOnly)
      .flatMap(Stream.emits)
      .through(coldOrHotIndividualDrink)
      .through(createDrinks(Barista()))
      .through(printer)

  // Sink that prints every message. The println is suspended in a Task via
  // evalMap so the side effect is sequenced by the stream rather than
  // hidden inside a pure map.
  val waiter: Sink[Task, String] = { in =>
    in.evalMap(message => Task.delay(System.out.println(message)))
  }

  // Connect the messages to the waiter and run the stream for its effects.
  val orderingResult = items to waiter
  orderingResult.run.unsafeRun()
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package cafe | |
import java.lang.{Integer, System, Thread} | |
import fs2._ | |
import fs2.async._ | |
import scala.collection.immutable.Seq | |
import scala.{App, Int, Long, StringContext, Unit} | |
import scala.Predef.String | |
object Main7 extends App {

  // Strategy backed by a fixed pool of 8 daemon threads named "worker";
  // it is picked up implicitly by the Tasks, queues and merges below.
  implicit val S = fs2.Strategy.fromFixedDaemonPool(8, threadName = "worker")

  // Command-line arguments: number of orders, then the capacities of the
  // hot-drink and cold-drink queues.
  val norders: Long = Integer.parseInt(args(0)).toLong
  val hotQueueLength: Int = Integer.parseInt(args(1))
  val coldQueueLength: Int = Integer.parseInt(args(2))

  val c = Cafe()
  System.out.println(s"Cafe ${c.name} opened for business")

  // Asking for a single order is wrapped in a Task because getOrder()
  // does not always return quickly. The name of the thread that took the
  // order is logged along with the order itself.
  val askForOrder: Task[Order] =
    Task {
      val o = c.getOrder()
      System.out.println(s"""${Thread.currentThread.getName} asking for $o""")
      o
    }

  // A stream that asks for an order again and again.
  val askForOrders: Stream[Task, Order] =
    Stream.repeatEval(askForOrder)

  // Normally ordering would go on until the Cafe closes. Here the stream
  // is cut off by take() once `norders` orders have been produced.
  val orders: Stream[Task, Order] =
    askForOrders.take(norders)

  // "Splitting" an order: keep only its items (the drinks).
  val drinksOnly: Pipe[Task, Order, Seq[Item]] = { in =>
    in.map(o => o.items)
  }

  // Bounded queues feeding the hot-drink and cold-drink baristas.
  val hotDrinkQueue: Task[mutable.Queue[Task, Item]] =
    boundedQueue[Task, Item](maxSize = hotQueueLength)

  val coldDrinkQueue: Task[mutable.Queue[Task, Item]] =
    boundedQueue[Task, Item](maxSize = coldQueueLength)

  // Routes each individual item to a queue: iced items go to `cq`, all
  // others to `hq`. The enqueue is sequenced with evalMap, so a full queue
  // makes the stream wait instead of blocking a thread in unsafeRun()
  // inside a pure map.
  def coldOrHotIndividualDrink(
      hq: mutable.Queue[Task, Item],
      cq: mutable.Queue[Task, Item]): Pipe[Task, Item, Unit] = { in =>
    in.evalMap { item =>
      if (item.iced) cq.enqueue1(item)
      else hq.enqueue1(item)
    }
  }

  // Pipes that let a barista turn items into hot / cold drinks.
  def processHotDrinks(barista: Barista): Pipe[Task, Item, Drink] =
    { _.map(barista.prepareHotDrink) }

  def processColdDrinks(barista: Barista): Pipe[Task, Item, Drink] =
    { _.map(barista.prepareColdDrink) }

  // A worker is the queue's dequeue stream run through the matching
  // preparation pipe. The former `q.size.discrete.flatMap { _ => ... }`
  // wrapper was dropped: only its first element was ever used, because the
  // inner dequeue stream does not end.
  def hotBaristaWorker(
      q: mutable.Queue[Task, Item],
      barista: Barista): Stream[Task, Drink] =
    q.dequeue.through(processHotDrinks(barista))

  def coldBaristaWorker(
      q: mutable.Queue[Task, Item],
      barista: Barista): Stream[Task, Drink] =
    q.dequeue.through(processColdDrinks(barista))

  // Queue on which the finished drinks of all baristas are collected.
  val joinDrinkQueue: Task[mutable.Queue[Task, Drink]] =
    boundedQueue[Task, Drink](maxSize = 3)

  // Feeds every drink produced by worker stream `s` into the join queue `j`.
  def joinBaristaWorker(
      s: Stream[Task, Drink],
      j: mutable.Queue[Task, Drink]): Stream[Task, Unit] =
    s.to(j.enqueue)

  // Renders a drink as text via its toString.
  val printer: Pipe[Task, Drink, String] = { in =>
    in.map(_.toString)
  }

  // Order side of the cafe: orders -> item lists -> individual items ->
  // enqueued on the hot or cold queue.
  def items(
      hq: mutable.Queue[Task, Item],
      cq: mutable.Queue[Task, Item]): Stream[Task, Unit] =
    orders
      .through(drinksOnly)
      .flatMap(Stream.emits)
      .through(coldOrHotIndividualDrink(hq, cq))

  // Takes finished drinks off the join queue and prints them; the println
  // is suspended in a Task via evalMap rather than hidden in a pure map.
  def waiter(j: mutable.Queue[Task, Drink]): Stream[Task, Unit] =
    j.dequeue.evalMap(drink => Task.delay(System.out.println(drink)))

  val b1 = Barista("Jack")
  val b2 = Barista("Peter")
  val b3 = Barista("Jane")
  val b4 = Barista("Susan")

  // Materialise the three queues.
  val hq = hotDrinkQueue.unsafeRun()
  val cq = coldDrinkQueue.unsafeRun()
  val j = joinDrinkQueue.unsafeRun()

  // Three baristas share the hot queue, one serves the cold queue.
  val h1 = hotBaristaWorker(hq, b1)
  val h2 = hotBaristaWorker(hq, b2)
  val h3 = hotBaristaWorker(hq, b3)
  val c1 = coldBaristaWorker(cq, b4)

  val i: Stream[Task, Unit] = items(hq, cq)
  val j1: Stream[Task, Unit] = joinBaristaWorker(h1, j)
  val j2: Stream[Task, Unit] = joinBaristaWorker(h2, j)
  val j3: Stream[Task, Unit] = joinBaristaWorker(h3, j)
  val j4: Stream[Task, Unit] = joinBaristaWorker(c1, j)
  val w: Stream[Task, Unit] = waiter(j)

  // Run the order taker, the four barista workers and the waiter
  // concurrently.
  // NOTE(review): the workers and the waiter read from queue.dequeue, and
  // nothing visible here signals end-of-input on the queues, so this call
  // presumably never returns after the last drink is printed. Confirm, and
  // add a termination signal if the program is expected to exit.
  i.merge(j1).merge(j2).merge(j3).merge(j4).merge(w).drain.run.unsafeRun()
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
These are fs2 versions of scalaz examples from this book:
https://aappddeevv.gitbooks.io/test_private_book/content/examples/cafe.html