trait Service extends Stoppable[Either[E, Unit]] {
  def start(): Future[Either[E, Unit]]
  def name: String
}
Service
  .withName("in-memory-cache")
  .withFallibleStarter { () => cache.loadAsync(): Future[Either[StartError, Unit]] }
  .build
Service
  .withName("topic-dependent")
  .withFallibleStarter(() =>
    (for {
      _ <- EitherT(KafkaAdmin().ensureTopicCreated("foobar-topic"))
      _ <- EitherT.right[KafkaAdmin.TopicCreationFailed](consumer("foobar-topic"))
    } yield ()).value
  )
  .build
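The for-comprehension above leans on `EitherT` to stop at the first `Left`. As a rough illustration of that short-circuiting using only the standard library, the sketch below chains two `Future[Either[E, A]]` steps by hand. Everything in it is hypothetical: `andThen`, `ensureTopic`, `startConsumer`, and the local `TopicCreationFailed` are stand-ins, not the real `KafkaAdmin` or cats API.

```scala
import scala.concurrent.{ExecutionContext, Future}

object EitherChainSketch {
  // Hypothetical error type standing in for KafkaAdmin.TopicCreationFailed.
  final case class TopicCreationFailed(topic: String)

  // Runs `next` only if `first` completed with a Right; a Left is passed through untouched.
  def andThen[E, A, B](first: Future[Either[E, A]])(next: A => Future[Either[E, B]])(
      implicit ec: ExecutionContext
  ): Future[Either[E, B]] =
    first.flatMap {
      case Right(a) => next(a)
      case Left(e)  => Future.successful(Left(e))
    }

  // Stand-in for topic creation (assumption): fails for an empty topic name.
  def ensureTopic(topic: String): Future[Either[TopicCreationFailed, Unit]] = {
    val result: Either[TopicCreationFailed, Unit] =
      if (topic.nonEmpty) Right(()) else Left(TopicCreationFailed(topic))
    Future.successful(result)
  }

  // Stand-in for starting a consumer (assumption): it has no domain error, so it is lifted into Right.
  def startConsumer(topic: String): Future[Either[TopicCreationFailed, Unit]] =
    Future.successful(Right(()))

  // The consumer is started only when the topic step succeeded.
  def start(topic: String)(implicit ec: ExecutionContext): Future[Either[TopicCreationFailed, Unit]] =
    andThen(ensureTopic(topic))(_ => startConsumer(topic))
}
```

With `EitherT` the same chaining comes for free from `flatMap`, which is why the original snippet can stay a plain for-comprehension.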
class ClusterUpAndServiceBoundCheck(system: ActorSystem) extends (() => Future[Boolean]) {
  private val cluster = Cluster(system)
  discard {
    CoordinatedShutdown(system).serviceUnbind(
      () =>
        Future.successful {
          serviceUnbound.set(true)
          Done
        },
      "service-unbound-check"
case class HttpServiceDefinition(
  route: Route,
  host: String,
  port: Int,
  pendingRequestsHardDeadline: FiniteDuration,
  unbindingDelay: FiniteDuration
)
Service
  .withName("safe")
  .withStarter { () => Future.successful(()) }
  .withStopper { () => Future.successful(()) }
  .build
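To make the fluent calls above concrete, here is a minimal sketch of how such a builder could be shaped. It is not the library's implementation: `SimpleService`, `Builder`, and the no-op defaults are assumptions made for illustration, and only the method names `withName`, `withStarter`, `withStopper`, and `build` come from the snippet.

```scala
import scala.concurrent.Future

object ServiceBuilderSketch {
  // Simplified stand-in for the built service: a name plus start/stop functions.
  final case class SimpleService(
      name: String,
      start: () => Future[Unit],
      stop: () => Future[Unit]
  )

  // Immutable builder: each `with...` call returns a copy, so a partial builder can be reused.
  // The no-op defaults are an assumption of this sketch.
  final case class Builder(
      name: String,
      starter: () => Future[Unit] = () => Future.successful(()),
      stopper: () => Future[Unit] = () => Future.successful(())
  ) {
    def withStarter(f: () => Future[Unit]): Builder = copy(starter = f)
    def withStopper(f: () => Future[Unit]): Builder = copy(stopper = f)
    def build: SimpleService = SimpleService(name, starter, stopper)
  }

  // Entry point mirroring `Service.withName(...)`.
  def withName(name: String): Builder = Builder(name)
}
```

Calling `ServiceBuilderSketch.withName("safe").build` yields a service whose `start()` and `stop()` complete immediately, which matches the spirit of the "safe" example.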
implicit class FluentCoordinatedShutdown(coordinatedShutdown: CoordinatedShutdown) {
  /**
   * The first pre-defined phase that applications can add tasks to.
   */
  def beforeServiceUnbind(task: () => Future[Done], name: String): CoordinatedShutdown =
    withTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, task, name)
  /**
   * Stop accepting new incoming connections.
Observable
  .fromTask(startTriggered)
  .ambWith(Observable.fromTask(stopTriggered))
  .flatMap(_ =>
    Observable
      .fromIterable(services)
      .takeUntil(Observable.fromTask(stopTriggered))
      .mapEval(startService)
      .map(_.asRight)
      .onErrorHandleWith {
(for {
  _ <- Task.fromFuture(stopTriggered)
  results <- startResults
  stopResults <- Observable
    .fromIterable(results.startedServices.reverse)
    .mapEval(stopService)
    .foldLeftL(List.empty[Either[ServiceStopError, Service]])(
      (stopResults, stopResult) => stopResults :+ stopResult
    )
} yield {
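The stop sequence above walks the started services in reverse and collects one result per service. The following sketch shows the same idea with plain `Future`s instead of Monix, assuming a hypothetical `Svc` type and `StopError` wrapper; neither is part of the original code.

```scala
import scala.concurrent.{ExecutionContext, Future}

object ReverseStopSketch {
  // Hypothetical minimal service: a name and a stop function that may fail.
  final case class Svc(name: String, stop: () => Future[Unit])
  final case class StopError(service: String, cause: Throwable)

  // Stops one service, turning a failed Future into a Left so the sequence keeps going.
  def stopService(s: Svc)(implicit ec: ExecutionContext): Future[Either[StopError, Svc]] =
    s.stop()
      .map(_ => Right(s): Either[StopError, Svc])
      .recover { case t: Exception => Left(StopError(s.name, t)) }

  // Stops services one at a time in reverse start order, collecting every result.
  def stopAll(started: List[Svc])(implicit ec: ExecutionContext): Future[List[Either[StopError, Svc]]] =
    started.reverse.foldLeft(Future.successful(List.empty[Either[StopError, Svc]])) { (acc, s) =>
      // flatMap on the accumulator ensures the next stop starts only after the previous one finished.
      acc.flatMap(results => stopService(s).map(r => results :+ r))
    }
}
```

Stopping in reverse start order means a service is shut down before the services it was started after, and a failure in one stop does not prevent the remaining services from being stopped.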
import scala.concurrent.{ ExecutionContext, Future, Promise }
/**
 * Stoppable[T]: represents a process with a
 * termination function of type `T`
 */
trait Stoppable[T] {
  protected implicit def executionContext: ExecutionContext
  private val stoppedPromise = Promise[T]()
  private val stopTriggeredPromise = Promise[Unit]()
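The trait is cut off above, so the rest of its body is not known. As a guess at how the two promises might be used, the sketch below completes a separate, hypothetical `SimpleStoppable` trait: `doStop`, `stop`, `stopped`, and `stopTriggered` are names assumed for this illustration, not taken from the original.

```scala
import scala.concurrent.{ExecutionContext, Future, Promise}

object StoppableSketch {
  trait SimpleStoppable[T] {
    protected implicit def executionContext: ExecutionContext

    private val stoppedPromise = Promise[T]()
    private val stopTriggeredPromise = Promise[Unit]()

    // Subclasses supply the actual termination logic (hypothetical hook).
    protected def doStop(): Future[T]

    // Completes as soon as stop() is called for the first time.
    final def stopTriggered: Future[Unit] = stopTriggeredPromise.future

    // Completes with the outcome of the termination logic.
    final def stopped: Future[T] = stoppedPromise.future

    // Idempotent: only the first call runs doStop(); later calls share the same result.
    final def stop(): Future[T] = {
      if (stopTriggeredPromise.trySuccess(())) {
        // Wrapping in Future(...) also captures exceptions thrown synchronously by doStop().
        stoppedPromise.completeWith(Future(doStop()).flatten)
      }
      stopped
    }
  }
}
```

Using `trySuccess` as the guard keeps `stop()` safe to call from several places, for example from both a shutdown hook and a failed start.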