trait Sink[A] {
def apply(a: A): Async[Unit]
final def contraMap[B](f: B => A): Sink[B] =
b => apply(f(b))
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
alert("Blah"); |
val outputListing: FullListing = //...
val inputListing: RawListing = //...
val save = new MemorySink[ListingResults] // MemorySink[A] extends Sink[Id,A] now
val processor = new ListingProcessor[Id](_ => Some(outputListing), save)
processor.processListing(Seq(inputListing))
save.last should beSome(Seq(outputListing))
class ListingProcessor[F[_]: Monad](transform: RawListing => FullListing,
save: Sink[F, Seq[FullListing]]) {
def process(listings: Seq[RawListing]): F[Unit] = {
val fullListings = listings.map(transform)
// ...logic, etc
save(fullListings)
}
}
trait Sink[F[_], A] {
def apply(a: A): F[Unit]
}
val outputListing: FullListing = //...
val inputListing: RawListing = //...
val save = new MemorySink[ListingResults]
val processor = new ListingProcessor(_ => Task.now(Some(outputListing)), save)
processor.processListing(Seq(inputListing))
// resolve asynchrony
save.last should beSome(Seq(outputListing))
class MemorySink[A] extends Sink[A]{
private var buffer: Vector[A] = Vector.empty
def replay: Vector[A] = buffer
def last: Option[A] = buffer.lastOption
override def apply(a: A): Async[Unit] = Async {
buffer :+= a
// Application wiring
val httpSink = new HttpSink(HttpService(s"${config.apiHost}/v1/listings"))
val jsonSink = new JsonSink[Seq[FullListing]](httpSink)
val listingTransformer = // ...
new ListingProcessor(listingTransformer, jsonSink)
class HttpSink(http: HttpService) extends Sink[String] {
def apply(requestBody: String): Async[Unit] = {
http.put(requestBody).map(_ => ())
}
}
class JsonSink[-A](internalSink: Sink[String])(implicit encode: Encoder[A]) extends Sink[A] {
override def apply(a: A): Async[Unit] = {
internalSink(encode(a).noSpaces)
}
}
NewerOlder