Last active
December 12, 2023 02:29
-
-
Save SpareShade/bcbd9b53ab99f3f4227412eefe9a04b7 to your computer and use it in GitHub Desktop.
GetUp - Artwork - Register Editioned Artwork
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Package main is the entry point for the Bleyk App V1. | |
package main | |
import ( | |
// Import necessary packages | |
"context" | |
"flag" | |
"fmt" | |
"log" | |
"os" | |
"time" | |
// Import necessary packages | |
echomiddleware "github.com/labstack/echo/v4/middleware" | |
"go.uber.org/zap" | |
"go.uber.org/zap/zapcore" | |
"golang.org/x/net/http2" | |
"google.golang.org/grpc" | |
"gopkg.in/yaml.v2" | |
_ "github.com/lib/pq" | |
"github.com/nats-io/nats.go" | |
// Internal packages | |
pgeventstore "bleyk.org/pkg/domain/postgres" | |
appgrpc "bleyk.org/pkg/grpc" | |
grpcmiddleware "bleyk.org/pkg/grpc/middleware" | |
httputils "bleyk.org/pkg/http" | |
httpmiddleware "bleyk.org/pkg/http/middleware" | |
httpmiddleware_blyekadmin_identity "bleyk.org/pkg/http/middleware/bleykadmin/identity" | |
httpmiddleware_identity "bleyk.org/pkg/http/middleware/identity" | |
integration_pg "bleyk.org/pkg/integration/postgres" | |
"bleyk.org/pkg/logger" | |
iam "bleyk.org/services/iam" | |
pkg_aws "bleyk.org/pkg/aws" | |
pkg_cookie "bleyk.org/pkg/http/cookie" | |
"bleyk.org/pkg/postgres" | |
service_artwork "bleyk.org/services/artwork" | |
// other services | |
) | |
// VERSION is the application version reported in the start-up log line.
const VERSION = "v0.5"
// NOTE: `config` for local dev via local file | |
// AWS ParamsStore `config` for sensitive details must be used in production | |
// let's start this thing | |
func main() { | |
ctx := context.Background() | |
envirionment := os.Getenv("ENVIRONMENT") | |
// ... (config setup) | |
// App Logging | |
appLogLevel := parseLogLevelFromString(cfg.Logging.Application.LogLevel) | |
appLogger := logger.NewLogger(logger.NewLumberjackWriter(logger.LogConfig{ | |
LogFilename: cfg.Logging.Application.LogFilename, | |
MaxBackups: cfg.Logging.Application.MaxBackups, | |
MaxSizeMB: cfg.Logging.Application.MaxSizeMB, | |
MaxAgeDays: cfg.Logging.Application.MaxAgeDays, | |
}), appLogLevel) | |
defer appLogger.Sync() | |
appLogger = appLogger.Named("Bleyk") | |
appLogger.Info(ctx, "Starting "+VERSION, | |
zap.String("env", envirionment), | |
) | |
// infra logger | |
infraLogger := appLogger.Named("Infrastructure") | |
// *********************************************************************** | |
// ************* RDS PgDB ************************************************ | |
// *********************************************************************** | |
pgLogLevel := parseLogLevelFromString(cfg.Logging.Database.LogLevel) | |
pgxLogger := logger.NewLogger(logger.NewLumberjackWriter(logger.LogConfig{ | |
LogFilename: cfg.Logging.Database.LogFilename, | |
MaxBackups: cfg.Logging.Database.MaxBackups, | |
MaxSizeMB: cfg.Logging.Database.MaxSizeMB, | |
MaxAgeDays: cfg.Logging.Database.MaxAgeDays, | |
}), pgLogLevel) | |
defer pgxLogger.Sync() | |
// RDS role-based PG | |
creds := awsClients.Config(&cfg.AWS.Region, &cfg.AWS.AccountID, &cfg.AWS.IAM.AssumeRole).Credentials | |
pgxPool, err := postgres.NewRDSIamPgxConn(creds, cfg.AWS.RDS, infraLogger.Named("Postgres"), pgxLogger) | |
if err != nil { | |
appLogger.Fatal(ctx, "[Postgres]: "+err.Error()) | |
} | |
defer pgxPool.Close() | |
// ******************************************************************************** | |
// ************* AWS Services ************************************************* | |
// ******************************************************************************** | |
s3Client := awsClients.S3Client(&cfg.AWS.Region, &cfg.AWS.AccountID, &cfg.AWS.IAM.AssumeRole) | |
sqsClient := awsClients.SQSClient(&cfg.AWS.Region, &cfg.AWS.AccountID, &cfg.AWS.IAM.AssumeRole) | |
cognitoClient := awsClients.CognitoClient(&cfg.AWS.Region, &cfg.AWS.AccountID, &cfg.AWS.IAM.AssumeRole) | |
lambdaClient := awsClients.LambdaClient(&cfg.AWS.Region, &cfg.AWS.AccountID, &cfg.AWS.IAM.AssumeRole) | |
// ******************************************************************************** | |
// ************* NATS (JetStream) ************************************************* | |
// ******************************************************************************** | |
natsLogger := infraLogger.Named("NATS") | |
// Connect to NATS server | |
natsOpts := []nats.Option{ | |
nats.DisconnectHandler(func(nc *nats.Conn) { | |
natsLogger.Error(ctx, "NATS disconnected") | |
}), | |
nats.ReconnectHandler(func(nc *nats.Conn) { | |
natsLogger.Debug(ctx, "NATS reconnected") | |
}), | |
} | |
nc, err := nats.Connect(cfg.NATS.URL, natsOpts...) | |
if err != nil { | |
panic(err) | |
} | |
jetstream, err := nc.JetStream() | |
if err != nil { | |
appLogger.Fatal(ctx, err.Error()) | |
} | |
// Integration events stream | |
// NB. Bleyk's integration event naming convention is: | |
// bleyk.<service>.integration.events.<event_name> | |
// see "Proto docs" for more info | |
_, err = jetstream.AddStream(&nats.StreamConfig{ | |
Name: cfg.NATS.JetStream.Name, | |
Subjects: []string{ | |
// listen to IAM integration events | |
"bleyk.iam.integration.>", | |
// listen to services.{service} integration events | |
"bleyk.services.*.integration.>", | |
// listen to `admin` integration events | |
"bleyk.admin.integration.>", | |
}, | |
Description: "Stream for processing integration events.", | |
Duplicates: time.Duration(3 * time.Minute), | |
Storage: nats.FileStorage, | |
}) | |
if err != nil { | |
appLogger.Fatal(ctx, err.Error()) | |
} | |
// ******************************************************************************** | |
// ************* gRPC ************************************************* | |
// ******************************************************************************** | |
// Currently we require 2 gRPC servers: public api and internal app services. | |
// The request is received via the public api gRPC service, and is passed onto the internal application gRPC service. | |
// NB. we are only logging the request received by the internal application service at this time. | |
// gRPC server for public facing API | |
APIGrpcServer := appgrpc.NewServer( | |
cfg.GRPC.PublicAPI, | |
// infraLogger.Named("gRPC_API"), | |
[]grpc.UnaryServerInterceptor{ | |
grpcmiddleware.SetMetadataFromUnaryRequest(), | |
grpcmiddleware.SetIdentityFromUnaryRequest(), | |
// grpcmiddleware.LogUnaryRequest(logger), | |
}, | |
[]grpc.StreamServerInterceptor{ | |
grpcmiddleware.SetMetadataFromStreamRequest(), | |
grpcmiddleware.SetIdentityFromStreamRequest(), | |
// grpcmiddleware.LogStreamRequest(logger), | |
}, | |
) | |
// internal application services gRPC server | |
grpcAppServerLogger := infraLogger.Named("gRPC.ApplicationService.Server") | |
internalAppServicesGrpcServer := appgrpc.NewServer( | |
cfg.GRPC.PublicAPI, | |
// infraLogger.Named("gRPC.ApplicationService.Server"), | |
[]grpc.UnaryServerInterceptor{ | |
grpcmiddleware.SetMetadataFromUnaryRequest(), | |
grpcmiddleware.SetIdentityFromUnaryRequest(), | |
grpcmiddleware.LogUnaryRequest(grpcAppServerLogger), | |
}, | |
[]grpc.StreamServerInterceptor{ | |
grpcmiddleware.SetMetadataFromStreamRequest(), | |
grpcmiddleware.SetIdentityFromStreamRequest(), | |
grpcmiddleware.LogStreamRequest(grpcAppServerLogger), | |
}, | |
) | |
// gRPC client for internal application services | |
// grpcAppClientLogger := infraLogger.Named("gRPC.ApplicationService.Client") | |
internalAppServicesGrpcConn, err := appgrpc.NewConnection( | |
context.Background(), | |
// cfg.GRPC.InternalApplicationServices.Host, | |
// cfg.GRPC.InternalApplicationServices.Port, | |
cfg.GRPC.InternalApplicationServices, | |
[]grpc.UnaryClientInterceptor{ | |
grpcmiddleware.AppendMetadataToOutgoingUnaryContext(), | |
grpcmiddleware.AppendIdentityToOutgoingUnaryContext(), | |
// grpcmiddleware.LogOutgoingUnaryRequest(grpcAppClientLogger), | |
}, | |
[]grpc.StreamClientInterceptor{ | |
grpcmiddleware.AppendMetadataToOutgoingStreamContext(), | |
grpcmiddleware.AppendIdentityToOutgoingStreamContext(), | |
// grpcmiddleware.LogOutgoingStreamRequest(grpcAppClientLogger), | |
}, | |
) | |
if err != nil { | |
appLogger.Fatal(ctx, "gRPC Client failed to start", zap.String("error", err.Error())) | |
} | |
defer internalAppServicesGrpcConn.Close() | |
// ******************************************************************************** | |
// ************* Domain and Integration Events *********************************** | |
// ******************************************************************************** | |
// integration events store | |
integrationEventsStore := integration_pg.NewEventStore(jetstream, pgxPool, infraLogger.Named("IntegrationEventsStore")) | |
// domain events store | |
domainEventsStore := pgeventstore.NewEventStore(integrationEventsStore, pgxPool, infraLogger.Named("DomainEventsStore")) | |
// ******************************************************************************** | |
// ************* HTTP (Echo) Server ************************************************* | |
// ******************************************************************************** | |
// Our http server (Echo) with base configs and middlewares | |
echoServer := httputils.NewEcho(cfg.HTTP.PublicAPI.Origins, infraLogger.Named("HTTPServer")) | |
// ******************************************************************************** | |
// ************* Infra Services ******************************************* | |
// ******************************************************************************** | |
// var cookieService pkg_cookie.CookieService | |
cookieService := pkg_cookie.NewCookieService(pkg_cookie.CookieConfig{ | |
Name: cfg.Services.IAM.Authentication.Session.Cookie.Name, | |
Domain: cfg.Services.IAM.Authentication.Session.Cookie.Domain, | |
Path: cfg.Services.IAM.Authentication.Session.Cookie.Path, | |
HTTPOnly: cfg.Services.IAM.Authentication.Session.Cookie.HTTPOnly, | |
Secure: cfg.Services.IAM.Authentication.Session.Cookie.Secure, | |
SameSite: cfg.Services.IAM.Authentication.Session.Cookie.SameSite, | |
Duration: cfg.Services.IAM.Authentication.Session.Cookie.Duration, | |
}) | |
// ******************************************************************************** | |
// ************* IAM start ******************************************* | |
// ******************************************************************************** | |
appServicesLogger := appLogger.Named("Service") | |
// init the iam app | |
authClient := iam.Start( | |
ctx, | |
echoServer, | |
APIGrpcServer, | |
internalAppServicesGrpcServer, | |
internalAppServicesGrpcConn, | |
domainEventsStore, | |
cookieService, | |
pgxPool, | |
cognitoClient, | |
&cfg.AWS.Cognito, | |
appServicesLogger.Named("IAM"), | |
) | |
// ******************************************************************************** | |
// ************* Services' start ************************************************* | |
// ******************************************************************************** | |
// Artwork | |
service_artwork.Start( | |
ctx, | |
internalAppServicesGrpcServer, | |
internalAppServicesGrpcConn, | |
APIGrpcServer, | |
domainEventsStore, | |
pgxPool, | |
appServicesLogger.Named("Artwork"), | |
) | |
// ... | |
// Media service | |
err = service_dam.Start( | |
ctx, | |
internalAppServicesGrpcServer, | |
internalAppServicesGrpcConn, | |
APIGrpcServer, | |
domainEventsStore, | |
jetstream, | |
pgxPool, | |
s3Client, | |
sqsClient, | |
lambdaClient, | |
appServicesLogger.Named("DAM"), | |
cfg.Services.DAM, | |
) | |
// NB. all services depend of the media service, | |
// so if this service fails we must fatalise it | |
if err != nil { | |
// log fatal | |
appLogger.Fatal(ctx, "[DAM Service]: failed to start", | |
zap.String("error", err.Error())) | |
} | |
// ... (code for other service start-up) | |
// ******************************************************************************** | |
// ************* App gRPC-web API routes ****************************************** | |
// ******************************************************************************** | |
// The HTTP requests are transformed to gRPC-Web requests by passing them through the WrappedGRPCWebServer middleware; | |
// The routes below route them to the appropriate registered gRPC service | |
// gRPC-Web Registration API router | |
registrationApiRoute := "/registration/api/v1" | |
registrationGrpcWebRouter := echoServer.Group(registrationApiRoute) | |
registrationGrpcWebRouter.Use( | |
echomiddleware.BodyLimit("10K"), | |
// gRPC-Web Request Middleware | |
httpmiddleware.WrappedGRPCWebServer(APIGrpcServer, registrationApiRoute), | |
) | |
// gRPC-Web Data API router | |
dataApiRoute := "/data/api/v1" | |
dataGrpcWebRouter := echoServer.Group(dataApiRoute) | |
dataGrpcWebRouter.Use( | |
echomiddleware.BodyLimit("10K"), | |
// gRPC-Web Request Middleware | |
httpmiddleware.WrappedGRPCWebServer(APIGrpcServer, dataApiRoute), | |
) | |
// gRPC-Web IAM API router | |
iamApiRoute := "/iam/api/v1" | |
iamGrpcWebRouter := echoServer.Group(iamApiRoute) | |
iamGrpcWebRouter.Use( | |
echomiddleware.BodyLimit("10K"), | |
// all requests going further down the chain must have a complete identity (both user and account ids) | |
httpmiddleware_identity.WithIdentityFromCookie(authClient, cookieService), | |
// abort request in we do not have complete identity | |
httpmiddleware_identity.AbortWithoutIdentity(), | |
// gRPC-Web Request Middleware | |
httpmiddleware.WrappedGRPCWebServer(APIGrpcServer, iamApiRoute), | |
) | |
// gRPC-Web Tenant API router | |
tenantGrpcWebApiRoute := "/tenant/api/v1" | |
grpcWebTenantAPIRouter := echoServer.Group(tenantGrpcWebApiRoute) | |
grpcWebTenantAPIRouter.Use( | |
echomiddleware.BodyLimit("1024K"), | |
// all requests going further down the chain must have a complete identity (both user and account ids) | |
httpmiddleware_identity.WithIdentityFromCookie(authClient, cookieService), | |
// abort request in we do not have complete identity | |
httpmiddleware_identity.AbortWithoutIdentity(), | |
// gRPC-Web Request Middleware | |
httpmiddleware.WrappedGRPCWebServer(APIGrpcServer, tenantGrpcWebApiRoute), | |
) | |
// ******************************************************************************** | |
// ************* Start the engines *********************************************** | |
// ******************************************************************************** | |
// Start the API gRPC server | |
go func() { | |
appgrpc.Start( | |
ctx, | |
APIGrpcServer, | |
fmt.Sprintf("%s:%d", cfg.GRPC.PublicAPI.Host, cfg.GRPC.PublicAPI.Port), | |
) | |
}() | |
// Start the Internal Application Services gRPC server | |
go func() { | |
appgrpc.Start( | |
ctx, | |
internalAppServicesGrpcServer, | |
fmt.Sprintf("%s:%d", cfg.GRPC.InternalApplicationServices.Host, cfg.GRPC.InternalApplicationServices.Port), | |
) | |
}() | |
// prep the HTTP/2 server | |
http2s := &http2.Server{ | |
MaxConcurrentStreams: 250, | |
MaxReadFrameSize: 1048576, | |
IdleTimeout: 10 * time.Second, | |
// MaxUploadBufferPerConnection | |
// MaxUploadBufferPerStream | |
} | |
// Start HTTP/2 server | |
httputils.StartEchoHttp2Server( | |
ctx, | |
echoServer, | |
http2s, | |
httputils.Config(cfg.HTTP.PublicAPI), | |
appLogger, | |
) | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package artwork | |
import ( | |
// ... removed for brevity | |
) | |
// Start is the entry point of the artwork service | |
func Start( | |
ctx context.Context, | |
internalApplicationServicesGrpcServer *grpc.Server, | |
internalApplicationServicesGrpcConn *grpc.ClientConn, | |
webAPIGrpcServer *grpc.Server, | |
domainEventsStore pkg_domain.EventStore, | |
tenantAuthorisationSrvc pkg_auth.TenantAuhorisation, | |
userAuthorisationSrvc pkg_auth.UserAuhorisation, | |
pgxpool *pgxpool.Pool, | |
logger logger.Logger, | |
) { | |
// ... removed for brevity | |
// Subscribe the transactional handlers for event sourcing | |
domainEventsStore.SubscribeTransactionalEventHandlers( | |
persistence_pg.NewCaptionsProjector(logger), | |
// ... | |
) | |
// Initialize repositories for various aggregates | |
captionRepository := eventsourcing.NewCaptionRepository(domainEventsStore) | |
// ... | |
// ******************************************************************************** | |
// ************* Application Services *************************************** | |
// ******************************************************************************** | |
// Create and register instances of application services | |
artworkAppService := app_artworkv1.NewService( | |
domainEventsStore, | |
captionRepository, | |
// ... | |
logger, | |
) | |
// Compose Artwork Application Service | |
artworkAppService = app_artworkv1.NewUserAuthorisationService( | |
artworkAppService, | |
userAuthorisationSrvc, | |
// ... | |
) | |
artworkAppService = app_artworkv1.NewTenantAuthorisationService( | |
artworkAppService, | |
tenantAuthorisationSrvc, | |
// ... | |
) | |
artworkAppService = app_artworkv1.NewInstrumentingService( | |
artworkAppService, | |
// ... | |
) | |
artworkAppService = app_artworkv1.NewValidationService( | |
artworkAppService, | |
// ... | |
) | |
// Register Artwork Application Service | |
proto_appv1.RegisterArtworkServiceServer(internalApplicationServicesGrpcServer, artworkAppService) | |
// other `Artwork` application services | |
// ******************************************************************************** | |
// ************* gRPC service clients *************************************** | |
// ******************************************************************************** | |
// Create gRPC service clients for communication with other services | |
artworkServiceClient := proto_appv1.NewArtworkServiceClient(internalApplicationServicesGrpcConn) | |
// ******************************************************************************** | |
// ************* Interfaces *************************************** | |
// ******************************************************************************** | |
// Artwork API service | |
artworkAPIService := apiv1.NewArtworkAPIServer(pgxpool, artworkServiceClient) | |
// Register Artwork API as gRPC Server | |
proto_apiv1.RegisterArtworkServiceServer( | |
webAPIGrpcServer, | |
artworkAPIService, | |
) | |
// other `Artwork` public API services | |
// Log service start. | |
logger.Debug(ctx, "Artwork service started") | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Package v1 provides the implementation of gRPC artwork service version 1. | |
// This package includes a validationService middleware that performs validation checks on requests | |
// for registering editioned and unique artworks before passing them to the next service in the chain. | |
// It utilises the Google gRPC library and custom error details for handling validation errors. | |
// The middleware is designed to be used as part of the artwork service's application layer. | |
package v1 | |
import ( | |
// ... | |
) | |
// validationService acts as a gRPC service middleware for validation in the artwork service | |
type validationService struct { | |
next Service | |
// ... other dependecies | |
proto_appv1.UnimplementedArtworkServiceServer | |
} | |
// NewValidationValidationService creates a new instance of the validationService middleware | |
func NewValidationValidationService( | |
next Service, | |
// ... other dependecies | |
) Service { | |
service := &validationService{ | |
next: next, | |
// ... | |
} | |
return service | |
} | |
// RegisterEditionedArtwork validates and processes registration of editioned artwork | |
func (s *validationService) RegisterEditionedArtwork(ctx context.Context, | |
req *proto_appv1.RegisterEditionedArtworkRequest, | |
) (*proto_appv1.RegisterEditionedArtworkResponse, error) { | |
var err error | |
_, ok := identity.FromContext(ctx) | |
if !ok { | |
return nil, status.Error(codes.Unauthenticated, "missing tenant identity") | |
} | |
fieldViolations := []*errdetails.BadRequest_FieldViolation{} | |
// Require artist ID | |
if len(req.GetArtistId()) < 8 { | |
fieldViolations = append(fieldViolations, &errdetails.BadRequest_FieldViolation{ | |
Field: "Artist", | |
Description: "Missing artist.", | |
}) | |
} | |
// Validate correctness of artist ID | |
if !s.artistSrvc.IsValidArtist(ctx, req.GetArtistId()) { | |
fieldViolations = append(fieldViolations, &errdetails.BadRequest_FieldViolation{ | |
Field: "Artist", | |
Description: "Invalid artist selected.", | |
}) | |
} | |
// Require medium type | |
if len(req.GetMediumTypeId()) < 2 { | |
fieldViolations = append(fieldViolations, &errdetails.BadRequest_FieldViolation{ | |
Field: "Medium type", | |
Description: "Missing medium type.", | |
}) | |
} | |
// Validate correctness of medium type ID | |
if !s.mediumTypeSrvc.IsValidMediumType(ctx, req.GetMediumTypeId()) { | |
fieldViolations = append(fieldViolations, &errdetails.BadRequest_FieldViolation{ | |
Field: "Medium type", | |
Description: "Invalid medium type.", | |
}) | |
} | |
// Edition specs | |
if req.GetEditionsSpecification() == nil { | |
fieldViolations = append(fieldViolations, &errdetails.BadRequest_FieldViolation{ | |
Field: "Edition specification", | |
Description: "Missing edition specification. Please enter editions details.", | |
}) | |
} else { | |
// get field descriptors | |
fd := req.GetEditionsSpecification().ProtoReflect().Descriptor().Fields() | |
allZero := true | |
for i := 0; i < fd.Len(); i++ { | |
r := req.GetEditionsSpecification().ProtoReflect().Get(fd.Get(i)) | |
// each field must be an `int32`, at least one must be > 0 | |
n, ok := r.Interface().(int32) | |
if ok { | |
if n > 0 { | |
allZero = false | |
} | |
} | |
} | |
if allZero { | |
fieldViolations = append(fieldViolations, &errdetails.BadRequest_FieldViolation{ | |
Field: "Edition specification", | |
Description: "Missing edition specification. Please enter editions details for at least one edition type.", | |
}) | |
} | |
} | |
// ... other validations | |
if len(fieldViolations) > 0 { | |
st := status.New(codes.InvalidArgument, "") | |
st, _ = st.WithDetails(&errdetails.BadRequest{ | |
FieldViolations: fieldViolations, | |
}) | |
return nil, st.Err() | |
} | |
return s.next.RegisterEditionedArtwork(ctx, req) | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Package v1 contains the implementation of the artwork service (application layer) with instrumentation.
package v1 | |
import ( | |
"context" | |
"time" | |
proto_appv1 "bleyk.org/genproto/services/artwork/application/v1" | |
"github.com/go-kit/kit/metrics" | |
) | |
// instrumentingService is a struct that wraps an artwork service with instrumentation. | |
type instrumentingService struct { | |
requestCount metrics.Counter // Counter to track the number of requests. | |
requestLatency metrics.Histogram // Histogram to measure the latency of requests. | |
next Service // The next service in the chain. | |
proto_appv1.UnimplementedArtworkServiceServer | |
} | |
// NewInstrumentingService creates a new instrumenting service with the given counter, latency histogram, and the next service. | |
func NewInstrumentingService(counter metrics.Counter, latency metrics.Histogram, s Service) Service { | |
return &instrumentingService{ | |
requestCount: counter, | |
requestLatency: latency, | |
next: s, | |
} | |
} | |
// RegisterEditionedArtwork is a method of the instrumentingService that intercepts and instruments the RegisterEditionedArtwork method. | |
func (s *instrumentingService) RegisterEditionedArtwork(ctx context.Context, | |
req *proto_appv1.RegisterEditionedArtworkRequest, | |
) (*proto_appv1.RegisterEditionedArtworkResponse, error) { | |
// Measure the time taken to execute the RegisterEditionedArtwork method. | |
defer func(begin time.Time) { | |
// Increment the request count metric. | |
s.requestCount.With("method", "RegisterArtist").Add(1) | |
// Observe the request latency in seconds. | |
s.requestLatency.With("method", "RegisterArtist").Observe(time.Since(begin).Seconds()) | |
}(time.Now()) | |
// Call the RegisterEditionedArtwork method of the next service in the chain. | |
return s.next.RegisterEditionedArtwork(ctx, req) | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Package v1 provides the implementation of gRPC artwork service version 1. | |
// This package includes the main service implementation that handles requests related to registering editioned artwork. | |
// It utilises the Google gRPC library and interacts with various domain aggregates for handling business logic. | |
// The service is designed to be part of the artwork service's application layer. | |
package v1 | |
import ( | |
// ... | |
) | |
type Service interface { | |
proto_appv1.ArtworkServiceServer | |
} | |
// NewService creates a new instance of the artwork service. | |
// It takes various dependencies such as an event store, caption repository, logger, etc., as parameters. | |
func NewService( | |
eventStore pkg_domain.EventStore, | |
captionRepository domain_caption.Repository, | |
// ... | |
logger logger.Logger, | |
) Service { | |
return &service{ | |
eventStore: eventStore, | |
captionRepository: captionRepository, | |
// ... | |
logger: logger, | |
} | |
} | |
// service is the main implementation of the artwork service. | |
type service struct { | |
eventStore pkg_domain.EventStore | |
captionRepository domain_caption.Repository | |
// ... | |
logger logger.Logger | |
proto_appv1.UnimplementedArtworkServiceServer | |
} | |
// RegisterEditionedArtwork is a method of the artwork service that handles the registration of editioned artworks. | |
// It performs various business logic validations, interacts with domain aggregates, and persists data. | |
func (s *service) RegisterEditionedArtwork(ctx context.Context, | |
req *proto_appv1.RegisterEditionedArtworkRequest, | |
) (*proto_appv1.RegisterEditionedArtworkResponse, error) { | |
// Prepare editions to register based on the request. | |
editionsToRegister := s.prepareEditionsToRegister(req) | |
// Open store session before saving/committing. | |
esSession := s.eventStore.NewStoreSession() | |
var ( | |
accountID = req.GetAccountId() | |
userID = req.GetUserId() | |
artworkID = identifier.Generate() | |
err error | |
) | |
// Artwork record | |
recordAggregate := domain_record.New() | |
registeredEditions := []*proto_rsrcv1.EditionDescriptor{} | |
for _, v := range editionsToRegister { | |
registeredEditions = append(registeredEditions, &proto_rsrcv1.EditionDescriptor{ | |
EditionId: v.editionID, | |
Type: v.editionType, | |
Number: v.editionNumber, | |
}) | |
} | |
// Artwork record | |
err = recordAggregate.RegisterEditionedArtwork( | |
ctx, | |
&proto_dmn_recordv1.RegisterEditionedArtwork{ | |
AccountId: accountID, | |
UserId: userID, | |
ArtworkId: artworkID, | |
ArtistId: req.GetArtistId(), | |
ArtworkReferenceId: req.GetArtworkReferenceId(), | |
IsOpenEdition: req.GetIsOpenEdition(), | |
EditionsSpecification: req.GetEditionsSpecification(), | |
RegisteredEditions: registeredEditions, | |
}, | |
) | |
if err != nil { | |
s.logger.Error(ctx, err.Error()) | |
return nil, status.Errorf(codes.Internal, err.Error()) | |
} | |
// Save the record aggregate to the repository. | |
err = s.recordRepository.Save(ctx, recordAggregate, esSession) | |
if err != nil { | |
s.logger.Error(ctx, err.Error()) | |
return nil, status.Errorf(codes.Internal, err.Error()) | |
} | |
// Caption aggregate | |
captionAggregate := domain_caption.New() | |
editionsWithCaption := []*proto_dmn_captionv1.RegisterEditionedArtworkWithCaption_Edition{} | |
for _, v := range editionsToRegister { | |
editionsWithCaption = append(editionsWithCaption, &proto_dmn_captionv1.RegisterEditionedArtworkWithCaption_Edition{ | |
EditionId: v.editionID, | |
Type: v.editionType, | |
Number: v.editionNumber, | |
Caption: v.caption, | |
}) | |
} | |
err = captionAggregate.RegisterEditionedArtworkWithCaption( | |
ctx, | |
&proto_dmn_captionv1.RegisterEditionedArtworkWithCaption{ | |
AccountId: accountID, | |
UserId: userID, | |
ArtworkId: artworkID, | |
Editions: editionsWithCaption, | |
}, | |
) | |
if err != nil { | |
s.logger.Error(ctx, err.Error()) | |
return nil, status.Errorf(codes.Internal, err.Error()) | |
} | |
// Save the `Caption` aggregate to the repository. | |
err = s.captionRepository.Save(ctx, captionAggregate, esSession) | |
if err != nil { | |
s.logger.Error(ctx, err.Error()) | |
return nil, status.Errorf(codes.Internal, err.Error()) | |
} | |
// ... (create and save other aggregates) | |
// Finally, commit all changes. | |
err = s.eventStore.Commit(ctx, esSession) | |
if err != nil { | |
s.logger.Error(ctx, err.Error()) | |
return nil, status.Errorf(codes.Internal, err.Error()) | |
} | |
// Return the response with artwork ID and edition descriptors. | |
return &proto_appv1.RegisterEditionedArtworkResponse{ | |
ArtworkId: artworkID, | |
Editions: editionDescriptors, | |
}, nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment