/**
  * Converts raw listings into full listings and passes their JSON form to `save`.
  *
  * @param transform maps one `RawListing` to its `FullListing`
  * @param save      receives the serialised listings and returns the pending write
  */
class ListingProcessor(
    transform: RawListing => FullListing,
    save: String => Async[Unit]
) {

  /** Transforms every element of `listings`, serialises the result and saves it. */
  def process(listings: Seq[RawListing]): Async[Unit] = {
    val transformed = for (raw <- listings) yield transform(raw)
    // ...logic, etc
    // `toJson` is not defined in this snippet; presumably supplied by a JSON syntax import.
    val json = transformed.toJson
    save(json)
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Lookup of a single person by id; left unimplemented in this example.
val f: Int => Future[Person] = ???
val ids = List(1,2,3,4,5)
// `traverse` takes the collection and the function in separate parameter lists.
val persons: Future[List[Person]] = Future.traverse(ids)(f) // parallel computation using Applicative
// NOTE(review): originally labelled "sequential computation"; if this is the eager
// scala.concurrent.Future, each Future starts as soon as `f(id)` is called — confirm which Future is meant.
val persons2: List[Future[Person]] = ids map (id => f(id))
// `.sequence` is not a method on List itself; presumably provided by a syntax import — verify.
persons2.sequence : Future[List[Person]] // now you can wait on one Future instead of many Futures
val input: Seq[RawListing] = //... fixture
def transform(in: RawListing): FullListing = //... trivial
// Holds the JSON most recently handed to `save`; stays null until `save` has run.
var savedResult: String = null
// Test double for the save function: records the payload instead of persisting it.
def save(json: String): Async[Unit] = Async {
  // Assign to the variable declared above (the original wrote to an undeclared name).
  savedResult = json
}
val expectedResult = //... actually generate some sort of JSON
val processor = new ListingProcessor(transform, save)
/**
  * A destination that values of type `A` can be written to.
  *
  * `A` is contravariant because it only ever appears as a method input. This is
  * also what allows a contravariant implementation such as
  * `JsonSink[-A] extends Sink[A]` to type-check.
  *
  * @tparam A the type of value this sink accepts
  */
trait Sink[-A] {

  /** Writes `a` to this sink and returns the `Async[Unit]` describing that write. */
  def apply(a: A): Async[Unit]
}
/**
  * Converts raw listings into full listings and writes them to a [[Sink]].
  *
  * @param transform maps one `RawListing` to its `FullListing`
  * @param save      sink that receives the whole transformed batch
  */
class ListingProcessor(
    transform: RawListing => FullListing,
    save: Sink[Seq[FullListing]]
) {

  /** Transforms every element of `listings` and hands the batch to `save`. */
  def process(listings: Seq[RawListing]): Async[Unit] = {
    val transformed = for (raw <- listings) yield transform(raw)
    // ...logic, etc
    save.apply(transformed)
  }
}
/**
  * A [[Sink]] that encodes each value and forwards the resulting text to a `Sink[String]`.
  *
  * @param internalSink sink that receives the encoded text
  * @param encode       encoder applied to every incoming value
  */
class JsonSink[-A](internalSink: Sink[String])(implicit encode: Encoder[A]) extends Sink[A] {

  /** Encodes `a`, renders it with `noSpaces`, and passes the result to `internalSink`. */
  override def apply(a: A): Async[Unit] = {
    val rendered = encode(a).noSpaces
    internalSink.apply(rendered)
  }
}
/**
  * A `Sink[String]` that sends each value through `http.put`.
  *
  * @param http service used to issue the PUT
  */
class HttpSink(http: HttpService) extends Sink[String] {

  /** Sends `requestBody` via `http.put` and discards whatever the call yields. */
  def apply(requestBody: String): Async[Unit] = {
    val response = http.put(requestBody)
    // Callers only need to know that the write finished, so map the result to Unit.
    response.map { _ => () }
  }
}
// Application wiring: HTTP sink -> JSON-encoding sink -> listing processor.
val listingsEndpoint = s"${config.apiHost}/v1/listings"
val httpSink = new HttpSink(HttpService(listingsEndpoint))
// Encodes each batch of listings before handing it to the HTTP sink.
val jsonSink = new JsonSink[Seq[FullListing]](httpSink)
val listingTransformer = // ...
new ListingProcessor(listingTransformer, jsonSink)
/**
  * A [[Sink]] that keeps every value it receives in memory so it can be inspected later.
  *
  * NOTE(review): `buffer` is a plain `var` with no synchronisation visible here;
  * confirm how `Async` schedules the write before sharing an instance across threads.
  */
class MemorySink[A] extends Sink[A] {
  // Values received so far, oldest first.
  private var buffer: Vector[A] = Vector.empty

  /** Everything written so far, in the order it was appended. */
  def replay: Vector[A] = buffer

  /** The most recently written value, or `None` if nothing has been written. */
  def last: Option[A] = buffer.lastOption

  /** Appends `a` to the in-memory buffer inside an `Async` block. */
  override def apply(a: A): Async[Unit] = Async {
    buffer :+= a
  }
}
val outputListing: FullListing = //...
val inputListing: RawListing = //...
// The sink's element type matches the `Sink[Seq[FullListing]]` that ListingProcessor expects.
val save = new MemorySink[Seq[FullListing]]
// Stub transform: every raw listing maps to the same known output.
val processor = new ListingProcessor(_ => outputListing, save)
processor.process(Seq(inputListing))
// resolve asynchrony
// The sink should have recorded exactly the batch produced by the processor.
save.last should beSome(Seq(outputListing))
Older | Newer