Created
June 12, 2023 21:12
-
-
Save andresanches/dbd7a605d0d86ee19362f02c418d1c85 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.turo.security.authorization.spicedb; | |
import com.authzed.api.v1.Core.ObjectReference; | |
import com.authzed.api.v1.Core.Relationship; | |
import com.authzed.api.v1.Core.RelationshipUpdate; | |
import com.authzed.api.v1.Core.RelationshipUpdate.Operation; | |
import com.authzed.api.v1.Core.SubjectReference; | |
import com.authzed.api.v1.PermissionService.CheckPermissionRequest; | |
import com.authzed.api.v1.PermissionService.CheckPermissionResponse; | |
import com.authzed.api.v1.PermissionService.Consistency; | |
import com.authzed.api.v1.PermissionService.DeleteRelationshipsRequest; | |
import com.authzed.api.v1.PermissionService.DeleteRelationshipsResponse; | |
import com.authzed.api.v1.PermissionService.ExpandPermissionTreeRequest; | |
import com.authzed.api.v1.PermissionService.ExpandPermissionTreeResponse; | |
import com.authzed.api.v1.PermissionService.LookupResourcesRequest; | |
import com.authzed.api.v1.PermissionService.LookupResourcesResponse; | |
import com.authzed.api.v1.PermissionService.LookupSubjectsRequest; | |
import com.authzed.api.v1.PermissionService.LookupSubjectsResponse; | |
import com.authzed.api.v1.PermissionService.ReadRelationshipsRequest; | |
import com.authzed.api.v1.PermissionService.ReadRelationshipsResponse; | |
import com.authzed.api.v1.PermissionService.RelationshipFilter; | |
import com.authzed.api.v1.PermissionService.SubjectFilter; | |
import com.authzed.api.v1.PermissionService.WriteRelationshipsRequest; | |
import com.authzed.api.v1.PermissionService.WriteRelationshipsResponse; | |
import com.authzed.api.v1.PermissionsServiceGrpc; | |
import com.authzed.api.v1.PermissionsServiceGrpc.PermissionsServiceBlockingStub; | |
import com.authzed.api.v1.SchemaServiceGrpc; | |
import com.authzed.api.v1.SchemaServiceGrpc.SchemaServiceBlockingStub; | |
import com.authzed.api.v1.SchemaServiceOuterClass.ReadSchemaRequest; | |
import com.authzed.api.v1.SchemaServiceOuterClass.ReadSchemaResponse; | |
import com.authzed.api.v1.SchemaServiceOuterClass.WriteSchemaRequest; | |
import com.authzed.api.v1.SchemaServiceOuterClass.WriteSchemaResponse; | |
import com.authzed.grpcutil.BearerToken; | |
import com.google.api.client.util.Preconditions; | |
import com.google.common.base.Stopwatch; | |
import com.newrelic.api.agent.NewRelic; | |
import com.turo.dunlop.platform.commons.metrics.MetricsSupport.TagLabel; | |
import io.grpc.CallOptions; | |
import io.grpc.Channel; | |
import io.grpc.ClientCall; | |
import io.grpc.ClientInterceptor; | |
import io.grpc.ForwardingClientCall; | |
import io.grpc.ForwardingClientCallListener; | |
import io.grpc.ManagedChannel; | |
import io.grpc.ManagedChannelBuilder; | |
import io.grpc.Metadata; | |
import io.grpc.MethodDescriptor; | |
import io.grpc.Status; | |
import java.util.Arrays; | |
import java.util.Collection; | |
import java.util.HashMap; | |
import java.util.HashSet; | |
import java.util.Iterator; | |
import java.util.Map; | |
import java.util.Optional; | |
import java.util.concurrent.TimeUnit; | |
import org.apache.commons.lang3.Range; | |
import org.apache.commons.lang3.tuple.Pair; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import org.springframework.util.StringUtils; | |
/** | |
* This SpiceDB client wrapper implements write/read replica logic for operations that can tolerate | |
* eventually consistent reads. Requests configured with `minimize_latency` are submitted to the | |
* read-only replica instance, if one is configured. | |
* | |
* <p>For the select few operations that require strong consistency the request is always submitted | |
* to the write replica; see the list of such operations at | |
* https://authzed.com/docs/reference/api-consistency. | |
* | |
* <p>This client conflates operations from both {@link com.authzed.api.v1.PermissionsServiceGrpc} | |
* and {@link com.authzed.api.v1.SchemaServiceGrpc}. | |
* | |
* <p>Usage: | |
* | |
* <pre>{@code | |
* final String writeReplicaHostname = "spicedbwrite.com.rr.mu"; | |
* final int writeReplicaPort = 50051; | |
* final String writeReplicaBearerToken = "RDid7HLAXhBTINDfmzO9AFZCgRKHovTqeLKKlhBC"; | |
* | |
* final String readReplicaHostname = "spicedbread.com.rr.mu"; | |
* final int readReplicaPort = 50051; | |
* final String readReplicaBearerToken = "AFZCgRKHovTqeLKKlhBCRDid7HLAXhBTINDfmzO9"; | |
* | |
* final boolean useTls = true; | |
* | |
* final SpiceDbClient spiceDbClient = | |
* new SpiceDbClientImpl.Builder(writeReplicaHostname, writeReplicaPort, writeReplicaBearerToken) | |
* .withReadReplica(readReplicaHostname, readReplicaPort, readReplicaBearerToken) | |
* .withTls(useTls) | |
* .build(); | |
* }</pre> | |
*/ | |
public class SpiceDbClientImpl implements SpiceDbClient { | |
private final PermissionsServiceBlockingStub permissionsWriteReplica; | |
private final Optional<PermissionsServiceBlockingStub> maybePermissionsReadReplica; | |
private final SchemaServiceBlockingStub schemaWriteReplica; | |
private final Optional<SchemaServiceBlockingStub> maybeSchemaReadReplica; | |
public static final long DEFAULT_READ_DEADLINE_IN_MILLIS = 1_500; | |
public static final long DEFAULT_WRITE_DEADLINE_IN_MILLIS = 4_000; | |
public static final String DEFAULT_CLIENT_NAME = "default_client"; | |
public static final Collection<String> WRITE_OPERATIONS = | |
new HashSet<>(Arrays.asList("WriteRelationships", "DeleteRelationships", "WriteSchema")); | |
/**
 * Creates a client around already-built gRPC stubs. The constructor is private; instances are
 * obtained through {@link Builder#build()}.
 *
 * @param permissionsWriteReplica Permissions Service stub for the write replica
 * @param schemaWriteReplica Schema Service stub for the write replica
 * @param maybePermissionsReadReplica Permissions Service stub for the read replica, if any
 * @param maybeSchemaReadReplica Schema Service stub for the read replica, if any
 */
private SpiceDbClientImpl(
    final PermissionsServiceBlockingStub permissionsWriteReplica,
    final SchemaServiceBlockingStub schemaWriteReplica,
    final Optional<PermissionsServiceBlockingStub> maybePermissionsReadReplica,
    final Optional<SchemaServiceBlockingStub> maybeSchemaReadReplica) {
  // Write replica stubs.
  this.permissionsWriteReplica = permissionsWriteReplica;
  this.schemaWriteReplica = schemaWriteReplica;
  // Read replica stubs (possibly empty).
  this.maybePermissionsReadReplica = maybePermissionsReadReplica;
  this.maybeSchemaReadReplica = maybeSchemaReadReplica;
}
/** | |
* Wrapper for {@link | |
* com.authzed.api.v1.PermissionsServiceGrpc.PermissionsServiceBlockingStub#readRelationships} | |
*/ | |
@Override | |
public Iterator<ReadRelationshipsResponse> readRelationships( | |
final ReadRelationshipsRequest request) { | |
return getPermissionsServiceReplicaForRequestConsistency(request.getConsistency()) | |
.readRelationships(request); | |
} | |
/** | |
* Create a {@link com.authzed.api.v1.PermissionService.WriteRelationshipsRequest} based on the | |
* provided resource, relation, and subject. | |
*/ | |
@Override | |
public WriteRelationshipsRequest buildWriteRelationshipsRequest( | |
final String resourceType, | |
final String resourceId, | |
final String relation, | |
final String subjectType, | |
final String subjectId) { | |
return WriteRelationshipsRequest.newBuilder() | |
.addUpdates( | |
RelationshipUpdate.newBuilder() | |
.setOperation(Operation.OPERATION_TOUCH) | |
.setRelationship( | |
Relationship.newBuilder() | |
.setResource( | |
ObjectReference.newBuilder() | |
.setObjectType(resourceType) | |
.setObjectId(resourceId) | |
.build()) | |
.setRelation(relation) | |
.setSubject( | |
SubjectReference.newBuilder() | |
.setObject( | |
ObjectReference.newBuilder() | |
.setObjectType(subjectType) | |
.setObjectId(subjectId) | |
.build()) | |
.build()) | |
.build()) | |
.build()) | |
.build(); | |
} | |
/** | |
* Wrapper for {@link | |
* com.authzed.api.v1.PermissionsServiceGrpc.PermissionsServiceBlockingStub#writeRelationships} | |
* | |
* @implNote This operation is always submitted to the write replica. | |
*/ | |
@Override | |
public WriteRelationshipsResponse writeRelationships(final WriteRelationshipsRequest request) { | |
return permissionsWriteReplica.writeRelationships(request); | |
} | |
/** | |
* Create a {@link com.authzed.api.v1.PermissionService.DeleteRelationshipsRequest} based on the | |
* provided resource, relation, and subject. | |
*/ | |
@Override | |
public DeleteRelationshipsRequest buildDeleteRelationshipsRequest( | |
final String resourceType, | |
final String resourceId, | |
final String relation, | |
final String subjectType, | |
final String subjectId) { | |
final RelationshipFilter.Builder relationshipFilterBuilder = | |
RelationshipFilter.newBuilder().setResourceType(resourceType); | |
if (!StringUtils.isEmpty(resourceId)) { | |
relationshipFilterBuilder.setOptionalResourceId(resourceId); | |
} | |
if (!StringUtils.isEmpty(relation)) { | |
relationshipFilterBuilder.setOptionalRelation(relation); | |
} | |
if (!StringUtils.isEmpty(subjectType) || !StringUtils.isEmpty(subjectId)) { | |
final SubjectFilter.Builder subjectFilterBuilder = SubjectFilter.newBuilder(); | |
if (!StringUtils.isEmpty(subjectType)) { | |
subjectFilterBuilder.setSubjectType(subjectType); | |
} | |
if (!StringUtils.isEmpty(subjectId)) { | |
subjectFilterBuilder.setOptionalSubjectId(subjectId); | |
} | |
relationshipFilterBuilder.setOptionalSubjectFilter(subjectFilterBuilder); | |
} | |
return DeleteRelationshipsRequest.newBuilder() | |
.setRelationshipFilter(relationshipFilterBuilder) | |
.build(); | |
} | |
/** | |
* Wrapper for {@link | |
* com.authzed.api.v1.PermissionsServiceGrpc.PermissionsServiceBlockingStub#deleteRelationships} | |
* | |
* @implNote This operation is always submitted to the write replica. | |
*/ | |
@Override | |
public DeleteRelationshipsResponse deleteRelationships(final DeleteRelationshipsRequest request) { | |
return permissionsWriteReplica.deleteRelationships(request); | |
} | |
/** | |
* Wrapper for {@link | |
* com.authzed.api.v1.PermissionsServiceGrpc.PermissionsServiceBlockingStub#checkPermission} | |
*/ | |
@Override | |
public CheckPermissionResponse checkPermission(final CheckPermissionRequest request) { | |
return getPermissionsServiceReplicaForRequestConsistency(request.getConsistency()) | |
.checkPermission(request); | |
} | |
/** | |
* Wrapper for {@link | |
* com.authzed.api.v1.PermissionsServiceGrpc.PermissionsServiceBlockingStub#expandPermissionTree} | |
*/ | |
@Override | |
public ExpandPermissionTreeResponse expandPermissionTree( | |
final ExpandPermissionTreeRequest request) { | |
return getPermissionsServiceReplicaForRequestConsistency(request.getConsistency()) | |
.expandPermissionTree(request); | |
} | |
/** | |
* Create a {@link com.authzed.api.v1.PermissionService.LookupResourcesRequest} based on the | |
* provided resource, permission, and subject. | |
*/ | |
@Override | |
public LookupResourcesRequest buildLookupResourcesRequest( | |
final String resourceType, | |
final String permission, | |
final String subjectType, | |
final String subjectId) { | |
return LookupResourcesRequest.newBuilder() | |
.setResourceObjectType(resourceType) | |
.setPermission(permission) | |
.setSubject( | |
SubjectReference.newBuilder() | |
.setObject( | |
ObjectReference.newBuilder() | |
.setObjectType(subjectType) | |
.setObjectId(subjectId) | |
.build())) | |
.build(); | |
} | |
/** | |
* Wrapper for {@link | |
* com.authzed.api.v1.PermissionsServiceGrpc.PermissionsServiceBlockingStub#lookupResources} | |
*/ | |
@Override | |
public Iterator<LookupResourcesResponse> lookupResources(final LookupResourcesRequest request) { | |
return getPermissionsServiceReplicaForRequestConsistency(request.getConsistency()) | |
.lookupResources(request); | |
} | |
/** | |
* Create a {@link com.authzed.api.v1.PermissionService.LookupSubjectsRequest} based on the | |
* provided subject, permission, and resource. | |
*/ | |
@Override | |
public LookupSubjectsRequest buildLookupSubjectsRequest( | |
final String subjectType, | |
final String permission, | |
final String resourceType, | |
final String resourceId) { | |
return LookupSubjectsRequest.newBuilder() | |
.setSubjectObjectType(subjectType) | |
.setPermission(permission) | |
.setResource( | |
ObjectReference.newBuilder() | |
.setObjectType(resourceType) | |
.setObjectId(resourceId) | |
.build()) | |
.build(); | |
} | |
/** | |
* Wrapper for {@link | |
* com.authzed.api.v1.PermissionsServiceGrpc.PermissionsServiceBlockingStub#lookupSubjects} | |
*/ | |
@Override | |
public Iterator<LookupSubjectsResponse> lookupSubjects(final LookupSubjectsRequest request) { | |
return getPermissionsServiceReplicaForRequestConsistency(request.getConsistency()) | |
.lookupSubjects(request); | |
} | |
/** | |
* Wrapper for {@link com.authzed.api.v1.SchemaServiceGrpc.SchemaServiceBlockingStub#readSchema} | |
* | |
* @implNote This operation is always submitted to the write replica. | |
*/ | |
@Override | |
public ReadSchemaResponse readSchema(final ReadSchemaRequest request) { | |
return schemaWriteReplica.readSchema(request); | |
} | |
/** | |
* Wrapper for {@link com.authzed.api.v1.SchemaServiceGrpc.SchemaServiceBlockingStub#writeSchema} | |
* | |
* @implNote This operation is always submitted to the write replica. | |
*/ | |
@Override | |
public WriteSchemaResponse writeSchema(final WriteSchemaRequest request) { | |
return schemaWriteReplica.writeSchema(request); | |
} | |
/** | |
* Getter for the underlying instance of PermissionsServiceBlockingStub for the write replica. | |
* | |
* <p>Use this in case there is an operation that is not yet implemented in this wrapper. | |
*/ | |
@Override | |
public PermissionsServiceBlockingStub getPermissionsWriteReplicaClient() { | |
return this.permissionsWriteReplica; | |
} | |
/** | |
* Getter for the underlying instance of PermissionsServiceBlockingStub for the read replica. | |
* | |
* <p>Use this in case there is an operation that is not yet implemented in this wrapper. | |
*/ | |
@Override | |
public Optional<PermissionsServiceBlockingStub> getPermissionsReadReplicaClient() { | |
return this.maybePermissionsReadReplica; | |
} | |
/** | |
* Getter for the underlying instance of SchemaServiceBlockingStub for the write replica. | |
* | |
* <p>Use this in case there is an operation that is not yet implemented in this wrapper. | |
*/ | |
@Override | |
public SchemaServiceBlockingStub getSchemaWriteReplicaClient() { | |
return this.schemaWriteReplica; | |
} | |
/** | |
* Getter for the underlying instance of SchemaServiceBlockingStub for the read replica. | |
* | |
* <p>Use this in case there is an operation that is not yet implemented in this wrapper. | |
*/ | |
@Override | |
public Optional<SchemaServiceBlockingStub> getSchemaReadReplica() { | |
return this.maybeSchemaReadReplica; | |
} | |
/** | |
* Helper method to determine which replica should handle the request based on its consistency. | |
* | |
* @return PermissionsServiceBlockingStub that points to the appropriate replica instance. | |
*/ | |
private PermissionsServiceBlockingStub getPermissionsServiceReplicaForRequestConsistency( | |
final Consistency consistency) { | |
return Optional.ofNullable(consistency) | |
.filter(c -> c.hasMinimizeLatency() && c.getMinimizeLatency()) | |
.flatMap(__ -> maybePermissionsReadReplica) | |
.orElse(permissionsWriteReplica); | |
} | |
/** | |
* A GRPC client interceptor that sets sensible defaults for SpiceDB clients and publishes metrics | |
* for latency and errors per operation. | |
* | |
* <p>Inspired by https://techdozo.dev/grpc-interceptor-unary-interceptor-with-code-example/ | |
*/ | |
static class SpiceDbGrpcClientInterceptor implements ClientInterceptor { | |
private final String uniqueName; | |
private final boolean isMetricsEnabled; | |
private final long readDeadlineInMillis; | |
private final long writeDeadlineInMillis; | |
private static final Logger log = LoggerFactory.getLogger(SpiceDbClientImpl.class); | |
public SpiceDbGrpcClientInterceptor( | |
final String uniqueName, | |
final boolean isMetricsEnabled, | |
final long readDeadlineInMillis, | |
final long writeDeadlineInMillis) { | |
super(); | |
this.uniqueName = uniqueName; | |
this.isMetricsEnabled = isMetricsEnabled; | |
this.readDeadlineInMillis = readDeadlineInMillis; | |
this.writeDeadlineInMillis = writeDeadlineInMillis; | |
} | |
@Override | |
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall( | |
final MethodDescriptor<ReqT, RespT> method, | |
final CallOptions callOptions, | |
final Channel next) { | |
final String operation = method.getBareMethodName(); | |
final Stopwatch stopwatch = Stopwatch.createStarted(); | |
final long deadline = | |
SpiceDbClientImpl.WRITE_OPERATIONS.contains(operation) | |
? writeDeadlineInMillis | |
: readDeadlineInMillis; | |
final CallOptions callOptionsWithDeadline = | |
callOptions.withDeadlineAfter(deadline, TimeUnit.MILLISECONDS); | |
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>( | |
next.newCall(method, callOptionsWithDeadline)) { | |
@Override | |
public void start( | |
final ClientCall.Listener<RespT> responseListener, final Metadata headers) { | |
super.start( | |
new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>( | |
responseListener) { | |
@Override | |
public void onClose(final Status status, final Metadata trailers) { | |
stopwatch.stop(); | |
final String statusCode = status.getCode().toString(); | |
final Long latency = stopwatch.elapsed(TimeUnit.MILLISECONDS); | |
final Optional<String> maybeReason = Optional.ofNullable(status.getDescription()); | |
final Optional<String> maybeCause = | |
Optional.ofNullable(status.getCause()).map(Throwable::toString); | |
log.debug( | |
"SpiceDbClient ({}): Operation: {} Status: {} Latency: {} ms. Reason: {}." | |
+ " Exception: {}.", | |
uniqueName, | |
operation, | |
statusCode, | |
latency, | |
maybeReason.orElse(null), | |
maybeCause.orElse(null)); | |
recordCustomEvent( | |
uniqueName, operation, statusCode, latency, maybeReason, maybeCause); | |
super.onClose(status, trailers); | |
} | |
}, | |
headers); | |
} | |
}; | |
} | |
private void recordCustomEvent( | |
final String uniqueName, | |
final String operation, | |
final String statusCode, | |
final Long elapsedTime, | |
final Optional<String> maybeReason, | |
final Optional<String> maybeCause) { | |
if (!isMetricsEnabled) { | |
return; | |
} | |
final Map<String, String> attributes = new HashMap<>(); | |
attributes.put(TagLabel.NAME.getKey(), uniqueName); | |
attributes.put(TagLabel.OPERATION.getKey(), operation); | |
attributes.put(TagLabel.RESULT.getKey(), statusCode); | |
attributes.put(TagLabel.ELAPSED_TIME.getKey(), String.valueOf(elapsedTime)); | |
maybeReason.ifPresent(reason -> attributes.put(TagLabel.REASON.getKey(), reason)); | |
maybeCause.ifPresent(cause -> attributes.put(TagLabel.FAILURE_CAUSE.getKey(), cause)); | |
NewRelic.getAgent().getInsights().recordCustomEvent("spicedb_client_metrics", attributes); | |
} | |
} | |
/** | |
* Use this class to build a SpiceDbClient. At a minimum the hostname, port and bearer token for | |
* the write replica must be provided. | |
* | |
* <p>If a read replica is available then use the {@link Builder#withReadReplica} method to | |
* configure it. | |
* | |
* <p>If the SpiceDB instance uses SSL/TLS encryption then use the {@link Builder#withTls} to | |
* configure the client instance. Note that this setting defaults to plain text. | |
*/ | |
public static class Builder { | |
private final String writeReplicaHostname; | |
private final int writeReplicaPort; | |
private final String writeReplicaBearerToken; | |
private Optional<String> maybeReadReplicaHostname = Optional.empty(); | |
private Optional<Integer> maybeReadReplicaPort = Optional.empty(); | |
private Optional<String> maybeReadReplicaBearerToken = Optional.empty(); | |
private Optional<String> maybeUniqueNameForMetrics = Optional.empty(); | |
private Optional<Long> maybeReadDeadlineInMillis = Optional.empty(); | |
private Optional<Long> maybeWriteDeadlineInMillis = Optional.empty(); | |
private boolean isTlsEnabled; | |
public Builder( | |
final String writeReplicaHostname, | |
final int writeReplicaPort, | |
final String writeReplicaBearerToken) { | |
this.writeReplicaHostname = writeReplicaHostname; | |
this.writeReplicaPort = writeReplicaPort; | |
this.writeReplicaBearerToken = writeReplicaBearerToken; | |
} | |
/** | |
* Only use this if your SpiceDb instance is self-hosted and has more than one read replica. | |
* | |
* @param readReplicaHostname The fully qualified domain name (FQDN) for the server | |
* @param readReplicaPort The TCP port for the server | |
* @param readReplicaBearerToken The bearer token for the replica | |
*/ | |
public SpiceDbClientImpl.Builder withReadReplica( | |
final String readReplicaHostname, | |
final int readReplicaPort, | |
final String readReplicaBearerToken) { | |
this.maybeReadReplicaHostname = Optional.of(readReplicaHostname); | |
this.maybeReadReplicaPort = Optional.of(readReplicaPort); | |
this.maybeReadReplicaBearerToken = Optional.of(readReplicaBearerToken); | |
return this; | |
} | |
/** | |
* Set this to true if your SpiceDB instance uses TLS encryption. Defaults to false (plaintext). | |
* | |
* <p>Note: TLS is required when using AuthZed SaaS instances. | |
*/ | |
public SpiceDbClientImpl.Builder withTls(final boolean isEnabled) { | |
this.isTlsEnabled = isEnabled; | |
return this; | |
} | |
/** | |
* Set this to a unique name for your permissions system (aka spicedb tenant) for which metrics | |
* will be published. Examples: "hosting_team", "payments, "gearbox", etc. | |
* | |
* <p>The SpiceDbClient will not publish metrics by default if built without this option. | |
*/ | |
public SpiceDbClientImpl.Builder withMetrics(final String uniqueName) { | |
this.maybeUniqueNameForMetrics = Optional.of(uniqueName); | |
return this; | |
} | |
/** | |
* The maximum amount of time each read request to SpiceDB should wait for a response. | |
* | |
* <p>If not set it defaults to the value in {@link | |
* SpiceDbClientImpl#DEFAULT_READ_DEADLINE_IN_MILLIS}. | |
*/ | |
public SpiceDbClientImpl.Builder withReadDeadline(final long timeInMillis) { | |
this.maybeReadDeadlineInMillis = Optional.of(timeInMillis); | |
return this; | |
} | |
/** | |
* The maximum amount of time each write request to SpiceDB should wait for a response. | |
* | |
* <p>If not set it defaults to the value in {@link | |
* SpiceDbClientImpl#DEFAULT_WRITE_DEADLINE_IN_MILLIS}. | |
*/ | |
public SpiceDbClientImpl.Builder withWriteDeadline(final long timeInMillis) { | |
this.maybeWriteDeadlineInMillis = Optional.of(timeInMillis); | |
return this; | |
} | |
public SpiceDbClient build() { | |
// Left = Permissions Service client, Right = Schema Service client | |
final Pair<PermissionsServiceBlockingStub, SchemaServiceBlockingStub> writeReplicaClients = | |
validateAndBuildWriteReplicaClients(); | |
final Optional<Pair<PermissionsServiceBlockingStub, SchemaServiceBlockingStub>> | |
readReplicaClients = maybeValidateAndBuildReadReplicaClients(); | |
return new SpiceDbClientImpl( | |
writeReplicaClients.getLeft(), | |
writeReplicaClients.getRight(), | |
readReplicaClients.map(Pair::getLeft), | |
readReplicaClients.map(Pair::getRight)); | |
} | |
private Pair<PermissionsServiceBlockingStub, SchemaServiceBlockingStub> | |
validateAndBuildWriteReplicaClients() { | |
// Ensure all required parameters for the write replica are valid. | |
Preconditions.checkArgument( | |
writeReplicaHostname != null, "writeReplicaHostname cannot be null"); | |
Preconditions.checkArgument( | |
Range.between(1, 65_535).contains(writeReplicaPort), "writeReplicaPort is invalid"); | |
Preconditions.checkArgument( | |
writeReplicaBearerToken != null, "writeReplicaBearerToken cannot be null"); | |
// Create a single shared long-lived channel for both Permissions and Schema write replicas | |
final ManagedChannel writeReplicaChannel = | |
buildManagedChannel(writeReplicaHostname, writeReplicaPort, isTlsEnabled); | |
final PermissionsServiceBlockingStub permissionsServiceWriteReplica = | |
PermissionsServiceGrpc.newBlockingStub(writeReplicaChannel) | |
.withCallCredentials(new BearerToken(writeReplicaBearerToken)); | |
final SchemaServiceGrpc.SchemaServiceBlockingStub schemaServiceWriteReplica = | |
SchemaServiceGrpc.newBlockingStub(writeReplicaChannel) | |
.withCallCredentials(new BearerToken(writeReplicaBearerToken)); | |
return Pair.of(permissionsServiceWriteReplica, schemaServiceWriteReplica); | |
} | |
private Optional<Pair<PermissionsServiceBlockingStub, SchemaServiceBlockingStub>> | |
maybeValidateAndBuildReadReplicaClients() { | |
// If any of the read replica parameters are present then ensure they are all present and | |
// valid before building Grpc Stubs. | |
if (maybeReadReplicaHostname.isPresent() | |
|| maybeReadReplicaPort.isPresent() | |
|| maybeReadReplicaBearerToken.isPresent()) { | |
Preconditions.checkArgument( | |
maybeReadReplicaHostname.isPresent(), "readReplicaHostname is invalid or missing"); | |
Preconditions.checkArgument( | |
maybeReadReplicaPort | |
.filter(port -> Range.between(1, 65_535).contains(port)) | |
.isPresent(), | |
"readReplicaPort is invalid or missing"); | |
Preconditions.checkArgument( | |
maybeReadReplicaBearerToken.isPresent(), | |
"readReplicaBearerToken is invalid is missing"); | |
} | |
return maybeReadReplicaHostname.flatMap( | |
readReplicaHostName -> | |
maybeReadReplicaPort.flatMap( | |
readReplicaPort -> | |
maybeReadReplicaBearerToken.map( | |
readReplicaBearerToken -> { | |
// Create a single shared long-lived channel for both Permissions and | |
// Schema read replicas | |
final ManagedChannel readReplicaChannel = | |
buildManagedChannel( | |
readReplicaHostName, readReplicaPort, isTlsEnabled); | |
final PermissionsServiceBlockingStub permissionsServiceReadReplica = | |
PermissionsServiceGrpc.newBlockingStub(readReplicaChannel) | |
.withCallCredentials(new BearerToken(readReplicaBearerToken)); | |
final SchemaServiceBlockingStub schemaServiceReadReplica = | |
SchemaServiceGrpc.newBlockingStub(readReplicaChannel) | |
.withCallCredentials(new BearerToken(readReplicaBearerToken)); | |
return Pair.of(permissionsServiceReadReplica, schemaServiceReadReplica); | |
}))); | |
} | |
private ManagedChannel buildManagedChannel( | |
final String hostname, final int port, final boolean useTransportSecurity) { | |
ManagedChannelBuilder channelBuilder = ManagedChannelBuilder.forAddress(hostname, port); | |
if (useTransportSecurity) { | |
channelBuilder = channelBuilder.useTransportSecurity(); | |
} else { | |
channelBuilder = channelBuilder.usePlaintext(); | |
} | |
final String uniqueName = | |
maybeUniqueNameForMetrics.orElse(SpiceDbClientImpl.DEFAULT_CLIENT_NAME); | |
// will only publish metrics if a unique name for the client is provided. | |
final boolean isMetricsEnabled = maybeUniqueNameForMetrics.isPresent(); | |
final long readDeadline = | |
this.maybeReadDeadlineInMillis.orElse(SpiceDbClientImpl.DEFAULT_READ_DEADLINE_IN_MILLIS); | |
final long writeDeadline = | |
this.maybeWriteDeadlineInMillis.orElse( | |
SpiceDbClientImpl.DEFAULT_WRITE_DEADLINE_IN_MILLIS); | |
channelBuilder = | |
channelBuilder.intercept( | |
new SpiceDbGrpcClientInterceptor( | |
uniqueName, isMetricsEnabled, readDeadline, writeDeadline)); | |
return channelBuilder.build(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment