Last active
November 11, 2019 19:33
-
-
Save matfournier/7e76330ae7970447cc249b46c2dca23e to your computer and use it in GitHub Desktop.
Monix Observable Help
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import monix.eval.Task | |
import monix.execution.Scheduler.Implicits.global | |
import monix.reactive._ | |
import scala.concurrent._ | |
import scala.concurrent.duration._ | |
// possibly useful | |
// https://github.com/monix/monix/issues/481 | |
// https://stackoverflow.com/questions/45894205/getting-cassandra-query-results-asynchronously-using-scala-monix | |
// though one of the linked issues says flatMap is not stacksafe, need to test this... if it's not that's a problem! | |
class Go(pf: ParentFetch, cf: ChildFetch) { | |
def go: Task[List[Page]] = { | |
val observable = Observable | |
.fromAsyncStateAction[Task[PageResult[Page]], PageResult[Page]] { nextPage => | |
nextPage.flatMap(onNextPage) | |
}(pf.get(1)) | |
.takeWhileInclusive { page => | |
page.paging match { | |
case Next(_) => true | |
case _ => false | |
} | |
} | |
.flatMap { pr => | |
{ | |
// this seems expensive, if I have n pages I'm creating | |
// n observables. Surely this is wrong? | |
// | |
// design 2, if I wanted to say, batch fetch the children (if the api allowed) | |
// how would I buffer the parent stream into chunks of 10 to make a call to get | |
// 10 children through some batch api? I would still suffer from the same | |
// problem of many, many observables. | |
// | |
// in this version, none of the children have additioanl pages so I just use | |
// from task rather than fromAsyncTask like the parent, in reality it's another | |
// sequence of 1 or more calls. | |
Observable | |
.fromIterable(pr.data) | |
.flatMap( | |
p => | |
p match { | |
case pp: ParentPage => pp.child.fold(Observable.eval(p))(child => | |
Observable.fromTask(cf.get(child)) ++ Observable.eval(p)) | |
case cp: ChildPage => Observable.eval(cp) | |
} | |
) | |
} | |
} | |
val consumer: Consumer[Page, List[Page]] = | |
Consumer.foldLeft(List[Page]())((acc, page) => { | |
page +: acc | |
}) | |
observable.take(300).consumeWith(consumer) | |
} | |
def onNextPage(pageResult: PageResult[Page]): Task[(PageResult[Page], Task[PageResult[Page]])] = | |
pageResult.paging match { | |
case Next(i) => Task((pageResult, pf.get(i))) | |
case Finished => Task((pageResult, Task.now(pageResult))) | |
} | |
} | |
object GoApp extends App { | |
val pf = new ParentFetch() | |
val cf = new ChildFetch() | |
val go = new Go(pf, cf) | |
val result = go.go.runToFuture | |
val r = Await.result(result, Duration.Inf) | |
println("\n **** \n") | |
println(r) | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
sealed trait Page | |
final case class ParentPage(id: String, child: Option[String]) extends Page | |
final case class ChildPage(id: String, parent: String) extends Page | |
final case class PageResult[T](data: List[T], paging: Cursor) | |
sealed trait Cursor | |
final case class Next(i: Int) extends Cursor | |
case object Finished extends Cursor |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import monix.eval.Task | |
class ParentFetch { | |
type Result = PageResult[Page] | |
val p1 = ParentPage("p1", None) | |
val p2 = ParentPage("p2", None) | |
val p3 = ParentPage("p3", Some("c1")) | |
val p4 = ParentPage("p4", Some("c2")) | |
val p5 = ParentPage("p5", Some("c3")) | |
val p6 = ParentPage("p6", None) | |
// two sets of pages | |
// in reality there can be thousands | |
val parentMap: Map[Int, PageResult[Page]] = Map( | |
1 -> PageResult(List(p1, p2, p3, p4), Next(2)), | |
2 -> PageResult(List(p5, p6), Finished) | |
) | |
def get(i: Int): Task[Result] = | |
parentMap.get(i).fold(Task.raiseError[Result](new Exception("no more parents")))(v => Task.now(v)) | |
} | |
class ChildFetch { | |
val c1 = ChildPage("c1", "p3") | |
val c2 = ChildPage("c2", "p4") | |
val c3 = ChildPage("c3", "p5") | |
// in reality the children are also paged, simplified it here | |
// but a parent can have thousands of children in prod | |
val childMap: Map[String, ChildPage] = Map( | |
"c1" -> c1, | |
"c2" -> c2, | |
"c3" -> c3 | |
) | |
def get(s: String): Task[Page] = | |
childMap.get(s).fold(Task.raiseError[Page](new Exception("boop")))(c => Task.now(c)) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment