package eventadapter

import (
	"context"
	"fmt"
	"time"

	"github.com/incident-io/core/server/lib/cache"
	"github.com/incident-io/core/server/lib/envcheck"
	"github.com/incident-io/core/server/lib/errors"
	"github.com/incident-io/core/server/lib/log"
	"github.com/incident-io/core/server/lib/md"
	"github.com/incident-io/core/server/lib/metrics"
	"github.com/incident-io/core/server/lib/safe"
	"github.com/incident-io/core/server/lib/traffic"
	"github.com/incident-io/core/server/pkg/identity"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/samber/lo"
	"go.opentelemetry.io/otel/codes"
	"go.opentelemetry.io/otel/trace"
)

// Dependencies are runtime constructed dependencies provided for use when processing
// events.
type Dependencies struct {
	Cache    cache.Service     // for storing error counts
	Control  *traffic.Control  // for applying gate controls
	Identity *identity.Service // for resolving organisation details
}

// EventMetadata contains details about the specific event that we're processing.
type EventMetadata struct {
	Adapter       string    // e.g. pubsub, in_memory, cloud_scheduler, etc.
	Topic         string    // e.g. escalation.changed
	Subscriber    string    // e.g. sync-message
	SourcePackage string    // e.g. app/escalator/executor
	MessageID     string    // e.g. 1234567890
	ErrorCount    int       // e.g. 0
	PublishTime   time.Time // e.g. 2018-01-01T00:00:00Z
	ParentTraceID string    // e.g. 90ce95ed11b25942403dc821bb5d73a1
	ParentSpanID  string    // e.g. 1234567890
}

// ProcessOutcome is used to signal the result of processing an event.
type ProcessOutcome string

var (
	// ProcessOutcomeSuccess is used when an event is processed successfully.
	ProcessOutcomeSuccess ProcessOutcome = "success"

	// ProcessOutcomeError is used when an event is processed unsuccessfully, and may be
	// retried.
	ProcessOutcomeError ProcessOutcome = "error"

	// ProcessOutcomeExpired is used when we refused to process an event because the time
	// between publishing the event and us trying to process it has exceeded the
	// subscription stale threshold.
	//
	// These events are dropped entirely and are not retried.
	ProcessOutcomeExpired ProcessOutcome = "expired"

	// ProcessOutcomeRateLimited is used when we refused to process an event because it
	// exceeded the rate limit. We don't normally apply limits so this usually only happens
	// in an incident when we've applied emergency limit overrides.
	ProcessOutcomeRateLimited ProcessOutcome = "rate_limited"
)
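
// Note: these outcome values are recorded both as the "outcome" label on
// eventProcessedTotal and in the "event_processed" log event emitted at the end of
// process.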

var (
	eventRunningSecondsTotal = promauto.NewCounterVec(
		prometheus.CounterOpts{
			Name: "core_event_running_seconds_total",
			Help: "A live updated counter of the number of seconds events have been running for.",
		},
		[]string{
			"topic",          // e.g. "escalation.changed"
			"subscriber",     // e.g. "sync-message"
			"package",        // e.g. "app/escalator/executor"
			"source_package", // e.g. "app/escalator/executor"
		},
	)
	eventProcessedTotal = promauto.NewCounterVec(
		prometheus.CounterOpts{
			Name: "core_event_processed_total",
			Help: "Updated whenever we finish processing an event.",
		},
		[]string{
			"topic",          // e.g. "escalation.changed"
			"subscriber",     // e.g. "sync-message"
			"package",        // e.g. "app/escalator/executor"
			"source_package", // e.g. "app/escalator/executor"
			"outcome",        // e.g. "success", "error", "expired"
		},
	)
)

var (
	// Apply an observation-only gate to event processing, as opposed to a limit gate which
	// would apply rate-limits.
	//
	// As the gate is 'unlimited' we won't apply limits, but we could apply a limit override
	// while in production via the traffic CLI if we find an event is problematic, making
	// this more akin to a kill-switch.
	gateEventSubscription = &traffic.Gate{
		Name:        "event_subscription",
		SampleLimit: 5,
		SampleFloor: -1,
		GetLimit: func(labels []string) (limit int64, refillRate float64) {
			return -1, 0.0 // this is a gate that observes only and does not limit
		},
		Labels: []string{
			"topic",
			"subscriber",
		},
	}

	// Unlimited gate per organisation, but as above we could apply a limit
	// override while in production via the traffic CLI if we find an event is
	// problematic, making this more akin to a kill-switch.
	gateEventSubscriptionOrganisation = &traffic.Gate{
		Name:        "event_subscription_organisation",
		SampleLimit: 5,
		SampleFloor: -1,
		GetLimit: func(labels []string) (limit int64, refillRate float64) {
			return -1, 0.0
		},
		Labels: []string{
			"topic",
			"subscriber",
			"organisation_id",
		},
	}
)

// process is the single codepath for processing events.
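//
// At a high level it: builds the o11y metadata we log against the event, starts a new
// root span linked back to the publisher's span, keeps a live-updated running counter
// while the handler executes, applies the stale-message and traffic-gate checks, runs
// the handler with panic recovery, and finally records the outcome (metrics, logs and
// the error-count handling that decides when to report to Sentry).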
func process(
	ctx context.Context,
	deps Dependencies, // service injection
	subscribeParams SubscribeParams, // subscription behaviour
	ev Eventer, // the event we're processing
	eventMetadata EventMetadata, // specific event metadata
	handler func(ctx context.Context) error, // the actual event handler
) (
	outcome ProcessOutcome, err error,
) {
	// Apply dependencies to the context.
	ctx = cache.WithCache(ctx, deps.Cache)

	// Build metadata as we proceed through processing the message.
	//
	// These will be logged against the event_processed event and are a key part of our
	// observability stack. We _must_ log them correctly so we can calculate metrics on
	// event activity.
	//
	// Be aware that changing any of these fields may break dashboards and alerts.
	o11y := md.Metadata{
		"topic":          eventMetadata.Topic,           // e.g. "escalation.changed"
		"subscriber":     eventMetadata.Subscriber,      // e.g. "sync-message"
		"package":        subscribeParams.sourcePackage, // e.g. "app/escalator/executor"
		"source_package": eventMetadata.SourcePackage,   // e.g. "app/escalator/executor"
		"adapter":        eventMetadata.Adapter,         // e.g. "pubsub"
		"message_id":     eventMetadata.MessageID,       // e.g. "1234567890"
		"publish_time":   eventMetadata.PublishTime,     // e.g. 2018-01-01T00:00:00Z
		"source":         fmt.Sprintf("%s.%s", eventMetadata.Topic, eventMetadata.Subscriber), // e.g. "escalation.changed.sync-message"
	}

	// Load any contextual metadata from the event.
	{
		// Organisation if it's there.
		if organisationID := ev.GetOrganisationID(); organisationID != "" {
			o11y["organisation_id"] = organisationID
		}

		// Any other metadata.
		if ev, ok := ev.(EventerWithMetadata); ok {
			// Reverse merge the custom metadata to ensure you can't possibly overwrite key
			// fields.
			data := ev.GetMetadata()
			data.Merge(o11y)

			// Switch it!
			o11y = data
		}
	}

	// If we have a parent trace we should create a link back to whatever published this
	// message.
	var parent trace.SpanContext
	if eventMetadata.ParentTraceID != "" && eventMetadata.ParentSpanID != "" {
		var (
			parentTraceID, _ = trace.TraceIDFromHex(eventMetadata.ParentTraceID)
			parentSpanID, _  = trace.SpanIDFromHex(eventMetadata.ParentSpanID)
		)
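
		// Parse errors are deliberately discarded: a malformed ID leaves the zero value,
		// which gives an invalid span context and so, presumably, no parent link on the
		// span we start below.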
		parent = trace.NewSpanContext(trace.SpanContextConfig{
			TraceID: parentTraceID,
			SpanID:  parentSpanID,
			Remote:  true,
		})
	}

	// From here, we begin tracing.
	ctx, span := md.StartNewRootSpanWithParentLink(
		ctx, SubscriptionID(eventMetadata.Topic, eventMetadata.Subscriber), parent, trace.SpanKindConsumer, o11y,
	)
	defer span.End()

	// If we have an organisation ID, we should try resolving the name from the identity
	// service. This is totally optional, but it's nice to have.
	if organisationID, ok := o11y["organisation_id"].(string); ok {
		if deps.Identity != nil {
			org, err := deps.Identity.GetOrganisation(ctx, organisationID)
			if err != nil {
				log.Warn(ctx, errors.Wrap(ctx, err, "failed to get organisation from identity service"))
			} else {
				// Merge the org metadata into the context, adding the organisation_name.
				md.Merge(ctx, org.Metadata())
			}
		}
	}

	var (
		startTime   = time.Now()
		publishTime = lo.
			If(eventMetadata.PublishTime.IsZero(), time.Now()).
			Else(eventMetadata.PublishTime)
		queueLatency = startTime.Sub(publishTime)
	)
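
	// queue_latency therefore measures how long the event sat between being published and
	// us starting to work on it; if the adapter didn't give us a publish time we fall back
	// to "now", which reports a (roughly) zero latency rather than a nonsensical one.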

	// It's really important the app reports accurate metrics about what it is working on
	// right now. Most telemetry increments counters only after the work is complete, but
	// this leads to time-of-measurement bias where you only find out after the work is done
	// how long you've been working on it.
	//
	// https://blog.lawrencejones.dev/incremental-measurement/
	//
	// We avoid this by starting a goroutine that periodically increments a counter as we're
	// working the event.
	done := make(chan struct{})
	defer close(done)
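
	// Closing done (via the defer above) tells the goroutine to take one final
	// measurement and exit once processing returns.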
	safe.Go(func() {
		c := eventRunningSecondsTotal.With(prometheus.Labels{
			"topic":          eventMetadata.Topic,
			"subscriber":     eventMetadata.Subscriber,
			"package":        subscribeParams.sourcePackage,
			"source_package": eventMetadata.SourcePackage,
		})

		lastIncremented := time.Now()
		for {
			select {
			case <-done:
				c.Add(time.Since(lastIncremented).Seconds()) // take the final measurement
				return

			// Periodically increment the counter.
			case <-time.After(time.Second):
				c.Add(time.Since(lastIncremented).Seconds())
				lastIncremented = time.Now()
			}
		}
	})

	// We care less about when we update the processed counter as we wouldn't expect to see
	// this change until an event was done (in contrast to a measure of how long we're
	// working events, which we do expect to update periodically).
	defer func() {
		// Increment the event_processed_total counter.
		m, err := eventProcessedTotal.
			GetMetricWith(prometheus.Labels{
				"topic":          eventMetadata.Topic,
				"subscriber":     eventMetadata.Subscriber,
				"package":        subscribeParams.sourcePackage,
				"source_package": eventMetadata.SourcePackage,
				"outcome":        string(outcome),
			})
		if err != nil {
			panic(err)
		}
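
		// Record the increment with the current trace ID attached as an exemplar, so
		// metric samples can be linked back to an example trace.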
		m.(prometheus.ExemplarAdder).AddWithExemplar(1, prometheus.Labels{
			"traceID": span.SpanContext().TraceID().String(),
		})
	}()

	// Add counters for database usage.
	ctx = metrics.NewLogCounter(ctx, metrics.DatabaseDurationCounterKey)
	ctx = metrics.NewLogCounter(ctx, metrics.DatabaseTransactionDurationCounterKey)
	ctx = metrics.NewLogCounter(ctx, metrics.DatabaseConnectionDurationCounterKey)
	ctx = metrics.NewLogCounter(ctx, metrics.DatabaseQueriesCounterKey)
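
	// These counters accumulate while the handler runs and are read back into the
	// event_processed log fields (database_duration, database_queries, etc.) below.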

	// Initialise a new context metadata for the duration of this message processing.
	ctx = md.New(ctx)

	// Each subscription has a default urgency set against it. We must now set this default
	// against the context so that when we later report errors, the urgency is applied.
	ctx = errors.NewDefaultUrgency(ctx, subscribeParams.ErrorUrgency)

	// If we're handling an event for a demo org, don't page
	if isDemo {
		errors.SetDefaultUrgency(ctx, errors.UrgencySentry)
	}

	// If we've exceeded the stale message threshold, we should drop the message without
	// working it.
	if time.Since(publishTime) > subscribeParams.StaleThreshold {
		log.Info(ctx, "Dropping event because it's above stale message threshold")
		outcome = ProcessOutcomeExpired

		goto reportOutcome
	}

	// If we have a traffic controller we should apply the gate.
	if deps.Control != nil {
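		// Check the per-organisation gate first, then the subscription-wide gate. Both are
		// unlimited by default, so Take only refuses the event if a limit override has
		// been applied (e.g. during an incident).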
		_, ok := deps.Control.Take(ctx, gateEventSubscriptionOrganisation, traffic.Labelset{
			eventMetadata.Topic,
			eventMetadata.Subscriber,
			ev.GetOrganisationID(),
		}, 1)
		if !ok {
			outcome = ProcessOutcomeRateLimited
			goto reportOutcome
		}

		_, ok = deps.Control.Take(ctx, gateEventSubscription, traffic.Labelset{
			eventMetadata.Topic,
			eventMetadata.Subscriber,
		}, 1)
		if !ok {
			outcome = ProcessOutcomeRateLimited
			goto reportOutcome
		}
	}

	// Actually handle the message.
	err = func() (err error) {
		// Ensure whatever happens in the event handler, we recover from panics.
		defer func() {
			errors.RecoverPanic(recover(), &err)
		}()

		return handler(ctx)
	}()

	// Update the outcome so we log correctly.
	if err != nil {
		outcome = ProcessOutcomeError
	} else {
		outcome = ProcessOutcomeSuccess
	}

	// Provide a named label so we can skip past running the subscriber if we've triggered a
	// rate-limit or tried processing an expired message.
reportOutcome:

	// Again, these fields are used to build metrics and dashboards on our events. Please
	// don't modify without understanding how that may impact our observability.
	o11yOutcome := md.Metadata{
		"outcome":       outcome,
		"duration":      time.Since(startTime).Seconds(),
		"total_latency": time.Since(publishTime).Seconds(),
		"queue_latency": queueLatency.Seconds(),
	}
	if err != nil {
		o11yOutcome["error"] = err.Error()
	}
	if databaseDuration, ok := metrics.GetDurationLogCounter(ctx, metrics.DatabaseDurationCounterKey); ok {
		o11yOutcome["database_duration"] = databaseDuration.Seconds()
	} else {
		log.Warn(ctx, errors.New(nil, "could not find database duration counter"))
	}
	if transactionDuration, ok := metrics.GetDurationLogCounter(ctx, metrics.DatabaseTransactionDurationCounterKey); ok {
		o11yOutcome["database_transaction_duration"] = transactionDuration.Seconds()
	} else {
		log.Warn(ctx, errors.New(nil, "could not find database transaction duration counter"))
	}
	if connectionDuration, ok := metrics.GetDurationLogCounter(ctx, metrics.DatabaseConnectionDurationCounterKey); ok {
		o11yOutcome["database_connection_duration"] = connectionDuration.Seconds()
	} else {
		log.Warn(ctx, errors.New(nil, "could not find database connection duration counter"))
	}
	if databaseQueries, ok := metrics.GetLogCounter(ctx, metrics.DatabaseQueriesCounterKey); ok {
		o11yOutcome["database_queries"] = databaseQueries
	} else {
		log.Warn(ctx, errors.New(nil, "could not find database queries counter"))
	}

	// Depending on how running this went, we'll want to either:
	switch outcome {
	// If we expired, we want to return now and not to continue into error handling.
	case ProcessOutcomeExpired:
		log.Info(ctx, "Dropping event because it's above stale message threshold", o11y, o11yOutcome, map[string]any{
			"event": "event_processed", // outcome=expired
		})

		return

	// If rate-limited we also want to skip error handling.
	case ProcessOutcomeRateLimited:
		log.Info(ctx, "Dropping event because it's been rate limited", o11y, o11yOutcome, map[string]any{
			"event": "event_processed", // outcome=rate_limited
		})

		return

	// Otherwise emit the event_processed log and proceed to error handling.
	case ProcessOutcomeSuccess, ProcessOutcomeError:
		log.Info(ctx, "Handled event", o11y, o11yOutcome, map[string]any{
			"event": "event_processed", // outcome will state how this went
		})
	}

	// Error handling:
	if err != nil {
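		// Only ProcessOutcomeError can reach this branch: expired and rate-limited events
		// returned above, and a successful handler leaves err nil.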
		if ShouldSilentRetry(err, publishTime) {
			// Sometimes our consumer is unable to process an event for legitimate reasons,
			// and the event should simply be retried.
			//
			// Examples are:
			// - Upserting an incident participant when a timeline item is created. If multiple
			//   timeline items are created at once, processes may be racing against each other.
			// - Consuming a GitHub pull request webhook. Sometimes GitHub sends these webhooks
			//   before the PR is available to fetch from the API :shrug:
			//
			// To avoid spamming Sentry, we completely ignore these errors for the first minute
			// after the event is published. After that, this is treated like a normal error,
			// i.e. log two failures then send the error to Sentry.
			log.Info(ctx, "Consumer failed for expected reason, retrying...")
		} else {
			// Mark our (top level) span as failed, ensuring that we don't drop it in our
			// sampler. Do this regardless of whether we've hit our max error count or not.
			span.SetStatus(codes.Error, err.Error())
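
			// Error counts are kept in the cache, keyed by message ID, so repeated delivery
			// attempts of the same message accumulate towards the Sentry threshold below.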
			errorCount, getErrorCountErr := getErrorCount(ctx, eventMetadata.MessageID)
			if getErrorCountErr != nil {
				log.Warn(ctx, errors.Wrap(ctx, getErrorCountErr, "getting error count from cache, assume we've seen an error before"))

				// Adjust to be at the Sentry limit so we don't respond to an unavailable cache by
				// not sending to Sentry.
				errorCount = 2 // we're about to increment it
			}

			// Increment the error count.
			errorCount++

			log.Info(ctx, "Incremented error count", map[string]any{
				"error_count": errorCount,
			})
			if err := setErrorCount(ctx, eventMetadata.MessageID, errorCount); err != nil {
				log.Warn(ctx, errors.Wrap(ctx, err, "failed to set error count"))
			}

			// We only want to error on the third failed attempt, as this implies that retrying didn't
			// help to fix the issue.
			//
			// Note that we used to send non-pageable errors (i.e. urgency=Sentry) to Sentry for
			// the first two failures, but this actually stopped us from getting paged when
			// consumers fail on the third attempt because the pageable error got grouped with
			// the first two (non-pageable) errors, so the issue was no longer "new".
			//
			// Now, we just log.Info.
			if errorCount < 3 {
				if envcheck.IsDevelopment() {
					log.Error(ctx, err, o11yOutcome)
				} else {
					log.Info(ctx, "Encountered error but <3 attempt, retrying...", o11y, map[string]any{
						"error_count": errorCount,
						"cause":       err.Error(),
					})
				}
			} else {
				if _, ok := errors.As[SilentlyRetryableError](err); ok {
					log.Error(ctx, errors.Wrap(ctx, err, "failed after retrying silently"), o11yOutcome)
				} else {
					log.Error(ctx, err, o11yOutcome)
				}
			}
		}
	}

	// Return the error we received from the callback.
	return outcome, err
}