Skip to content

Instantly share code, notes, and snippets.

@tbg
Created January 17, 2019 16:17
Show Gist options
  • Save tbg/41a6e89b95a2d36a4b1b9b28a964ce91 to your computer and use it in GitHub Desktop.
diff --git a/pkg/storage/intent_resolver.go b/pkg/storage/intent_resolver.go
index cde28265ca..bfe3d5b946 100644
--- a/pkg/storage/intent_resolver.go
+++ b/pkg/storage/intent_resolver.go
@@ -18,6 +18,7 @@ package storage
import (
"container/list"
"context"
+ "math/rand"
"sort"
"time"
@@ -681,6 +682,7 @@ func (ir *intentResolver) cleanupIntentsAsync(
now := r.store.Clock().Now()
for _, item := range intents {
if err := ir.runAsyncTask(ctx, r, allowSyncProcessing, func(ctx context.Context) {
+ time.Sleep(time.Duration(rand.Intn(int(10 * time.Millisecond))))
if _, err := ir.cleanupIntents(ctx, item.Intents, now, roachpb.PUSH_TOUCH); err != nil {
if ir.every.ShouldLog() {
log.Warning(ctx, err)
@@ -742,6 +744,7 @@ func (ir *intentResolver) cleanupIntents(
func (ir *intentResolver) cleanupTxnIntentsAsync(
ctx context.Context, r *Replica, endTxns []result.EndTxnIntents, allowSyncProcessing bool,
) error {
+ return nil
now := r.store.Clock().Now()
for _, et := range endTxns {
if err := ir.runAsyncTask(ctx, r, allowSyncProcessing, func(ctx context.Context) {
diff --git a/pkg/storage/merge_queue.go b/pkg/storage/merge_queue.go
index 64d95fd230..1dd399447b 100644
--- a/pkg/storage/merge_queue.go
+++ b/pkg/storage/merge_queue.go
@@ -44,7 +44,7 @@ const (
// The current implementation of merges requires rewriting the right-hand data
// onto the left-hand range, even when the ranges are collocated. This is
// expensive, so limit to one merge at a time.
- mergeQueueConcurrency = 1
+ mergeQueueConcurrency = 8
)
// MergeQueueInterval is a setting that controls how often the merge queue waits
diff --git a/pkg/storage/store.go b/pkg/storage/store.go
index 59cdd22f79..299a17cf00 100644
--- a/pkg/storage/store.go
+++ b/pkg/storage/store.go
@@ -2432,6 +2432,19 @@ func (s *Store) MergeRange(
return err
}
+ if crash := rand.Intn(3) == 0; crash {
+ go func() {
+ time.Sleep(time.Duration(rand.Intn(int(5 * time.Millisecond.Nanoseconds()))))
+ p, err := os.FindProcess(os.Getpid())
+ if err != nil {
+ log.Fatal(ctx, err)
+ }
+ if err := p.Kill(); err != nil {
+ log.Fatal(ctx, err)
+ }
+ }()
+ }
+
leftRepl.raftMu.AssertHeld()
rightRepl.raftMu.AssertHeld()
@@ -2458,16 +2471,6 @@ func (s *Store) MergeRange(
return err
}
- if crash := rand.Intn(3) == 0; crash {
- p, err := os.FindProcess(os.Getpid())
- if err != nil {
- log.Fatal(ctx, err)
- }
- if err := p.Kill(); err != nil {
- log.Fatal(ctx, err)
- }
- }
-
// Note that we were called (indirectly) from raft processing so we must
// call removeReplicaImpl directly to avoid deadlocking on the right-hand
// replica's raftMu.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment