Skip to content

Instantly share code, notes, and snippets.

View jchapuis's full-sized avatar
🤓

Jonas Chapuis jchapuis

🤓
View GitHub Profile
@jchapuis
jchapuis / Markdium-Scala.scala
Created May 10, 2020 20:06
Markdium-Orchestrating startup and shutdown in Scala
"notify stopped with error in case of service stop timeout" in {
implicit val scheduler = TestScheduler()
val serviceA = Service.withName("A").withStopper(() => Future.never).build
val serviceB = Service.withName("B").withStopper(() => Future.successful(())).build
Given("a started composite with finite stopped timeout")
implicit val stoppedTimeout = FiniteStoppedTimeout(30 seconds)
val composite = new CompositeService(serviceA, serviceB)
composite.start()
scheduler.tick(1 second)
@jchapuis
jchapuis / Markdium-Scala.scala
Created May 10, 2020 20:06
Markdium-Orchestrating startup and shutdown in Scala
/**
  * Immutable bundle of the settings that describe one HTTP service.
  *
  * Only the field names and types are visible here, so the descriptions below
  * are assumptions to confirm against the code that consumes this definition.
  *
  * @param route                       route to serve (presumably an akka-http `Route` — confirm)
  * @param host                        host name or address (presumably the interface to bind to — confirm)
  * @param port                        port number (presumably the port to bind to — confirm)
  * @param pendingRequestsHardDeadline presumably the maximum time granted to in-flight requests
  *                                    during shutdown — confirm
  * @param unbindingDelay              presumably a delay applied around unbinding during
  *                                    shutdown — confirm
  */
case class HttpServiceDefinition(
    route: Route,
    host: String,
    port: Int,
    pendingRequestsHardDeadline: FiniteDuration,
    unbindingDelay: FiniteDuration
)
@jchapuis
jchapuis / Markdium-Scala.scala
Created May 10, 2020 20:06
Markdium-Orchestrating startup and shutdown in Scala
Observable
.fromTask(startTriggered)
.ambWith(Observable.fromTask(stopTriggered))
.flatMap(_ =>
Observable
.fromIterable(services)
.takeUntil(Observable.fromTask(stopTriggered))
.mapEval(startService)
.map(_.asRight)
.onErrorHandleWith {
@jchapuis
jchapuis / Markdium-Scala.scala
Created May 10, 2020 20:06
Markdium-Orchestrating startup and shutdown in Scala
(for {
_ <- Task.fromFuture(stopTriggered)
results <- startResults
stopResults <- Observable
.fromIterable(results.startedServices.reverse)
.mapEval(stopService)
.foldLeftL(List.empty[Either[ServiceStopError, Service]])(
(stopResults, stopResult) => stopResults :+ stopResult
)
} yield {
@jchapuis
jchapuis / Markdium-Scala.scala
Created May 10, 2020 20:06
Markdium-Orchestrating startup and shutdown in Scala
import scala.concurrent.{ ExecutionContext, Future, Promise }
/**
* Stoppable[T]: represents a process with a
* termination function of type `T`
*/
trait Stoppable[T] {
protected implicit def executionContext: ExecutionContext
private val stoppedPromise = Promise[T]()
private val stopTriggeredPromise = Promise[Unit]()
@jchapuis
jchapuis / Markdium-Scala.scala
Created May 10, 2020 20:06
Markdium-Orchestrating startup and shutdown in Scala
implicit class FluentCoordinatedShutdown(coordinatedShutdown: CoordinatedShutdown) {
/**
  * Registers `task` in [[CoordinatedShutdown.PhaseBeforeServiceUnbind]], the first
  * pre-defined phase that applications can add tasks to.
  *
  * The registration itself is delegated to `withTask`.
  *
  * @param task factory of the future to run during that phase
  * @param name name under which the task is registered
  * @return the result of `withTask` (presumably the same `CoordinatedShutdown`,
  *         so calls can be chained — confirm)
  */
def beforeServiceUnbind(task: () => Future[Done], name: String): CoordinatedShutdown = {
  val phase = CoordinatedShutdown.PhaseBeforeServiceUnbind
  withTask(phase, task, name)
}
/**
* Stop accepting new incoming connections.
@jchapuis
jchapuis / Markdium-Scala.scala
Created May 10, 2020 20:06
Markdium-Orchestrating startup and shutdown in Scala
val akkaApplicationService: Service = builder
.startSystemDependent(SomeEtcdService())
.thenStartHttp(
HttpServiceDefinition(
new AkkaControllers().routes,
serverConfig.host,
serverConfig.port,
serverConfig.hardDeadline,
serverConfig.unbindingDelay
)
@jchapuis
jchapuis / Markdium-Scala.scala
Created May 10, 2020 20:06
Markdium-Orchestrating startup and shutdown in Scala
/**
  * Builds a task that stops `service` and reports the outcome as an `Either`.
  *
  * When the task runs, it calls `service.triggerStop()` and then waits on
  * `service.stopped`. A successful (right) outcome is replaced by the service
  * itself; a left outcome is passed through unchanged.
  *
  * If no result arrives within `duration` (taken from the enclosing scope), the
  * resulting `TimeoutException` is turned into a left
  * `ServiceStopTimeoutError(service, duration)`. Any other failure is not
  * recovered here and fails the task.
  *
  * @param service the service to stop
  * @return a task yielding the stopped service on the right, or the stop error on the left
  */
def stopService(service: Service): Task[Either[ServiceStopError, Service]] = {
  // Deferred so that the stop is only triggered once the task is actually run.
  val stopAttempt = Task.deferFuture {
    service.triggerStop()
    service.stopped.map(outcome => outcome.map(_ => service))
  }

  stopAttempt
    .timeout(duration)
    .onErrorRecover {
      case _: TimeoutException => ServiceStopTimeoutError(service, duration).asLeft
    }
}
@jchapuis
jchapuis / Markdium-Scala.scala
Created May 10, 2020 20:06
Markdium-Orchestrating startup and shutdown in Scala
object CoordinatedShutdownHttpHelpers {
implicit class FluentHttpCoordinatedShutdown(coordinatedShutdown: CoordinatedShutdown)(
implicit val system: ActorSystem
) {
private implicit val executionContext: ExecutionContext = system.dispatcher
def addHttpBinding(
binding: Http.ServerBinding,
pendingRequestsHardDeadline: FiniteDuration,
unbindingDelay: FiniteDuration
@jchapuis
jchapuis / Markdium-Scala.scala
Created May 10, 2020 20:06
Markdium-Orchestrating startup and shutdown in Scala
class ClusterUpAndServiceBoundCheck(system: ActorSystem) extends (() => Future[Boolean]) {
private val cluster = Cluster(system)
discard {
CoordinatedShutdown(system).serviceUnbind(
() =>
Future.successful {
serviceUnbound.set(true)
Done
},
"service-unbound-check"