@tbg
Created March 11, 2021 10:00
git diff release-20.1 -- ./pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go
diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go
index a6c400c4d0..4009a4373f 100644
--- a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go
+++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go
@@ -132,6 +132,7 @@ type txnSpanRefresher struct {
refreshFail *metric.Counter
refreshFailWithCondensedSpans *metric.Counter
refreshMemoryLimitExceeded *metric.Counter
+ refreshAutoRetries *metric.Counter
}
// SendLocked implements the lockedSender interface.
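The hunk above adds a refreshAutoRetries counter next to the existing refresh metrics. As the rest of the diff shows, refreshSuccess counts every successful span refresh (reactive or preemptive) while refreshAutoRetries counts only refreshes that led to the client retrying the original batch. A toy Go stand-in for that relationship, using sync/atomic rather than CockroachDB's internal metric package (counter and refreshMetrics are hypothetical names):

package main

import (
	"fmt"
	"sync/atomic"
)

// counter mimics the Inc/Count surface of a metric.Counter.
type counter struct{ n int64 }

func (c *counter) Inc(delta int64) { atomic.AddInt64(&c.n, delta) }
func (c *counter) Count() int64    { return atomic.LoadInt64(&c.n) }

// refreshMetrics mirrors the relationship between the two fields:
// refreshSuccess is a superset of refreshAutoRetries.
type refreshMetrics struct {
	refreshSuccess     counter
	refreshAutoRetries counter
}

func main() {
	var m refreshMetrics
	m.refreshSuccess.Inc(1)     // e.g. a preemptive refresh
	m.refreshSuccess.Inc(1)     // e.g. a reactive refresh...
	m.refreshAutoRetries.Inc(1) // ...that triggered an auto-retry
	fmt.Println(m.refreshSuccess.Count(), m.refreshAutoRetries.Count()) // 2 1
}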
@@ -162,16 +163,15 @@ func (sr *txnSpanRefresher) SendLocked(
}
// Set the batch's CanForwardReadTimestamp flag.
- canFwdRTS := sr.canForwardReadTimestampWithoutRefresh(ba.Txn)
- ba.CanForwardReadTimestamp = canFwdRTS
+ ba.CanForwardReadTimestamp = sr.canForwardReadTimestampWithoutRefresh(ba.Txn)
if rArgs, hasET := ba.GetArg(roachpb.EndTxn); hasET {
et := rArgs.(*roachpb.EndTxnRequest)
- // Assign the EndTxn's CanCommitAtHigherTimestamp flag if it isn't
- // already set correctly. We don't write blindly because we could be
- // dealing with a re-issued batch from splitEndTxnAndRetrySend after a
- // refresh and we don't want to mutate previously issued requests or we
- // risk a data race (checked by raceTransport). In these cases, we need
- // to clone the EndTxn request first before mutating.
+ // Assign the EndTxn's DeprecatedCanCommitAtHigherTimestamp flag if it
+ // isn't already set correctly. We don't write blindly because we could
+ // be dealing with a re-issued batch from splitEndTxnAndRetrySend after
+ // a refresh and we don't want to mutate previously issued requests or
+ // we risk a data race (checked by raceTransport). In these cases, we
+ // need to clone the EndTxn request first before mutating.
//
// We know this is a re-issued batch if the flag is already set and we
// need to unset it. We aren't able to detect the case where the flag is
@@ -181,28 +181,25 @@ func (sr *txnSpanRefresher) SendLocked(
//
// TODO(nvanbenschoten): this is ugly. If we weren't about to delete
// this field, we'd want to do something better. Just delete this ASAP.
- if et.CanCommitAtHigherTimestamp != canFwdRTS {
- isReissue := et.CanCommitAtHigherTimestamp
+ if et.DeprecatedCanCommitAtHigherTimestamp != ba.CanForwardReadTimestamp {
+ isReissue := et.DeprecatedCanCommitAtHigherTimestamp
if isReissue {
etCpy := *et
ba.Requests[len(ba.Requests)-1].SetInner(&etCpy)
et = &etCpy
}
- et.CanCommitAtHigherTimestamp = canFwdRTS
+ et.DeprecatedCanCommitAtHigherTimestamp = ba.CanForwardReadTimestamp
}
}
- maxAttempts := maxTxnRefreshAttempts
- if knob := sr.knobs.MaxTxnRefreshAttempts; knob != 0 {
- if knob == -1 {
- maxAttempts = 0
- } else {
- maxAttempts = knob
- }
+ // Attempt a refresh before sending the batch.
+ ba, pErr := sr.maybeRefreshPreemptively(ctx, ba)
+ if pErr != nil {
+ return nil, pErr
}
// Send through wrapped lockedSender. Unlocks while sending then re-locks.
- br, pErr := sr.sendLockedWithRefreshAttempts(ctx, ba, maxAttempts)
+ br, pErr := sr.sendLockedWithRefreshAttempts(ctx, ba, sr.maxRefreshAttempts())
if pErr != nil {
return nil, pErr
}
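The EndTxn handling above clones the request before mutating it whenever the batch is a re-issue, since a previously issued request may still be read by an in-flight RPC. A minimal, self-contained sketch of that copy-on-write pattern, with a hypothetical endTxn type in place of roachpb.EndTxnRequest:

package main

import "fmt"

type endTxn struct{ canCommit bool }

// setFlag mirrors the reissue check above: a request whose flag is already
// set must have been issued before, so it is cloned rather than mutated in
// place. A fresh request (flag unset) is safe to mutate directly.
func setFlag(et *endTxn, want bool) *endTxn {
	if et.canCommit == want {
		return et // already correct; nothing to do
	}
	if isReissue := et.canCommit; isReissue {
		cpy := *et // clone first, as in `etCpy := *et` above
		et = &cpy
	}
	et.canCommit = want
	return et
}

func main() {
	reissued := &endTxn{canCommit: true}
	updated := setFlag(reissued, false)
	fmt.Println(reissued.canCommit, updated.canCommit) // true false: original untouched
}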
@@ -296,12 +293,11 @@ func (sr *txnSpanRefresher) sendLockedWithRefreshAttempts(
//
// For the refresh, we have two options: either refresh everything read
// *before* this batch, and then retry this batch, or refresh the current
- // batch's reads too and then, if successful, there'd be nothing to refresh.
+ // batch's reads too and then, if successful, there'd be nothing to retry.
// We take the former option by setting br = nil below to minimize the
// chances that the refresh fails.
bumpedTxn := br.Txn.Clone()
bumpedTxn.WriteTooOld = false
- bumpedTxn.ReadTimestamp = bumpedTxn.WriteTimestamp
pErr = roachpb.NewErrorWithTxn(
roachpb.NewTransactionRetryError(roachpb.RETRY_WRITE_TOO_OLD,
"WriteTooOld flag converted to WriteTooOldError"),
@@ -310,7 +306,7 @@ func (sr *txnSpanRefresher) sendLockedWithRefreshAttempts(
}
if pErr != nil {
if maxRefreshAttempts > 0 {
- br, pErr = sr.maybeRetrySend(ctx, ba, pErr, maxRefreshAttempts)
+ br, pErr = sr.maybeRefreshAndRetrySend(ctx, ba, pErr, maxRefreshAttempts)
} else {
log.VEventf(ctx, 2, "not checking error for refresh; refresh attempts exhausted")
}
@@ -319,41 +315,31 @@ func (sr *txnSpanRefresher) sendLockedWithRefreshAttempts(
return br, pErr
}
-// maybeRetrySend attempts to catch serializable errors and avoid them by
-// refreshing the txn at a larger timestamp. If it succeeds at refreshing the
+// maybeRefreshAndRetrySend attempts to catch serializable errors and avoid them
+// by refreshing the txn at a larger timestamp. If it succeeds at refreshing the
// txn timestamp, it recurses into sendLockedWithRefreshAttempts and retries the
// batch. If the refresh fails, the input pErr is returned.
-func (sr *txnSpanRefresher) maybeRetrySend(
+func (sr *txnSpanRefresher) maybeRefreshAndRetrySend(
ctx context.Context, ba roachpb.BatchRequest, pErr *roachpb.Error, maxRefreshAttempts int,
) (*roachpb.BatchResponse, *roachpb.Error) {
// Check for an error which can be retried after updating spans.
- canRetryTxn, retryTxn := roachpb.CanTransactionRetryAtRefreshedTimestamp(ctx, pErr)
- if !canRetryTxn || !sr.canAutoRetry {
+ canRefreshTxn, refreshTxn := roachpb.CanTransactionRefresh(ctx, pErr)
+ if !canRefreshTxn || !sr.canAutoRetry {
return nil, pErr
}
-
- // If a prefix of the batch was executed, collect refresh spans for
- // that executed portion, and retry the remainder. The canonical
- // case is a batch split between everything up to but not including
- // the EndTxn. Requests up to the EndTxn succeed, but the EndTxn
- // fails with a retryable error. We want to retry only the EndTxn.
- ba.UpdateTxn(retryTxn)
- log.VEventf(ctx, 2, "retrying %s at refreshed timestamp %s because of %s",
- ba, retryTxn.ReadTimestamp, pErr)
+ log.VEventf(ctx, 2, "trying to refresh to %s because of %s", refreshTxn.ReadTimestamp, pErr)
// Try updating the txn spans so we can retry.
- if ok := sr.tryUpdatingTxnSpans(ctx, retryTxn); !ok {
- sr.refreshFail.Inc(1)
- if sr.refreshFootprint.condensed {
- sr.refreshFailWithCondensedSpans.Inc(1)
- }
+ if ok := sr.tryUpdatingTxnSpans(ctx, refreshTxn); !ok {
+ log.Eventf(ctx, "refresh failed; propagating original retry error")
return nil, pErr
}
// We've refreshed all of the read spans successfully and bumped
// ba.Txn's timestamps. Attempt the request again.
- sr.refreshSuccess.Inc(1)
log.Eventf(ctx, "refresh succeeded; retrying original request")
+ ba.UpdateTxn(refreshTxn)
+ sr.refreshAutoRetries.Inc(1)
// To prevent starvation of batches that are trying to commit, split off the
// EndTxn request into its own batch on auto-retries. This avoids starvation
@@ -371,9 +357,7 @@ func (sr *txnSpanRefresher) maybeRetrySend(
return sr.splitEndTxnAndRetrySend(ctx, ba)
}
- retryBr, retryErr := sr.sendLockedWithRefreshAttempts(
- ctx, ba, maxRefreshAttempts-1,
- )
+ retryBr, retryErr := sr.sendLockedWithRefreshAttempts(ctx, ba, maxRefreshAttempts-1)
if retryErr != nil {
log.VEventf(ctx, 2, "retry failed with %s", retryErr)
return nil, retryErr
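The renamed maybeRefreshAndRetrySend implements a bounded refresh-then-retry loop: on a retryable error it refreshes the read set and recurses with one fewer attempt. An illustrative reduction of that shape, with hypothetical stand-ins for the batch, the error check, and the refresh:

package main

import (
	"errors"
	"fmt"
)

var errRetryable = errors.New("retryable timestamp error")

// send models sendLockedWithRefreshAttempts: on a retryable error it
// refreshes the read set and recurses with a decremented budget.
func send(attempt func() error, refresh func() bool, attempts int) error {
	err := attempt()
	if err == nil || !errors.Is(err, errRetryable) {
		return err
	}
	if attempts <= 0 || !refresh() {
		return err // budget exhausted or refresh failed: propagate
	}
	return send(attempt, refresh, attempts-1)
}

func main() {
	calls := 0
	attempt := func() error {
		calls++
		if calls < 3 {
			return errRetryable
		}
		return nil
	}
	fmt.Println(send(attempt, func() bool { return true }, 5), calls) // <nil> 3
}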
@@ -407,7 +391,7 @@ func (sr *txnSpanRefresher) splitEndTxnAndRetrySend(
// Issue a batch containing only the EndTxn request.
baSuffix := ba
baSuffix.Requests = ba.Requests[etIdx:]
- baSuffix.Txn = brPrefix.Txn
+ baSuffix.UpdateTxn(brPrefix.Txn)
brSuffix, pErr := sr.SendLocked(ctx, baSuffix)
if pErr != nil {
return nil, pErr
@@ -422,6 +406,97 @@ func (sr *txnSpanRefresher) splitEndTxnAndRetrySend(
return br, nil
}
+// maybeRefreshPreemptively attempts to refresh a transaction's read timestamp
+// eagerly. Doing so can take advantage of opportunities where the refresh is
+// free or can avoid wasting work issuing a batch containing an EndTxn that will
+// necessarily throw a serializable error. The method returns a batch with an
+// updated transaction if the refresh is successful, or a retry error if not.
+func (sr *txnSpanRefresher) maybeRefreshPreemptively(
+ ctx context.Context, ba roachpb.BatchRequest,
+) (roachpb.BatchRequest, *roachpb.Error) {
+ // If we know that the transaction will need a refresh at some point because
+ // its write timestamp has diverged from its read timestamp, consider doing
+ // so preemptively. We perform a preemptive refresh if either a) doing so
+ // would be free because we have not yet accumulated any refresh spans, or
+ // b) the batch contains a committing EndTxn request that we know will be
+ // rejected if issued.
+ //
+ // The first case is straightforward. If the transaction has yet to perform
+ // any reads but has had its write timestamp bumped, refreshing is a trivial
+ // no-op. In this case, refreshing eagerly prevents the transaction from
+ // performing any future reads at its current read timestamp. Not doing so
+ // preemptively guarantees that we will need to perform a real refresh in
+ // the future if the transaction ever performs a read. At best, this would
+ // be wasted work. At worst, this could result in the future refresh
+ // failing. So we might as well refresh preemptively while doing so is free.
+ //
+ // Note that this first case here does NOT obviate the need for server-side
+ // refreshes. Notably, a transaction's write timestamp might be bumped in
+ // the same batch in which it performs its first read. In such cases, a
+ // preemptive refresh would not be needed but a reactive refresh would not
+ // be a trivial no-op. These situations are common for one-phase commit
+ // transactions.
+ //
+ // The second case is more complex. If the batch contains a committing
+ // EndTxn request that we know will need a refresh, we don't want to bother
+ // issuing it just for it to be rejected. Instead, preemptively refresh
+ // before issuing the EndTxn batch. If we view reads as acquiring a form of
+ // optimistic read locks under an optimistic concurrency control scheme (as
+ // is discussed in the comment on txnSpanRefresher) then this preemptive
+ // refresh immediately before the EndTxn is synonymous with the "validation"
+ // phase of a standard OCC transaction model. However, as an optimization
+ // compared to standard OCC, the validation phase is only performed when
+ // necessary in CockroachDB (i.e. if the transaction's writes have been
+ // pushed to higher timestamps).
+ //
+ // TODO(andrei): whether or not we can still auto-retry at the SQL level
+ // should also play a role in deciding whether we want to refresh eagerly or
+ // not.
+
+ // If the transaction has yet to be pushed, no refresh is necessary.
+ if ba.Txn.ReadTimestamp == ba.Txn.WriteTimestamp {
+ return ba, nil
+ }
+
+ // If true, tryUpdatingTxnSpans will trivially succeed.
+ refreshFree := ba.CanForwardReadTimestamp
+
+ // If true, this batch is guaranteed to fail without a refresh.
+ args, hasET := ba.GetArg(roachpb.EndTxn)
+ refreshInevitable := hasET && args.(*roachpb.EndTxnRequest).Commit
+
+ // If neither condition is true, defer the refresh.
+ if !refreshFree && !refreshInevitable {
+ return ba, nil
+ }
+
+ canRefreshTxn, refreshTxn := roachpb.PrepareTransactionForRefresh(ba.Txn, ba.Txn.WriteTimestamp)
+ if !canRefreshTxn || !sr.canAutoRetry {
+ return roachpb.BatchRequest{}, newRetryErrorOnFailedPreemptiveRefresh(ba.Txn)
+ }
+ log.VEventf(ctx, 2, "preemptively refreshing to timestamp %s before issuing %s",
+ refreshTxn.ReadTimestamp, ba)
+
+ // Try updating the txn spans at a timestamp that will allow us to commit.
+ if ok := sr.tryUpdatingTxnSpans(ctx, refreshTxn); !ok {
+ log.Eventf(ctx, "preemptive refresh failed; propagating retry error")
+ return roachpb.BatchRequest{}, newRetryErrorOnFailedPreemptiveRefresh(ba.Txn)
+ }
+
+ log.Eventf(ctx, "preemptive refresh succeeded")
+ ba.UpdateTxn(refreshTxn)
+ return ba, nil
+}
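The decision logic in maybeRefreshPreemptively boils down to: skip if the transaction was never pushed, otherwise refresh when doing so is free (no accumulated refresh spans) or inevitable (a committing EndTxn). A condensed sketch with hypothetical plain fields in place of the real batch and timestamp types:

package main

import "fmt"

type batch struct {
	readTS, writeTS int64 // stand-ins for hlc timestamps
	canForwardRead  bool  // no refresh spans accumulated yet
	committing      bool  // contains a committing EndTxn
}

// shouldRefreshPreemptively mirrors the two triggers above: the refresh is
// free, or the batch is guaranteed to fail without one.
func shouldRefreshPreemptively(b batch) bool {
	if b.readTS == b.writeTS {
		return false // not pushed; nothing to refresh
	}
	refreshFree := b.canForwardRead
	refreshInevitable := b.committing
	return refreshFree || refreshInevitable
}

func main() {
	fmt.Println(shouldRefreshPreemptively(batch{readTS: 1, writeTS: 2, canForwardRead: true})) // true
	fmt.Println(shouldRefreshPreemptively(batch{readTS: 1, writeTS: 2, committing: true}))     // true
	fmt.Println(shouldRefreshPreemptively(batch{readTS: 1, writeTS: 2}))                       // false
}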
+
+func newRetryErrorOnFailedPreemptiveRefresh(txn *roachpb.Transaction) *roachpb.Error {
+ reason := roachpb.RETRY_SERIALIZABLE
+ if txn.WriteTooOld {
+ reason = roachpb.RETRY_WRITE_TOO_OLD
+ }
+ err := roachpb.NewTransactionRetryError(reason, "failed preemptive refresh")
+ return roachpb.NewErrorWithTxn(err, txn)
+}
+
// tryUpdatingTxnSpans sends Refresh and RefreshRange commands to all spans read
// during the transaction to ensure that no writes were written more recently
// than sr.refreshedTimestamp. All implicated timestamp caches are updated with
@@ -429,7 +504,18 @@ func (sr *txnSpanRefresher) splitEndTxnAndRetrySend(
// or not.
func (sr *txnSpanRefresher) tryUpdatingTxnSpans(
ctx context.Context, refreshTxn *roachpb.Transaction,
-) bool {
+) (ok bool) {
+ // Track the result of the refresh in metrics.
+ defer func() {
+ if ok {
+ sr.refreshSuccess.Inc(1)
+ } else {
+ sr.refreshFail.Inc(1)
+ if sr.refreshFootprint.condensed {
+ sr.refreshFailWithCondensedSpans.Inc(1)
+ }
+ }
+ }()
if sr.refreshInvalid {
log.VEvent(ctx, 2, "can't refresh txn spans; not valid")
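The tryUpdatingTxnSpans change moves all metric bookkeeping into a defer over a named return value, so every exit path is counted exactly once. The idiom in isolation, with toy counters standing in for the real ones:

package main

import "fmt"

var successes, failures int

// tryThing uses a named return so the deferred closure can observe the
// final result, whichever return statement produced it.
func tryThing(work func() bool) (ok bool) {
	defer func() {
		if ok {
			successes++
		} else {
			failures++
		}
	}()
	return work()
}

func main() {
	tryThing(func() bool { return true })
	tryThing(func() bool { return false })
	fmt.Println(successes, failures) // 1 1
}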
@@ -478,7 +564,7 @@ func (sr *txnSpanRefresher) tryUpdatingTxnSpans(
// Send through wrapped lockedSender. Unlocks while sending then re-locks.
if _, batchErr := sr.wrapped.SendLocked(ctx, refreshSpanBa); batchErr != nil {
- log.VEventf(ctx, 2, "failed to refresh txn spans (%s); propagating original retry error", batchErr)
+ log.VEventf(ctx, 2, "failed to refresh txn spans (%s)", batchErr)
return false
}
@@ -501,16 +587,26 @@ func (sr *txnSpanRefresher) appendRefreshSpans(
}
ba.RefreshSpanIterate(br, func(span roachpb.Span) {
- log.VEventf(ctx, 3, "recording span to refresh: %s", span)
+ if log.ExpensiveLogEnabled(ctx, 3) {
+ log.VEventf(ctx, 3, "recording span to refresh: %s", span.String())
+ }
sr.refreshFootprint.insert(span)
})
return nil
}
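The appendRefreshSpans change guards the per-span log line behind log.ExpensiveLogEnabled so the span is only formatted when verbose logging is actually on. A sketch of the same guard, where logEnabled is a hypothetical stand-in for that check:

package main

import "fmt"

type span struct{ key, endKey string }

// String is the costly formatting we want to skip when logging is off.
func (s span) String() string { return fmt.Sprintf("[%s,%s)", s.key, s.endKey) }

func record(s span, logEnabled bool) {
	if logEnabled { // guard: don't format the span unless it will be logged
		fmt.Println("recording span to refresh:", s.String())
	}
	// ... insert the span into the refresh footprint ...
}

func main() {
	record(span{"a", "b"}, false) // no formatting, no output
	record(span{"a", "b"}, true)  // prints the formatted span
}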
// canForwardReadTimestampWithoutRefresh returns whether the transaction can
-// forward its read timestamp without refreshing any read spans. This allows
-// for the "server-side refresh" optimization, where batches are re-evaluated
-// at a higher read-timestamp without returning to transaction coordinator.
+// forward its read timestamp without refreshing any read spans. This allows for
+// the "server-side refresh" optimization, where batches are re-evaluated at a
+// higher read-timestamp without returning to the transaction coordinator.
+//
+// This requires that the transaction has encountered no spans which require
+// refreshing at the forwarded timestamp and that the transaction's timestamp
+// has not leaked. If either of those conditions does not hold, a client-side
+// refresh is required.
+//
+// Note that when deciding whether a transaction can be bumped to a particular
+// timestamp, the transaction's deadline must also be taken into account.
func (sr *txnSpanRefresher) canForwardReadTimestampWithoutRefresh(txn *roachpb.Transaction) bool {
return sr.canAutoRetry && !sr.refreshInvalid && sr.refreshFootprint.empty() && !txn.CommitTimestampFixed
}
@@ -532,6 +628,18 @@ func (sr *txnSpanRefresher) forwardRefreshTimestampOnResponse(
}
}
+// maxRefreshAttempts returns the configured number of times that a transaction
+// should attempt to refresh its spans for a single batch.
+func (sr *txnSpanRefresher) maxRefreshAttempts() int {
+ if knob := sr.knobs.MaxTxnRefreshAttempts; knob != 0 {
+ if knob == -1 {
+ return 0
+ }
+ return knob
+ }
+ return maxTxnRefreshAttempts
+}
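The extracted maxRefreshAttempts keeps the existing testing-knob convention: a zero knob means "use the default", -1 means "disable refreshes entirely", and any other value overrides the default. In isolation (the default value here is a placeholder for the real maxTxnRefreshAttempts constant):

package main

import "fmt"

const defaultMaxAttempts = 5 // placeholder for maxTxnRefreshAttempts

func maxAttempts(knob int) int {
	if knob != 0 {
		if knob == -1 {
			return 0 // refreshes disabled for the test
		}
		return knob // explicit override
	}
	return defaultMaxAttempts
}

func main() {
	fmt.Println(maxAttempts(0), maxAttempts(-1), maxAttempts(3)) // 5 0 3
}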
+
// setWrapped implements the txnInterceptor interface.
func (sr *txnSpanRefresher) setWrapped(wrapped lockedSender) { sr.wrapped = wrapped }