Skip to content

Instantly share code, notes, and snippets.

View jchapuis's full-sized avatar
🤓

Jonas Chapuis jchapuis

🤓
View GitHub Profile
@jchapuis
jchapuis / Markdium-Scala.scala
Created May 10, 2020 20:06
Markdium-Orchestrating startup and shutdown in Scala
/**
 * A named unit that can be started and, through [[Stoppable]], stopped.
 *
 * NOTE(review): `E` is not declared within this snippet — presumably the
 * service's error type, declared elsewhere; confirm.
 */
trait Service extends Stoppable[Either[E, Unit]] {

  /**
   * Starts the service.
   *
   * @return a future holding `Right(())` or a `Left` carrying an `E`
   */
  def start(): Future[Either[E, Unit]]

  /** Name identifying this service. */
  def name: String
}
@jchapuis
jchapuis / Markdium-Scala.scala
Created May 10, 2020 20:06
Markdium-Orchestrating startup and shutdown in Scala
// Builds a service named "in-memory-cache" whose fallible starter is the
// asynchronous cache load, ascribed to Future[Either[StartError, Unit]].
Service
  .withName("in-memory-cache")
  .withFallibleStarter(() => cache.loadAsync(): Future[Either[StartError, Unit]])
  .build
@jchapuis
jchapuis / Markdium-Scala.scala
Created May 10, 2020 20:06
Markdium-Orchestrating startup and shutdown in Scala
// Builds a service named "topic-dependent": its starter first ensures the
// topic is created, then calls `consumer` for that topic; the EitherT chain
// is unwrapped with `.value` before being handed to the builder.
Service
  .withName("topic-dependent")
  .withFallibleStarter { () =>
    val topic = "foobar-topic"
    EitherT(KafkaAdmin().ensureTopicCreated(topic))
      .flatMap { _ =>
        EitherT
          .right[KafkaAdmin.TopicCreationFailed](consumer(topic))
          .map(_ => ())
      }
      .value
  }
  .build
@jchapuis
jchapuis / Markdium-Scala.scala
Created May 10, 2020 19:38
Markdium-Orchestrating startup and shutdown in Scala
class ClusterUpAndServiceBoundCheck(system: ActorSystem) extends (() => Future[Boolean]) {
private val cluster = Cluster(system)
discard {
CoordinatedShutdown(system).serviceUnbind(
() =>
Future.successful {
serviceUnbound.set(true)
Done
},
"service-unbound-check"
@jchapuis
jchapuis / Markdium-Scala.scala
Created May 10, 2020 19:38
Markdium-Orchestrating startup and shutdown in Scala
/**
 * Immutable description of an HTTP service.
 *
 * @param route                       the route associated with the service
 * @param host                        host name or address (presumably the bind interface — confirm against usage)
 * @param port                        port number (presumably the bind port — confirm against usage)
 * @param pendingRequestsHardDeadline NOTE(review): presumably the maximum time granted to
 *                                    in-flight requests during shutdown — confirm against usage
 * @param unbindingDelay              NOTE(review): presumably a delay applied around unbinding
 *                                    during shutdown — confirm against usage
 */
final case class HttpServiceDefinition(
  route: Route,
  host: String,
  port: Int,
  pendingRequestsHardDeadline: FiniteDuration,
  unbindingDelay: FiniteDuration
)
@jchapuis
jchapuis / Markdium-Scala.scala
Created May 10, 2020 19:38
Markdium-Orchestrating startup and shutdown in Scala
// Builds a service named "safe" whose starter and stopper both return an
// already-completed, successful Future[Unit].
Service
  .withName("safe")
  .withStarter(() => Future.unit)
  .withStopper(() => Future.unit)
  .build
@jchapuis
jchapuis / Markdium-Scala.scala
Created May 10, 2020 19:38
Markdium-Orchestrating startup and shutdown in Scala
implicit class FluentCoordinatedShutdown(coordinatedShutdown: CoordinatedShutdown) {
/**
* The first pre-defined phase that applications can add tasks to.
*/
def beforeServiceUnbind(task: () => Future[Done], name: String): CoordinatedShutdown =
withTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, task, name)
/**
* Stop accepting new incoming connections.
@jchapuis
jchapuis / Markdium-Scala.scala
Created May 10, 2020 19:38
Markdium-Orchestrating startup and shutdown in Scala
Observable
.fromTask(startTriggered)
.ambWith(Observable.fromTask(stopTriggered))
.flatMap(_ =>
Observable
.fromIterable(services)
.takeUntil(Observable.fromTask(stopTriggered))
.mapEval(startService)
.map(_.asRight)
.onErrorHandleWith {
@jchapuis
jchapuis / Markdium-Scala.scala
Created May 10, 2020 19:38
Markdium-Orchestrating startup and shutdown in Scala
(for {
_ <- Task.fromFuture(stopTriggered)
results <- startResults
stopResults <- Observable
.fromIterable(results.startedServices.reverse)
.mapEval(stopService)
.foldLeftL(List.empty[Either[ServiceStopError, Service]])(
(stopResults, stopResult) => stopResults :+ stopResult
)
} yield {
@jchapuis
jchapuis / Markdium-Scala.scala
Created May 10, 2020 19:38
Markdium-Orchestrating startup and shutdown in Scala
import scala.concurrent.{ ExecutionContext, Future, Promise }
/**
* Stoppable[T]: represents a process whose
* termination produces a result of type `T`
*/
trait Stoppable[T] {
protected implicit def executionContext: ExecutionContext
private val stoppedPromise = Promise[T]()
private val stopTriggeredPromise = Promise[Unit]()