Skip to content

Instantly share code, notes, and snippets.

@calvinlfer
Forked from huntc/server.scala
Created November 16, 2017 22:44
Show Gist options
  • Save calvinlfer/f8305e026a1b7a591ea54ddc16f9c480 to your computer and use it in GitHub Desktop.
Save calvinlfer/f8305e026a1b7a591ea54ddc16f9c480 to your computer and use it in GitHub Desktop.
A complete server using Akka streams that reads some source, batches its data and then publishes. If the data cannot be published then it backs off with a best-effort of sending that data again.
val (recycleQueue, recycleSource) =
Source
.queue[SoilStateReading](100, OverflowStrategy.dropTail)
.prefixAndTail(0)
.map(_._2)
.toMat(Sink.head)(Keep.both)
.run()
StreamConverters.fromInputStream(() => this.getClass.getClassLoader.getResourceAsStream("sensors.log"))
.via(SoilStateReading.csvParser)
.merge(Source.fromFutureSource(recycleSource))
.batch(100, e => List(e))((a, e) => e +: a)
.via(RestartFlow.withBackoff(1.second, 3.seconds, 0.2) { () =>
Flow[Seq[SoilStateReading]]
.mapAsync(1) { readings =>
responder
.post(readings)
.recover {
case e: IllegalStateException =>
readings.foreach(recycleQueue.offer)
throw e
}
}
})
.runWith(Sink.ignore)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment