Skip to content

Instantly share code, notes, and snippets.

@rrodseth
Last active June 14, 2021 18:36
Show Gist options
  • Save rrodseth/ed3e5edf90ce36e3cf6b to your computer and use it in GitHub Desktop.
Save rrodseth/ed3e5edf90ce36e3cf6b to your computer and use it in GitHub Desktop.
Create akka-stream Source from a pagination, using Source.unfoldAsync
// Inspired by a tweet from @trautonen 1/13/2016
// Use Source.unfoldAsync to turn paginated database results into an akka-streams Source
// unfold is the inverse of fold
case class Page[T](pageNumber:Long, totalPages:Long, contents:List[T])
case class Thing(id: Long, name: String = "foo")
val totalPages = 5 //
val pageSize = 3
// Imagine instead a Slick 3 Database call. The total pages would come from the base query, rather than being supplied
def fetchPage(pageNumber: Long, pageSize: Long, totalPages: Long): Future[Page[Thing]] = Future {
val start = pageNumber * pageSize
Page(pageNumber, totalPages, List(Thing(start), Thing(start + 1), Thing(start + 2)))
}
val sink: Sink[Any, Future[Unit]] = Sink.foreach(println)
// Create a Source using Source.unfoldAsync
val startPage = 1
val source = Source.unfoldAsync(startPage) { pageNum =>
val futurePage: Future[Page[Thing]] = fetchPage(pageNum, pageSize, totalPages)
val next = futurePage.map(page => if (page.pageNumber > page.totalPages) None else Some((pageNum + 1, page)))
next
}
val result: Future[Unit] = source.runWith(sink)
result.onComplete { _ => system.shutdown() }
/*
Page(1,5,List(Thing(3,foo), Thing(4,foo), Thing(5,foo)))
Page(2,5,List(Thing(6,foo), Thing(7,foo), Thing(8,foo)))
Page(3,5,List(Thing(9,foo), Thing(10,foo), Thing(11,foo)))
Page(4,5,List(Thing(12,foo), Thing(13,foo), Thing(14,foo)))
Page(5,5,List(Thing(15,foo), Thing(16,foo), Thing(17,foo)))
*/
Copy link

ghost commented Mar 3, 2017

Hey @rrodseth the example worked perfectly for me. When I tried to implement it with a function that fetches some rows from a database It only seems to run twice and is only able to publish one of the messages. I feel like I am missing a crucial detail like a callback to make the .unfoldAsync iterate or something of that sort - any ideas off the top of your head?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment