Created
April 28, 2022 23:14
-
-
Save robzienert/1b9bb5f6a6f7eb74edd4516801c83c36 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.netflix.temporal.config | |
import com.fasterxml.jackson.databind.ObjectMapper | |
import com.netflix.grpc.metatron.NettyMetatronSslContext | |
import com.netflix.temporal.config.customizers.WorkerFactoryOptionsCustomizer | |
import com.netflix.temporal.config.customizers.WorkflowClientOptionsCustomizer | |
import com.netflix.temporal.config.customizers.WorkflowServiceStubsOptionsCustomizer | |
import com.netflix.temporal.core.WorkerFactoryVisitor | |
import com.netflix.temporal.spring.ClusterAwareWorkflowClient | |
import com.netflix.temporal.spring.ClusterAwareWorkflowServiceStubs | |
import com.uber.m3.tally.RootScopeBuilder | |
import com.uber.m3.tally.Scope | |
import com.uber.m3.tally.StatsReporter | |
import com.uber.m3.util.Duration | |
import io.micrometer.core.instrument.MeterRegistry | |
import io.temporal.client.WorkflowClient | |
import io.temporal.client.WorkflowClientOptions | |
import io.temporal.client.WorkflowOptions | |
import io.temporal.common.context.ContextPropagator | |
import io.temporal.common.converter.ByteArrayPayloadConverter | |
import io.temporal.common.converter.DataConverter | |
import io.temporal.common.converter.DefaultDataConverter | |
import io.temporal.common.converter.JacksonJsonPayloadConverter | |
import io.temporal.common.converter.NullPayloadConverter | |
import io.temporal.common.converter.ProtobufJsonPayloadConverter | |
import io.temporal.common.interceptors.WorkerInterceptor | |
import io.temporal.common.reporter.MicrometerClientStatsReporter | |
import io.temporal.serviceclient.WorkflowServiceStubs | |
import io.temporal.serviceclient.WorkflowServiceStubsOptions | |
import io.temporal.worker.WorkerFactory | |
import org.springframework.beans.factory.BeanCreationException | |
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean | |
import org.springframework.boot.context.properties.EnableConfigurationProperties | |
import org.springframework.context.annotation.Bean | |
import org.springframework.context.annotation.ComponentScan | |
import org.springframework.context.annotation.Conditional | |
import org.springframework.context.annotation.Configuration | |
import org.springframework.scheduling.annotation.EnableScheduling | |
/**
 * Spring auto-configuration for Temporal.
 *
 * Registers the data converter, default workflow options and metrics scope, and then builds one
 * [WorkflowServiceStubs] per enabled cluster, one [WorkflowClient] per (namespace, enabled cluster)
 * pair and one [WorkerFactory] per (enabled worker, enabled cluster) pair from [TemporalConfigProperties].
 */
@Configuration(proxyBeanMethods = false)
@Conditional(OnTemporalEnabled::class)
@EnableConfigurationProperties(TemporalConfigProperties::class)
@ComponentScan(
    "com.netflix.temporal.config.customizers",
    "com.netflix.temporal.spring",
)
@EnableScheduling
@Suppress("UndocumentedPublicFunction")
public open class TemporalAutoConfiguration {

    /**
     * Default [DataConverter]: null, byte-array and protobuf-JSON payload converters, followed by a
     * Jackson converter backed by the application's [objectMapper]. Skipped when a bean already exists.
     */
    @Bean
    @ConditionalOnMissingBean
    public open fun dataConverter(objectMapper: ObjectMapper): DataConverter =
        DefaultDataConverter(
            NullPayloadConverter(),
            ByteArrayPayloadConverter(),
            ProtobufJsonPayloadConverter(),
            JacksonJsonPayloadConverter(objectMapper)
        )

    /**
     * Default [WorkflowOptions] carrying every [ContextPropagator] bean found in the context.
     */
    @Bean
    @ConditionalOnMissingBean
    public open fun defaultWorkflowOptions(contextPropagators: List<ContextPropagator>): WorkflowOptions =
        WorkflowOptions.newBuilder()
            .setContextPropagators(contextPropagators)
            .build()

    /**
     * Tally [Scope] that reports into the given Micrometer [meterRegistry] every 10 seconds.
     */
    @Bean
    @ConditionalOnMissingBean
    public open fun tallyScope(meterRegistry: MeterRegistry): Scope {
        val reporter: StatsReporter = MicrometerClientStatsReporter(meterRegistry)
        return RootScopeBuilder()
            .reporter(reporter)
            .reportEvery(Duration.ofSeconds(10.0))
    }

    /**
     * Create a [WorkflowServiceStubs] for each enabled cluster.
     *
     * Every stub shares one Metatron SSL context and the [tallyScope]; the target is the cluster's
     * `workerTarget`. All [workflowServiceStubsOptionsCustomizers] are applied before the stub is built.
     *
     * These objects should not be used directly, but instead used via
     * [com.netflix.temporal.spring.WorkflowServiceStubsProvider].
     *
     * @throws BeanCreationException if the Metatron SSL context cannot be created.
     */
    @Bean
    public open fun workflowServiceStubs(
        configProperties: TemporalConfigProperties,
        tallyScope: Scope,
        workflowServiceStubsOptionsCustomizers: List<WorkflowServiceStubsOptionsCustomizer>
    ): Collection<WorkflowServiceStubs> {
        val sslContext = NettyMetatronSslContext.createClientFactory().create("temporal").orElseThrow {
            BeanCreationException(
                "Failed creating TemporalNettyMetatronSslContext for WorkflowServiceStubs"
            )
        }

        return configProperties.enabledClusters().map { (clusterName, cluster) ->
            val builder = WorkflowServiceStubsOptions.newBuilder()
                .setSslContext(sslContext)
                .setMetricsScope(tallyScope)
                .setTarget(cluster.workerTarget)

            workflowServiceStubsOptionsCustomizers.forEach { customizer -> customizer.invoke(builder) }

            ClusterAwareWorkflowServiceStubs(
                clusterName,
                WorkflowServiceStubs.newInstance(builder.build())
            )
        }
    }

    /**
     * Create clients, both defined explicitly via the clients config, and implicitly via worker config.
     * Client configuration is optional, so if a worker has been defined with a namespace that doesn't have an
     * explicitly defined client, a client will be created with default options. A client for the default
     * namespace is always created, explicitly or implicitly.
     *
     * Explicit client options without a namespace are resolved to the default namespace, and the resolved
     * value is written back onto the bound options object.
     *
     * These objects should not be used directly, but instead used via
     * [com.netflix.temporal.spring.WorkflowClientProvider].
     *
     * @throws NoSuchElementException if an enabled cluster has no matching [ClusterAwareWorkflowServiceStubs].
     */
    @Bean
    public open fun workflowClients(
        configProperties: TemporalConfigProperties,
        workflowServiceStubs: Collection<WorkflowServiceStubs>,
        workflowClientOptionsCustomizers: List<WorkflowClientOptionsCustomizer>
    ): Collection<WorkflowClient> {
        // Loop-invariant lookups, computed once instead of once per (client, cluster) pair.
        val clusterNames = configProperties.enabledClusters().keys
        val clusterStubs = workflowServiceStubs.filterIsInstance<ClusterAwareWorkflowServiceStubs>()

        fun stubsFor(clusterName: String): WorkflowServiceStubs =
            clusterStubs.firstOrNull { stubs -> stubs.clusterName == clusterName }
                ?: throw NoSuchElementException(
                    "No WorkflowServiceStubs found for enabled Temporal cluster '$clusterName'"
                )

        val namespacesWithClients = mutableSetOf<String>()

        // Create explicitly defined clients.
        val explicitClients = configProperties.clients.values.flatMap { clientConfig ->
            val options = clientConfig.options
            clusterNames.map { clusterName ->
                val workflowService = stubsFor(clusterName)

                val namespace: String = options.namespace ?: configProperties.defaultNamespace
                // Keep the resolved namespace on the bound options, as other readers of the
                // configuration may rely on it being populated.
                options.namespace = namespace
                namespacesWithClients.add(namespace)

                workflowClient(
                    namespace,
                    clusterName,
                    options,
                    workflowClientOptionsCustomizers,
                    workflowService
                )
            }
        }

        // Create implicitly-defined WorkflowClients based on workers whose clients have not yet been created.
        val workerNamespaces = configProperties.workers.values.mapNotNull { worker -> worker.namespace }
        val implicitClients = (workerNamespaces + configProperties.defaultNamespace)
            .distinct()
            .filterNot { namespace -> namespace in namespacesWithClients }
            .flatMap { namespace ->
                clusterNames.map { clusterName ->
                    val workflowService = stubsFor(clusterName)
                    val options = TemporalConfigProperties.ClientOptionsProperties().apply {
                        this.namespace = namespace
                    }
                    workflowClient(
                        namespace,
                        clusterName,
                        options,
                        workflowClientOptionsCustomizers,
                        workflowService
                    )
                }
            }

        return explicitClients + implicitClients
    }

    /**
     * Builds a single [ClusterAwareWorkflowClient] for [namespace] on [clusterName], applying the
     * configured query-reject condition first and then every customizer.
     */
    private fun workflowClient(
        namespace: String,
        clusterName: String,
        clientOptions: TemporalConfigProperties.ClientOptionsProperties,
        workflowClientOptionsCustomizers: List<WorkflowClientOptionsCustomizer>,
        workflowService: WorkflowServiceStubs
    ): WorkflowClient {
        val builder = WorkflowClientOptions.newBuilder()
            .setNamespace(namespace)
            .setQueryRejectCondition(clientOptions.queryRejectCondition)

        workflowClientOptionsCustomizers.forEach { customizer -> customizer.invoke(builder) }

        return ClusterAwareWorkflowClient(
            clusterName,
            WorkflowClient.newInstance(workflowService, builder.build())
        )
    }

    /**
     * Create all explicitly configured [WorkerFactory]s. No WorkerFactories are created implicitly.
     *
     * One factory is created per enabled worker and enabled cluster, keyed as `<workerName>-<clusterName>`.
     * Global [workerFactoryOptionsCustomizers] run first; afterwards the [WorkerFactoryVisitor] whose name
     * equals the worker name (if any) may customize the options further.
     *
     * These objects should not be used directly, but instead used via
     * [com.netflix.temporal.spring.WorkerFactoryProvider].
     *
     * @throws NoSuchElementException if no [ClusterAwareWorkflowClient] exists for a worker's
     * cluster and namespace.
     */
    @Bean
    public open fun workerFactories(
        configProperties: TemporalConfigProperties,
        workflowClients: Collection<WorkflowClient>,
        workerFactoryOptionsCustomizers: List<WorkerFactoryOptionsCustomizer>,
        workerInterceptors: Collection<WorkerInterceptor>,
        workerFactoryVisitors: Collection<WorkerFactoryVisitor>
    ): Map<String, WorkerFactory> {
        // Loop-invariant lookups, computed once instead of once per (worker, cluster) pair.
        val clusterNames = configProperties.enabledClusters().keys
        val clusterClients = workflowClients.filterIsInstance<ClusterAwareWorkflowClient>()
        val interceptors = workerInterceptors.toTypedArray()

        return configProperties.workers
            .filter { (_, worker) -> worker.isEnabled }
            .flatMap { (workerName, worker) ->
                clusterNames.map { clusterName ->
                    val namespace = worker.namespace ?: configProperties.defaultNamespace

                    val workflowClient = clusterClients
                        .firstOrNull { client ->
                            client.clusterName == clusterName && client.options.namespace == namespace
                        }
                        ?: throw NoSuchElementException(
                            "No WorkflowClient found for worker '$workerName' " +
                                "(cluster '$clusterName', namespace '$namespace')"
                        )

                    val builder = worker.factoryOptions.toOptions().toBuilder()
                        .setWorkerInterceptors(*interceptors)

                    // First, apply the global customizers
                    workerFactoryOptionsCustomizers.forEach { customizer -> customizer.invoke(builder) }

                    // Then let the userland factory visitor customize the builder if necessary
                    workerFactoryVisitors.find { visitor -> visitor.name == workerName }
                        ?.customizeWorkerFactoryOptions(builder)

                    "$workerName-$clusterName" to WorkerFactory.newInstance(workflowClient, builder.build())
                }
            }
            .toMap()
    }

    /** Clusters whose `enabled` flag is set, keyed by cluster name. */
    private fun TemporalConfigProperties.enabledClusters(): Map<String, TemporalConfigProperties.TemporalClusterProperties> =
        this.clusters.filter { (_, cluster) -> cluster.isEnabled }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright 2021 Netflix, Inc. | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package com.netflix.temporal.config; | |
import com.netflix.temporal.core.WorkerFactoryVisitor; | |
import io.temporal.api.enums.v1.QueryRejectCondition; | |
import io.temporal.worker.WorkerFactoryOptions; | |
import io.temporal.worker.WorkerOptions; | |
import org.springframework.boot.context.properties.ConfigurationProperties; | |
import javax.annotation.Nonnull; | |
import javax.annotation.Nullable; | |
import java.time.Duration; | |
import java.util.HashMap; | |
import java.util.Map; | |
/** | |
* Configuration properties for Temporal applications. | |
* | |
* Note: Written in Java, since applications that do not have Kotlin stdlib explicitly in their project will fail | |
* to bind their configuration into the class. | |
*/ | |
@ConfigurationProperties("temporal") | |
public class TemporalConfigProperties { | |
/** | |
* Cluster configuration options, keyed by cluster name. | |
*/ | |
@Nonnull | |
private Map<String, TemporalClusterProperties> clusters = new HashMap<>(); | |
/** | |
* Worker configuration options, keyed by worker name. Worker name must match the value of | |
* {@link WorkerFactoryVisitor#getName()}. | |
*/ | |
@Nonnull | |
private Map<String, TemporalWorkerProperties> workers = new HashMap<>(); | |
/** | |
* Client configuration options, keyed by client name. | |
*/ | |
@Nonnull | |
private Map<String, TemporalClientProperties> clients = new HashMap<>(); | |
/** | |
* The default namespace to use for the application. Most applications will only need to set this, however | |
* overrides can be set on a per-worker basis via {@link TemporalWorkerProperties}. | |
*/ | |
@Nonnull | |
private String defaultNamespace = "default"; | |
@Nonnull | |
public Map<String, TemporalClusterProperties> getClusters() { | |
return clusters; | |
} | |
public void setClusters(@Nonnull Map<String, TemporalClusterProperties> clusters) { | |
this.clusters = clusters; | |
} | |
@Nonnull | |
public Map<String, TemporalWorkerProperties> getWorkers() { | |
return workers; | |
} | |
public void setWorkers(@Nonnull Map<String, TemporalWorkerProperties> workers) { | |
this.workers = workers; | |
} | |
@Nonnull | |
public Map<String, TemporalClientProperties> getClients() { | |
return clients; | |
} | |
public void setClients(@Nonnull Map<String, TemporalClientProperties> clients) { | |
this.clients = clients; | |
} | |
@Nonnull | |
public String getDefaultNamespace() { | |
return defaultNamespace; | |
} | |
public void setDefaultNamespace(@Nonnull String defaultNamespace) { | |
this.defaultNamespace = defaultNamespace; | |
} | |
public static class TemporalClusterProperties { | |
/** | |
* Whether or not the cluster connection is enabled. | |
*/ | |
private boolean enabled = true; | |
/** | |
* The URL and port of the Temporal HTTP UI. | |
*/ | |
@Nonnull | |
private String uiTarget; | |
/** | |
* The URI and port of the Temporal gRPC frontend. | |
*/ | |
@Nonnull | |
private String workerTarget; | |
public boolean isEnabled() { | |
return enabled; | |
} | |
public void setEnabled(boolean enabled) { | |
this.enabled = enabled; | |
} | |
@Nonnull | |
public String getUiTarget() { | |
return uiTarget; | |
} | |
public void setUiTarget(@Nonnull String uiTarget) { | |
this.uiTarget = uiTarget; | |
} | |
@Nonnull | |
public String getWorkerTarget() { | |
return workerTarget; | |
} | |
public void setWorkerTarget(@Nonnull String workerTarget) { | |
this.workerTarget = workerTarget; | |
} | |
} | |
public static class TemporalWorkerProperties { | |
public static final Duration DEFAULT_AWAIT_TERMINATION_TIMEOUT = Duration.ofSeconds(30); | |
/** | |
* Whether or not the worker is enabled. | |
*/ | |
private boolean enabled; | |
/** | |
* The namespace the worker will be associated with. If unset, the namespace value from | |
* {@link TemporalConfigProperties#defaultNamespace} will be used. | |
*/ | |
@Nullable | |
private String namespace; | |
/** | |
* Worker options. | |
*/ | |
@Nonnull | |
private WorkerOptionsProperties options = new WorkerOptionsProperties(); | |
@Nonnull | |
private WorkerFactoryOptionsProperties factoryOptions = new WorkerFactoryOptionsProperties(); | |
/** | |
* How long workers will block shutdown to flush already accepted tasks. This value should be configured | |
* at least as long as your worker's longest duration activity or workflow task timeout to ensure a clean shutdown. | |
*/ | |
@Nonnull | |
private Duration awaitTerminationTimeout = DEFAULT_AWAIT_TERMINATION_TIMEOUT; | |
public boolean isEnabled() { | |
return enabled; | |
} | |
public void setEnabled(boolean enabled) { | |
this.enabled = enabled; | |
} | |
@Nullable | |
public String getNamespace() { | |
return namespace; | |
} | |
public void setNamespace(@Nullable String namespace) { | |
this.namespace = namespace; | |
} | |
@Nonnull | |
public WorkerOptionsProperties getOptions() { | |
return options; | |
} | |
public void setOptions(@Nonnull WorkerOptionsProperties options) { | |
this.options = options; | |
} | |
@Nonnull | |
public WorkerFactoryOptionsProperties getFactoryOptions() { | |
return factoryOptions; | |
} | |
public void setFactoryOptions(@Nonnull WorkerFactoryOptionsProperties factoryOptions) { | |
this.factoryOptions = factoryOptions; | |
} | |
@Nonnull | |
public Duration getAwaitTerminationTimeout() { | |
return awaitTerminationTimeout; | |
} | |
public void setAwaitTerminationTimeout(@Nonnull Duration awaitTerminationTimeout) { | |
this.awaitTerminationTimeout = awaitTerminationTimeout; | |
} | |
} | |
public static class WorkerOptionsProperties { | |
private static final WorkerOptions defaultWorkerOptions = WorkerOptions.getDefaultInstance(); | |
/** | |
* The maximum number of activities that can be executed per-second. | |
*/ | |
private double maxWorkerActivitiesPerSecond = defaultWorkerOptions.getMaxWorkerActivitiesPerSecond(); | |
/** | |
* The maximum number of concurrent activities that can be executed. | |
*/ | |
private int maxConcurrentActivityExecutionSize = defaultWorkerOptions.getMaxConcurrentActivityExecutionSize(); | |
/** | |
* The maximum number of concurrent workflow tasks that can be executed. | |
*/ | |
private int maxConcurrentWorkflowTaskExecutionSize = defaultWorkerOptions.getMaxConcurrentWorkflowTaskExecutionSize(); | |
/** | |
* The maximum number of concurrent local activity tasks that can be executed. | |
*/ | |
private int maxConcurrentLocalActivityExecutionSize = defaultWorkerOptions.getMaxConcurrentLocalActivityExecutionSize(); | |
/** | |
* The maximum number of task queue activities to process per-second. | |
*/ | |
private double maxTaskQueueActivitiesPerSecond = defaultWorkerOptions.getMaxTaskQueueActivitiesPerSecond(); | |
/** | |
* The number of threads to use to poll for workflow tasks. | |
*/ | |
private int workflowPollThreadCount = defaultWorkerOptions.getWorkflowPollThreadCount(); | |
/** | |
* The number of threads to use to poll for activity tasks. | |
*/ | |
private int activityPollThreadCount = defaultWorkerOptions.getActivityPollThreadCount(); | |
/** | |
* Whether or not the worker should only be processing local activities. | |
*/ | |
private boolean localActivityWorkerOnly = defaultWorkerOptions.isLocalActivityWorkerOnly(); | |
public double getMaxWorkerActivitiesPerSecond() { | |
return maxWorkerActivitiesPerSecond; | |
} | |
public void setMaxWorkerActivitiesPerSecond(double maxWorkerActivitiesPerSecond) { | |
this.maxWorkerActivitiesPerSecond = maxWorkerActivitiesPerSecond; | |
} | |
public int getMaxConcurrentActivityExecutionSize() { | |
return maxConcurrentActivityExecutionSize; | |
} | |
public void setMaxConcurrentActivityExecutionSize(int maxConcurrentActivityExecutionSize) { | |
this.maxConcurrentActivityExecutionSize = maxConcurrentActivityExecutionSize; | |
} | |
public int getMaxConcurrentWorkflowTaskExecutionSize() { | |
return maxConcurrentWorkflowTaskExecutionSize; | |
} | |
public void setMaxConcurrentWorkflowTaskExecutionSize(int maxConcurrentWorkflowTaskExecutionSize) { | |
this.maxConcurrentWorkflowTaskExecutionSize = maxConcurrentWorkflowTaskExecutionSize; | |
} | |
public int getMaxConcurrentLocalActivityExecutionSize() { | |
return maxConcurrentLocalActivityExecutionSize; | |
} | |
public void setMaxConcurrentLocalActivityExecutionSize(int maxConcurrentLocalActivityExecutionSize) { | |
this.maxConcurrentLocalActivityExecutionSize = maxConcurrentLocalActivityExecutionSize; | |
} | |
public double getMaxTaskQueueActivitiesPerSecond() { | |
return maxTaskQueueActivitiesPerSecond; | |
} | |
public void setMaxTaskQueueActivitiesPerSecond(double maxTaskQueueActivitiesPerSecond) { | |
this.maxTaskQueueActivitiesPerSecond = maxTaskQueueActivitiesPerSecond; | |
} | |
public int getWorkflowPollThreadCount() { | |
return workflowPollThreadCount; | |
} | |
public void setWorkflowPollThreadCount(int workflowPollThreadCount) { | |
this.workflowPollThreadCount = workflowPollThreadCount; | |
} | |
public int getActivityPollThreadCount() { | |
return activityPollThreadCount; | |
} | |
public void setActivityPollThreadCount(int activityPollThreadCount) { | |
this.activityPollThreadCount = activityPollThreadCount; | |
} | |
public boolean isLocalActivityWorkerOnly() { | |
return localActivityWorkerOnly; | |
} | |
public void setLocalActivityWorkerOnly(boolean localActivityWorkerOnly) { | |
this.localActivityWorkerOnly = localActivityWorkerOnly; | |
} | |
/** | |
* Converts the properties into {@link WorkerOptions}. | |
* @return WorkerOptions | |
*/ | |
public WorkerOptions toOptions() { | |
WorkerOptions.Builder builder = WorkerOptions.newBuilder() | |
.setMaxTaskQueueActivitiesPerSecond(maxTaskQueueActivitiesPerSecond) | |
.setWorkflowPollThreadCount(workflowPollThreadCount) | |
.setActivityPollThreadCount(activityPollThreadCount) | |
.setLocalActivityWorkerOnly(localActivityWorkerOnly); | |
/** | |
* Zero is a valid value for these configurations, however the below conditionals are in-place | |
* due to a faulty validation in the builder setters that verifies a value is not less than | |
* or equal to zero. | |
*/ | |
if (maxWorkerActivitiesPerSecond > 0) { | |
builder.setMaxWorkerActivitiesPerSecond(maxWorkerActivitiesPerSecond); | |
} | |
if (maxConcurrentActivityExecutionSize > 0) { | |
builder.setMaxConcurrentActivityExecutionSize(maxConcurrentActivityExecutionSize); | |
} | |
if (maxConcurrentWorkflowTaskExecutionSize > 0) { | |
builder.setMaxConcurrentWorkflowTaskExecutionSize(maxConcurrentWorkflowTaskExecutionSize); | |
} | |
if (maxConcurrentLocalActivityExecutionSize > 0) { | |
builder.setMaxConcurrentLocalActivityExecutionSize(maxConcurrentLocalActivityExecutionSize); | |
} | |
return builder.build(); | |
} | |
} | |
public static class WorkerFactoryOptionsProperties { | |
private static final WorkerFactoryOptions defaultWorkerFactoryOptions = WorkerFactoryOptions.getDefaultInstance(); | |
/** | |
* Timeout for a workflow task routed to the the host that caches a workflow object. Once it times out then it | |
* can be picked up by any worker. | |
*/ | |
private Duration workflowHostLocalTaskQueueScheduleToStartTimeout = defaultWorkerFactoryOptions.getWorkflowHostLocalTaskQueueScheduleToStartTimeout(); | |
/** | |
* To avoid constant replay of code the workflow objects are cached on a worker. This cache is shared by all | |
* workers created by the Factory. Note that in the majority of situations the number of cached workflows is | |
* limited not by this value, but by the number of the threads defined through {@link #maxWorkflowThreadCount}. | |
*/ | |
private int workflowCacheSize = defaultWorkerFactoryOptions.getWorkflowCacheSize(); | |
/** | |
* Maximum number of threads available for workflow execution across all workers created by the Factory. | |
* This includes cached workflows. | |
*/ | |
private int maxWorkflowThreadCount = defaultWorkerFactoryOptions.getMaxWorkflowThreadCount(); | |
/** | |
* TODO(rz): Docs. Seems to be an undocumented config in the SDK. | |
*/ | |
private int workflowHostLocalPollThreadCount = defaultWorkerFactoryOptions.getWorkflowHostLocalPollThreadCount(); | |
public Duration getWorkflowHostLocalTaskQueueScheduleToStartTimeout() { | |
return workflowHostLocalTaskQueueScheduleToStartTimeout; | |
} | |
public void setWorkflowHostLocalTaskQueueScheduleToStartTimeout(Duration workflowHostLocalTaskQueueScheduleToStartTimeout) { | |
this.workflowHostLocalTaskQueueScheduleToStartTimeout = workflowHostLocalTaskQueueScheduleToStartTimeout; | |
} | |
public int getWorkflowCacheSize() { | |
return workflowCacheSize; | |
} | |
public void setWorkflowCacheSize(int workflowCacheSize) { | |
this.workflowCacheSize = workflowCacheSize; | |
} | |
public int getMaxWorkflowThreadCount() { | |
return maxWorkflowThreadCount; | |
} | |
public void setMaxWorkflowThreadCount(int maxWorkflowThreadCount) { | |
this.maxWorkflowThreadCount = maxWorkflowThreadCount; | |
} | |
public int getWorkflowHostLocalPollThreadCount() { | |
return workflowHostLocalPollThreadCount; | |
} | |
public void setWorkflowHostLocalPollThreadCount(int workflowHostLocalPollThreadCount) { | |
this.workflowHostLocalPollThreadCount = workflowHostLocalPollThreadCount; | |
} | |
public WorkerFactoryOptions toOptions() { | |
return WorkerFactoryOptions.newBuilder() | |
.setWorkflowHostLocalPollThreadCount(workflowHostLocalPollThreadCount) | |
.setWorkflowHostLocalTaskQueueScheduleToStartTimeout(workflowHostLocalTaskQueueScheduleToStartTimeout) | |
.setWorkflowCacheSize(workflowCacheSize) | |
.setMaxWorkflowThreadCount(maxWorkflowThreadCount) | |
.build(); | |
} | |
} | |
public static class TemporalClientProperties { | |
/** | |
* {@link io.temporal.client.WorkflowClient} options. | |
*/ | |
@Nonnull | |
private ClientOptionsProperties options = new ClientOptionsProperties(); | |
@Nonnull | |
public ClientOptionsProperties getOptions() { | |
return options; | |
} | |
public void setOptions(@Nonnull ClientOptionsProperties options) { | |
this.options = options; | |
} | |
} | |
public static class ClientOptionsProperties { | |
/** | |
* The namespace that the client is connecting to. If left unset, {@link TemporalConfigProperties#defaultNamespace} | |
* will be used. | |
*/ | |
@Nullable | |
private String namespace; | |
/** | |
* See {@link io.temporal.client.WorkflowClientOptions.Builder#setQueryRejectCondition}. | |
*/ | |
@Nonnull | |
private QueryRejectCondition queryRejectCondition = QueryRejectCondition.QUERY_REJECT_CONDITION_UNSPECIFIED; | |
@Nullable | |
public String getNamespace() { | |
return namespace; | |
} | |
public void setNamespace(@Nullable String namespace) { | |
this.namespace = namespace; | |
} | |
@Nonnull | |
public QueryRejectCondition getQueryRejectCondition() { | |
return queryRejectCondition; | |
} | |
public void setQueryRejectCondition(@Nonnull QueryRejectCondition queryRejectCondition) { | |
this.queryRejectCondition = queryRejectCondition; | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright 2021 Netflix, Inc. | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package com.netflix.temporal.config.processors; | |
import org.springframework.boot.SpringApplication; | |
import org.springframework.boot.env.EnvironmentPostProcessor; | |
import org.springframework.core.Ordered; | |
import org.springframework.core.env.ConfigurableEnvironment; | |
import org.springframework.core.io.support.ResourcePropertySource; | |
import java.io.IOException; | |
/** | |
* Adds default environment configuration for Temporal. | |
*/ | |
public class TemporalEnvironmentPostProcessor implements EnvironmentPostProcessor, Ordered { | |
private final static String CLASSPATH_PROPERTIES_RESOURCE = "classpath:/netflix-temporal-defaults.properties"; | |
private final static String CI_CLASSPATH_PROPERTIES_RESOURCE = "classpath:/netflix-temporal-ci-profile.yml"; | |
@Override | |
public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) { | |
try { | |
environment.getPropertySources() | |
.addLast(new ResourcePropertySource("temporal-defaults", CLASSPATH_PROPERTIES_RESOURCE)); | |
} catch (IOException e) { | |
throw new IllegalStateException("could not add property source '" + CLASSPATH_PROPERTIES_RESOURCE + "'", e); | |
} | |
// Add some CI config overrides that disable worker polling. | |
try { | |
environment.getPropertySources() | |
.addFirst(new ResourcePropertySource("temporal-ci-defaults", CI_CLASSPATH_PROPERTIES_RESOURCE)); | |
} catch (IOException e) { | |
throw new IllegalStateException("could not add property source '" + CI_CLASSPATH_PROPERTIES_RESOURCE + "'", e); | |
} | |
} | |
@Override | |
public int getOrder() { | |
return Ordered.LOWEST_PRECEDENCE; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment