Created March 11, 2021 10:00
git diff release-20.1 -- ./pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go
diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go
index a6c400c4d0..4009a4373f 100644
--- a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go
+++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go
@@ -132,6 +132,7 @@ type txnSpanRefresher struct {
refreshFail *metric.Counter
refreshFailWithCondensedSpans *metric.Counter
refreshMemoryLimitExceeded *metric.Counter
+ refreshAutoRetries *metric.Counter
}
// SendLocked implements the lockedSender interface.
@@ -162,16 +163,15 @@ func (sr *txnSpanRefresher) SendLocked(
}
// Set the batch's CanForwardReadTimestamp flag.
- canFwdRTS := sr.canForwardReadTimestampWithoutRefresh(ba.Txn)
- ba.CanForwardReadTimestamp = canFwdRTS
+ ba.CanForwardReadTimestamp = sr.canForwardReadTimestampWithoutRefresh(ba.Txn)
if rArgs, hasET := ba.GetArg(roachpb.EndTxn); hasET {
et := rArgs.(*roachpb.EndTxnRequest)
- // Assign the EndTxn's CanCommitAtHigherTimestamp flag if it isn't
- // already set correctly. We don't write blindly because we could be
- // dealing with a re-issued batch from splitEndTxnAndRetrySend after a
- // refresh and we don't want to mutate previously issued requests or we
- // risk a data race (checked by raceTransport). In these cases, we need
- // to clone the EndTxn request first before mutating.
+ // Assign the EndTxn's DeprecatedCanCommitAtHigherTimestamp flag if it
+ // isn't already set correctly. We don't write blindly because we could
+ // be dealing with a re-issued batch from splitEndTxnAndRetrySend after
+ // a refresh and we don't want to mutate previously issued requests or
+ // we risk a data race (checked by raceTransport). In these cases, we
+ // need to clone the EndTxn request first before mutating.
//
// We know this is a re-issued batch if the flag is already set and we
// need to unset it. We aren't able to detect the case where the flag is
@@ -181,28 +181,25 @@ func (sr *txnSpanRefresher) SendLocked(
//
// TODO(nvanbenschoten): this is ugly. If we weren't about to delete
// this field, we'd want to do something better. Just delete this ASAP.
- if et.CanCommitAtHigherTimestamp != canFwdRTS {
- isReissue := et.CanCommitAtHigherTimestamp
+ if et.DeprecatedCanCommitAtHigherTimestamp != ba.CanForwardReadTimestamp {
+ isReissue := et.DeprecatedCanCommitAtHigherTimestamp
if isReissue {
etCpy := *et
ba.Requests[len(ba.Requests)-1].SetInner(&etCpy)
et = &etCpy
}
- et.CanCommitAtHigherTimestamp = canFwdRTS
+ et.DeprecatedCanCommitAtHigherTimestamp = ba.CanForwardReadTimestamp
}
}
- maxAttempts := maxTxnRefreshAttempts
- if knob := sr.knobs.MaxTxnRefreshAttempts; knob != 0 {
- if knob == -1 {
- maxAttempts = 0
- } else {
- maxAttempts = knob
- }
+ // Attempt a refresh before sending the batch.
+ ba, pErr := sr.maybeRefreshPreemptively(ctx, ba)
+ if pErr != nil {
+ return nil, pErr
}
// Send through wrapped lockedSender. Unlocks while sending then re-locks.
- br, pErr := sr.sendLockedWithRefreshAttempts(ctx, ba, maxAttempts)
+ br, pErr := sr.sendLockedWithRefreshAttempts(ctx, ba, sr.maxRefreshAttempts())
if pErr != nil {
return nil, pErr
}
@@ -296,12 +293,11 @@ func (sr *txnSpanRefresher) sendLockedWithRefreshAttempts(
//
// For the refresh, we have two options: either refresh everything read
// *before* this batch, and then retry this batch, or refresh the current
- // batch's reads too and then, if successful, there'd be nothing to refresh.
+ // batch's reads too and then, if successful, there'd be nothing to retry.
// We take the former option by setting br = nil below to minimize the
// chances that the refresh fails.
bumpedTxn := br.Txn.Clone()
bumpedTxn.WriteTooOld = false
- bumpedTxn.ReadTimestamp = bumpedTxn.WriteTimestamp
pErr = roachpb.NewErrorWithTxn(
roachpb.NewTransactionRetryError(roachpb.RETRY_WRITE_TOO_OLD,
"WriteTooOld flag converted to WriteTooOldError"),
@@ -310,7 +306,7 @@ func (sr *txnSpanRefresher) sendLockedWithRefreshAttempts(
}
if pErr != nil {
if maxRefreshAttempts > 0 {
- br, pErr = sr.maybeRetrySend(ctx, ba, pErr, maxRefreshAttempts)
+ br, pErr = sr.maybeRefreshAndRetrySend(ctx, ba, pErr, maxRefreshAttempts)
} else {
log.VEventf(ctx, 2, "not checking error for refresh; refresh attempts exhausted")
}
@@ -319,41 +315,31 @@ func (sr *txnSpanRefresher) sendLockedWithRefreshAttempts(
return br, pErr
}
-// maybeRetrySend attempts to catch serializable errors and avoid them by
-// refreshing the txn at a larger timestamp. If it succeeds at refreshing the
+// maybeRefreshAndRetrySend attempts to catch serializable errors and avoid them
+// by refreshing the txn at a larger timestamp. If it succeeds at refreshing the
// txn timestamp, it recurses into sendLockedWithRefreshAttempts and retries the
// batch. If the refresh fails, the input pErr is returned.
-func (sr *txnSpanRefresher) maybeRetrySend(
+func (sr *txnSpanRefresher) maybeRefreshAndRetrySend(
ctx context.Context, ba roachpb.BatchRequest, pErr *roachpb.Error, maxRefreshAttempts int,
) (*roachpb.BatchResponse, *roachpb.Error) {
// Check for an error which can be retried after updating spans.
- canRetryTxn, retryTxn := roachpb.CanTransactionRetryAtRefreshedTimestamp(ctx, pErr)
- if !canRetryTxn || !sr.canAutoRetry {
+ canRefreshTxn, refreshTxn := roachpb.CanTransactionRefresh(ctx, pErr)
+ if !canRefreshTxn || !sr.canAutoRetry {
return nil, pErr
}
-
- // If a prefix of the batch was executed, collect refresh spans for
- // that executed portion, and retry the remainder. The canonical
- // case is a batch split between everything up to but not including
- // the EndTxn. Requests up to the EndTxn succeed, but the EndTxn
- // fails with a retryable error. We want to retry only the EndTxn.
- ba.UpdateTxn(retryTxn)
- log.VEventf(ctx, 2, "retrying %s at refreshed timestamp %s because of %s",
- ba, retryTxn.ReadTimestamp, pErr)
+ log.VEventf(ctx, 2, "trying to refresh to %s because of %s", refreshTxn.ReadTimestamp, pErr)
// Try updating the txn spans so we can retry.
- if ok := sr.tryUpdatingTxnSpans(ctx, retryTxn); !ok {
- sr.refreshFail.Inc(1)
- if sr.refreshFootprint.condensed {
- sr.refreshFailWithCondensedSpans.Inc(1)
- }
+ if ok := sr.tryUpdatingTxnSpans(ctx, refreshTxn); !ok {
+ log.Eventf(ctx, "refresh failed; propagating original retry error")
return nil, pErr
}
// We've refreshed all of the read spans successfully and bumped
// ba.Txn's timestamps. Attempt the request again.
- sr.refreshSuccess.Inc(1)
log.Eventf(ctx, "refresh succeeded; retrying original request")
+ ba.UpdateTxn(refreshTxn)
+ sr.refreshAutoRetries.Inc(1)
// To prevent starvation of batches that are trying to commit, split off the
// EndTxn request into its own batch on auto-retries. This avoids starvation
@@ -371,9 +357,7 @@ func (sr *txnSpanRefresher) maybeRetrySend(
return sr.splitEndTxnAndRetrySend(ctx, ba)
}
- retryBr, retryErr := sr.sendLockedWithRefreshAttempts(
- ctx, ba, maxRefreshAttempts-1,
- )
+ retryBr, retryErr := sr.sendLockedWithRefreshAttempts(ctx, ba, maxRefreshAttempts-1)
if retryErr != nil {
log.VEventf(ctx, 2, "retry failed with %s", retryErr)
return nil, retryErr
@@ -407,7 +391,7 @@ func (sr *txnSpanRefresher) splitEndTxnAndRetrySend(
// Issue a batch containing only the EndTxn request.
baSuffix := ba
baSuffix.Requests = ba.Requests[etIdx:]
- baSuffix.Txn = brPrefix.Txn
+ baSuffix.UpdateTxn(brPrefix.Txn)
brSuffix, pErr := sr.SendLocked(ctx, baSuffix)
if pErr != nil {
return nil, pErr
@@ -422,6 +406,97 @@ func (sr *txnSpanRefresher) splitEndTxnAndRetrySend(
return br, nil
}
+// maybeRefreshPreemptively attempts to refresh a transaction's read timestamp
+// eagerly. Doing so can take advantage of opportunities where the refresh is
+// free or can avoid wasting work issuing a batch containing an EndTxn that will
+// necessarily throw a serializable error. The method returns a batch with an
+// updated transaction if the refresh is successful, or a retry error if not.
+func (sr *txnSpanRefresher) maybeRefreshPreemptively(
+ ctx context.Context, ba roachpb.BatchRequest,
+) (roachpb.BatchRequest, *roachpb.Error) {
+ // If we know that the transaction will need a refresh at some point because
+ // its write timestamp has diverged from its read timestamp, consider doing
+ // so preemptively. We perform a preemptive refresh if either a) doing so
+ // would be free because we have not yet accumulated any refresh spans, or
+ // b) the batch contains a committing EndTxn request that we know will be
+ // rejected if issued.
+ //
+ // The first case is straightforward. If the transaction has yet to perform
+ // any reads but has had its write timestamp bumped, refreshing is a trivial
+ // no-op. In this case, refreshing eagerly prevents the transaction from
+ // performing any future reads at its current read timestamp. Not doing so
+ // preemptively guarantees that we will need to perform a real refresh in
+ // the future if the transaction ever performs a read. At best, this would
+ // be wasted work. At worst, this could result in the future refresh
+ // failing. So we might as well refresh preemptively while doing so is free.
+ //
+ // Note that this first case here does NOT obviate the need for server-side
+ // refreshes. Notably, a transaction's write timestamp might be bumped in
+ // the same batch in which it performs its first read. In such cases, a
+ // preemptive refresh would not be needed but a reactive refresh would not
+ // be a trivial no-op. These situations are common for one-phase commit
+ // transactions.
+ //
+ // The second case is more complex. If the batch contains a committing
+ // EndTxn request that we know will need a refresh, we don't want to bother
+ // issuing it just for it to be rejected. Instead, preemptively refresh
+ // before issuing the EndTxn batch. If we view reads as acquiring a form of
+ // optimistic read locks under an optimistic concurrency control scheme (as
+ // is discussed in the comment on txnSpanRefresher) then this preemptive
+ // refresh immediately before the EndTxn is synonymous with the "validation"
+ // phase of a standard OCC transaction model. However, as an optimization
+ // compared to standard OCC, the validation phase is only performed when
+ // necessary in CockroachDB (i.e. if the transaction's writes have been
+ // pushed to higher timestamps).
+ //
+ // TODO(andrei): whether or not we can still auto-retry at the SQL level
+ // should also play a role in deciding whether we want to refresh eagerly or
+ // not.
+
+ // If the transaction has yet to be pushed, no refresh is necessary.
+ if ba.Txn.ReadTimestamp == ba.Txn.WriteTimestamp {
+ return ba, nil
+ }
+
+ // If true, tryUpdatingTxnSpans will trivially succeed.
+ refreshFree := ba.CanForwardReadTimestamp
+
+ // If true, this batch is guaranteed to fail without a refresh.
+ args, hasET := ba.GetArg(roachpb.EndTxn)
+ refreshInevitable := hasET && args.(*roachpb.EndTxnRequest).Commit
+
+ // If neither condition is true, defer the refresh.
+ if !refreshFree && !refreshInevitable {
+ return ba, nil
+ }
+
+ canRefreshTxn, refreshTxn := roachpb.PrepareTransactionForRefresh(ba.Txn, ba.Txn.WriteTimestamp)
+ if !canRefreshTxn || !sr.canAutoRetry {
+ return roachpb.BatchRequest{}, newRetryErrorOnFailedPreemptiveRefresh(ba.Txn)
+ }
+ log.VEventf(ctx, 2, "preemptively refreshing to timestamp %s before issuing %s",
+ refreshTxn.ReadTimestamp, ba)
+
+ // Try updating the txn spans at a timestamp that will allow us to commit.
+ if ok := sr.tryUpdatingTxnSpans(ctx, refreshTxn); !ok {
+ log.Eventf(ctx, "preemptive refresh failed; propagating retry error")
+ return roachpb.BatchRequest{}, newRetryErrorOnFailedPreemptiveRefresh(ba.Txn)
+ }
+
+ log.Eventf(ctx, "preemptive refresh succeeded")
+ ba.UpdateTxn(refreshTxn)
+ return ba, nil
+}
+
+func newRetryErrorOnFailedPreemptiveRefresh(txn *roachpb.Transaction) *roachpb.Error {
+ reason := roachpb.RETRY_SERIALIZABLE
+ if txn.WriteTooOld {
+ reason = roachpb.RETRY_WRITE_TOO_OLD
+ }
+ err := roachpb.NewTransactionRetryError(reason, "failed preemptive refresh")
+ return roachpb.NewErrorWithTxn(err, txn)
+}
+
// tryUpdatingTxnSpans sends Refresh and RefreshRange commands to all spans read
// during the transaction to ensure that no writes were written more recently
// than sr.refreshedTimestamp. All implicated timestamp caches are updated with
@@ -429,7 +504,18 @@ func (sr *txnSpanRefresher) splitEndTxnAndRetrySend(
// or not.
func (sr *txnSpanRefresher) tryUpdatingTxnSpans(
ctx context.Context, refreshTxn *roachpb.Transaction,
-) bool {
+) (ok bool) {
+ // Track the result of the refresh in metrics.
+ defer func() {
+ if ok {
+ sr.refreshSuccess.Inc(1)
+ } else {
+ sr.refreshFail.Inc(1)
+ if sr.refreshFootprint.condensed {
+ sr.refreshFailWithCondensedSpans.Inc(1)
+ }
+ }
+ }()
if sr.refreshInvalid {
log.VEvent(ctx, 2, "can't refresh txn spans; not valid")
@@ -478,7 +564,7 @@ func (sr *txnSpanRefresher) tryUpdatingTxnSpans(
// Send through wrapped lockedSender. Unlocks while sending then re-locks.
if _, batchErr := sr.wrapped.SendLocked(ctx, refreshSpanBa); batchErr != nil {
- log.VEventf(ctx, 2, "failed to refresh txn spans (%s); propagating original retry error", batchErr)
+ log.VEventf(ctx, 2, "failed to refresh txn spans (%s)", batchErr)
return false
}
@@ -501,16 +587,26 @@ func (sr *txnSpanRefresher) appendRefreshSpans(
}
ba.RefreshSpanIterate(br, func(span roachpb.Span) {
- log.VEventf(ctx, 3, "recording span to refresh: %s", span)
+ if log.ExpensiveLogEnabled(ctx, 3) {
+ log.VEventf(ctx, 3, "recording span to refresh: %s", span.String())
+ }
sr.refreshFootprint.insert(span)
})
return nil
}
// canForwardReadTimestampWithoutRefresh returns whether the transaction can
-// forward its read timestamp without refreshing any read spans. This allows
-// for the "server-side refresh" optimization, where batches are re-evaluated
-// at a higher read-timestamp without returning to transaction coordinator.
+// forward its read timestamp without refreshing any read spans. This allows for
+// the "server-side refresh" optimization, where batches are re-evaluated at a
+// higher read-timestamp without returning to the transaction coordinator.
+//
+// This requires that the transaction has encountered no spans which require
+// refreshing at the forwarded timestamp and that the transaction's timestamp
+// has not leaked. If either of those conditions does not hold, a client-side
+// refresh is required.
+//
+// Note that when deciding whether a transaction can be bumped to a particular
+// timestamp, the transaction's deadline must also be taken into account.
func (sr *txnSpanRefresher) canForwardReadTimestampWithoutRefresh(txn *roachpb.Transaction) bool {
return sr.canAutoRetry && !sr.refreshInvalid && sr.refreshFootprint.empty() && !txn.CommitTimestampFixed
}
@@ -532,6 +628,18 @@ func (sr *txnSpanRefresher) forwardRefreshTimestampOnResponse(
}
}
+// maxRefreshAttempts returns the configured number of times that a transaction
+// should attempt to refresh its spans for a single batch.
+func (sr *txnSpanRefresher) maxRefreshAttempts() int {
+ if knob := sr.knobs.MaxTxnRefreshAttempts; knob != 0 {
+ if knob == -1 {
+ return 0
+ }
+ return knob
+ }
+ return maxTxnRefreshAttempts
+}
+
// setWrapped implements the txnInterceptor interface.
func (sr *txnSpanRefresher) setWrapped(wrapped lockedSender) { sr.wrapped = wrapped }
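
The heart of the patch is the decision inside the new maybeRefreshPreemptively: refresh before sending only when the transaction has been pushed and the refresh is either free (no refresh spans accumulated) or inevitable (a committing EndTxn would be rejected anyway). Below is a minimal, self-contained Go sketch of just that decision; timestamp, txn, and batch are hypothetical simplified stand-ins for the roachpb types, not the real API:

```go
package main

import "fmt"

// Simplified, hypothetical stand-ins for roachpb.Transaction and
// roachpb.BatchRequest; only the fields the decision reads are modeled.
type timestamp int64

type txn struct {
	readTimestamp  timestamp
	writeTimestamp timestamp
}

type batch struct {
	txn                     txn
	canForwardReadTimestamp bool // no refresh spans accumulated yet
	hasCommittingEndTxn     bool // batch carries an EndTxn with Commit=true
}

// shouldRefreshPreemptively mirrors the decision in maybeRefreshPreemptively:
// consider a refresh only once the txn has been pushed (read and write
// timestamps diverged), and then only if it is free (case a) or the batch
// would inevitably be rejected without it (case b).
func shouldRefreshPreemptively(ba batch) bool {
	if ba.txn.readTimestamp == ba.txn.writeTimestamp {
		return false // not pushed; no refresh needed
	}
	refreshFree := ba.canForwardReadTimestamp
	refreshInevitable := ba.hasCommittingEndTxn
	return refreshFree || refreshInevitable
}

func main() {
	pushed := txn{readTimestamp: 10, writeTimestamp: 20}
	fmt.Println(shouldRefreshPreemptively(batch{txn: pushed, canForwardReadTimestamp: true})) // true: free
	fmt.Println(shouldRefreshPreemptively(batch{txn: pushed, hasCommittingEndTxn: true}))     // true: inevitable
	fmt.Println(shouldRefreshPreemptively(batch{txn: pushed}))                                // false: defer to reactive path
}
```

A pushed transaction that satisfies neither condition simply keeps the batch unchanged and falls back to the existing reactive refresh in maybeRefreshAndRetrySend.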
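The patch also factors the MaxTxnRefreshAttempts testing-knob handling out of SendLocked into the maxRefreshAttempts helper. A sketch of the knob semantics, assuming a default of 5 for the package-level maxTxnRefreshAttempts constant (the real value is defined in kvcoord):

```go
package main

import "fmt"

// defaultMaxTxnRefreshAttempts stands in for the package-level
// maxTxnRefreshAttempts constant; 5 is an assumed value for illustration.
const defaultMaxTxnRefreshAttempts = 5

// maxRefreshAttempts reproduces the knob handling from the patch: 0 means
// "use the default", -1 means "disable client-side refreshes", and any other
// value overrides the default (useful in tests).
func maxRefreshAttempts(knob int) int {
	if knob != 0 {
		if knob == -1 {
			return 0
		}
		return knob
	}
	return defaultMaxTxnRefreshAttempts
}

func main() {
	fmt.Println(maxRefreshAttempts(0))  // 5: default
	fmt.Println(maxRefreshAttempts(-1)) // 0: refreshes disabled
	fmt.Println(maxRefreshAttempts(3))  // 3: test override
}
```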
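Finally, the refresh metrics move from maybeRetrySend into tryUpdatingTxnSpans itself, via a named return value and a deferred closure, so that both the reactive path and the new preemptive path are counted on every return. A minimal sketch of that pattern, with counter as a stand-in for metric.Counter:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// counter is a minimal stand-in for metric.Counter.
type counter struct{ n int64 }

func (c *counter) Inc(d int64) { atomic.AddInt64(&c.n, d) }

var (
	refreshSuccess counter
	refreshFail    counter
)

// tryUpdatingTxnSpans (sketch) shows the named-return-plus-defer pattern the
// patch adopts: the deferred closure reads the named result ok, so every
// return path increments exactly one of the counters.
func tryUpdatingTxnSpans(succeed bool) (ok bool) {
	defer func() {
		if ok {
			refreshSuccess.Inc(1)
		} else {
			refreshFail.Inc(1)
		}
	}()
	// ... issue Refresh/RefreshRange requests here ...
	return succeed
}

func main() {
	tryUpdatingTxnSpans(true)
	tryUpdatingTxnSpans(false)
	fmt.Println(refreshSuccess.n, refreshFail.n) // 1 1
}
```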