Last active
June 14, 2021 18:36
-
-
Save rrodseth/ed3e5edf90ce36e3cf6b to your computer and use it in GitHub Desktop.
Create akka-stream Source from a pagination, using Source.unfoldAsync
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Inspired by a tweet from @trautonen 1/13/2016 | |
// Use Source.unfoldAsync to turn paginated database results into an akka-streams Source | |
// unfold is the inverse of fold | |
case class Page[T](pageNumber:Long, totalPages:Long, contents:List[T]) | |
case class Thing(id: Long, name: String = "foo") | |
val totalPages = 5 // | |
val pageSize = 3 | |
// Imagine instead a Slick 3 Database call. The total pages would come from the base query, rather than being supplied | |
def fetchPage(pageNumber: Long, pageSize: Long, totalPages: Long): Future[Page[Thing]] = Future { | |
val start = pageNumber * pageSize | |
Page(pageNumber, totalPages, List(Thing(start), Thing(start + 1), Thing(start + 2))) | |
} | |
val sink: Sink[Any, Future[Unit]] = Sink.foreach(println) | |
// Create a Source using Source.unfoldAsync | |
val startPage = 1 | |
val source = Source.unfoldAsync(startPage) { pageNum => | |
val futurePage: Future[Page[Thing]] = fetchPage(pageNum, pageSize, totalPages) | |
val next = futurePage.map(page => if (page.pageNumber > page.totalPages) None else Some((pageNum + 1, page))) | |
next | |
} | |
val result: Future[Unit] = source.runWith(sink) | |
result.onComplete { _ => system.shutdown() } | |
/* | |
Page(1,5,List(Thing(3,foo), Thing(4,foo), Thing(5,foo))) | |
Page(2,5,List(Thing(6,foo), Thing(7,foo), Thing(8,foo))) | |
Page(3,5,List(Thing(9,foo), Thing(10,foo), Thing(11,foo))) | |
Page(4,5,List(Thing(12,foo), Thing(13,foo), Thing(14,foo))) | |
Page(5,5,List(Thing(15,foo), Thing(16,foo), Thing(17,foo))) | |
*/ |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hey @rrodseth the example worked perfectly for me. When I tried to implement it with a function that fetches some rows from a database It only seems to run twice and is only able to publish one of the messages. I feel like I am missing a crucial detail like a callback to make the .unfoldAsync iterate or something of that sort - any ideas off the top of your head?