"notify stopped with error in case of service stop timeout" in {
  implicit val scheduler = TestScheduler()
  val serviceA = Service.withName("A").withStopper(() => Future.never).build
  val serviceB = Service.withName("B").withStopper(() => Future.successful(())).build
  Given("a started composite with finite stopped timeout")
  implicit val stoppedTimeout = FiniteStoppedTimeout(30 seconds)
  val composite = new CompositeService(serviceA, serviceB)
  composite.start()
  scheduler.tick(1 second)

Service
  .withName("safe")
  .withStarter { () => Future.successful(()) }
  .withStopper { () => Future.successful(()) }
  .build

case class HttpServiceDefinition(
  route: Route,
  host: String,
  port: Int,
  pendingRequestsHardDeadline: FiniteDuration,
  unbindingDelay: FiniteDuration
)

val akkaApplicationService: Service = builder
  .startSystemDependent(SomeEtcdService())
  .thenStartHttp(
    HttpServiceDefinition(
      new AkkaControllers().routes,
      serverConfig.host,
      serverConfig.port,
      serverConfig.hardDeadline,
      serverConfig.unbindingDelay
    )

(for {
  _ <- Task.fromFuture(stopTriggered)
  results <- startResults
  stopResults <- Observable
    .fromIterable(results.startedServices.reverse)
    .mapEval(stopService)
    .foldLeftL(List.empty[Either[ServiceStopError, Service]])(
      (stopResults, stopResult) => stopResults :+ stopResult
    )
} yield {

class ClusterUpAndServiceBoundCheck(system: ActorSystem) extends (() => Future[Boolean]) {
  private val cluster = Cluster(system)
  discard {
    CoordinatedShutdown(system).serviceUnbind(
      () =>
        Future.successful {
          serviceUnbound.set(true)
          Done
        },
      "service-unbound-check"

Observable
  .fromTask(startTriggered)
  .ambWith(Observable.fromTask(stopTriggered))
  .flatMap(_ =>
    Observable
      .fromIterable(services)
      .takeUntil(Observable.fromTask(stopTriggered))
      .mapEval(startService)
      .map(_.asRight)
      .onErrorHandleWith {

Service
  .withName("topic-dependent")
  .withFallibleStarter(() =>
    (for {
      _ <- EitherT(KafkaAdmin().ensureTopicCreated("foobar-topic"))
      _ <- EitherT.right[KafkaAdmin.TopicCreationFailed](consumer("foobar-topic"))
    } yield ()).value
  )
  .build

Service
  .withName("in-memory-cache")
  .withFallibleStarter { () => cache.loadAsync(): Future[Either[StartError, Unit]] }
  .build