Last active
April 25, 2018 19:47
-
-
Save johnou/55bf838142b38695830f7941368ce9fb to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Index: protocols/primary-backup/src/main/java/io/atomix/protocols/backup/protocol/RestoreRequest.java | |
IDEA additional info: | |
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP | |
<+>UTF-8 | |
=================================================================== | |
--- protocols/primary-backup/src/main/java/io/atomix/protocols/backup/protocol/RestoreRequest.java (revision b3b1a3d28536f477ef155e3a96ce0a750da5fbda) | |
+++ protocols/primary-backup/src/main/java/io/atomix/protocols/backup/protocol/RestoreRequest.java (revision ) | |
@@ -15,6 +15,8 @@ | |
*/ | |
package io.atomix.protocols.backup.protocol; | |
+import io.atomix.cluster.NodeId; | |
+ | |
import static com.google.common.base.MoreObjects.toStringHelper; | |
/** | |
@@ -22,25 +24,32 @@ | |
*/ | |
public class RestoreRequest extends PrimitiveRequest { | |
- public static RestoreRequest request(PrimitiveDescriptor primitive, long term) { | |
- return new RestoreRequest(primitive, term); | |
+ public static RestoreRequest request(PrimitiveDescriptor primitive, NodeId nodeId, long term) { | |
+ return new RestoreRequest(primitive, nodeId, term); | |
} | |
private final long term; | |
+ private final NodeId nodeId; | |
- public RestoreRequest(PrimitiveDescriptor primitive, long term) { | |
+ public RestoreRequest(PrimitiveDescriptor primitive, NodeId nodeId, long term) { | |
super(primitive); | |
this.term = term; | |
+ this.nodeId = nodeId; | |
} | |
public long term() { | |
return term; | |
} | |
+ public NodeId nodeId() { | |
+ return nodeId; | |
+ } | |
+ | |
@Override | |
public String toString() { | |
return toStringHelper(this) | |
.add("primitive", primitive()) | |
+ .add("nodeId", nodeId()) | |
.add("term", term()) | |
.toString(); | |
} | |
Index: protocols/primary-backup/src/main/java/io/atomix/protocols/backup/roles/PrimaryRole.java | |
IDEA additional info: | |
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP | |
<+>UTF-8 | |
=================================================================== | |
--- protocols/primary-backup/src/main/java/io/atomix/protocols/backup/roles/PrimaryRole.java (revision b3b1a3d28536f477ef155e3a96ce0a750da5fbda) | |
+++ protocols/primary-backup/src/main/java/io/atomix/protocols/backup/roles/PrimaryRole.java (revision ) | |
@@ -19,7 +19,6 @@ | |
import io.atomix.primitive.operation.OperationType; | |
import io.atomix.primitive.service.impl.DefaultCommit; | |
import io.atomix.primitive.session.Session; | |
-import io.atomix.primitive.session.Sessions; | |
import io.atomix.protocols.backup.PrimaryBackupServer.Role; | |
import io.atomix.protocols.backup.impl.PrimaryBackupSession; | |
import io.atomix.protocols.backup.protocol.CloseOperation; | |
@@ -171,6 +170,9 @@ | |
context.service().backup(buffer); | |
buffer.flip(); | |
byte[] bytes = buffer.readBytes(buffer.remaining()); | |
+ | |
+ replicator.removePreviousOperation(request.nodeId(), context.currentIndex()); | |
+ | |
return CompletableFuture.completedFuture( | |
RestoreResponse.ok(context.currentIndex(), context.currentTimestamp(), bytes)) | |
.thenApply(this::logResponse); | |
Index: protocols/primary-backup/src/main/java/io/atomix/protocols/backup/roles/Replicator.java | |
IDEA additional info: | |
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP | |
<+>UTF-8 | |
=================================================================== | |
--- protocols/primary-backup/src/main/java/io/atomix/protocols/backup/roles/Replicator.java (revision b3b1a3d28536f477ef155e3a96ce0a750da5fbda) | |
+++ protocols/primary-backup/src/main/java/io/atomix/protocols/backup/roles/Replicator.java (revision ) | |
@@ -15,6 +15,7 @@ | |
*/ | |
package io.atomix.protocols.backup.roles; | |
+import io.atomix.cluster.NodeId; | |
import io.atomix.protocols.backup.protocol.BackupOperation; | |
import java.util.concurrent.CompletableFuture; | |
@@ -32,6 +33,14 @@ | |
*/ | |
CompletableFuture<Void> replicate(BackupOperation operation); | |
+ /** | |
+ * Clear backup queue of given node. | |
+ * | |
+ * @param nodeId node id | |
+ * @param endIndex remove all operations that index less than endIndex | |
+ */ | |
+ void removePreviousOperation(NodeId nodeId, long endIndex); | |
+ | |
/** | |
* Closes the replicator. | |
*/ | |
Index: protocols/primary-backup/src/main/java/io/atomix/protocols/backup/roles/BackupRole.java | |
IDEA additional info: | |
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP | |
<+>UTF-8 | |
=================================================================== | |
--- protocols/primary-backup/src/main/java/io/atomix/protocols/backup/roles/BackupRole.java (revision b3b1a3d28536f477ef155e3a96ce0a750da5fbda) | |
+++ protocols/primary-backup/src/main/java/io/atomix/protocols/backup/roles/BackupRole.java (revision ) | |
@@ -154,7 +154,7 @@ | |
* Requests a restore from the primary. | |
*/ | |
private void requestRestore(NodeId primary) { | |
- context.protocol().restore(primary, RestoreRequest.request(context.descriptor(), context.currentTerm())) | |
+ context.protocol().restore(primary, RestoreRequest.request(context.descriptor(), context.nodeId(), context.currentTerm())) | |
.whenCompleteAsync((response, error) -> { | |
if (error == null && response.status() == PrimaryBackupResponse.Status.OK) { | |
context.resetIndex(response.index(), response.timestamp()); | |
@@ -167,6 +167,7 @@ | |
context.service().restore(buffer); | |
operations.clear(); | |
+ //context.setCommitIndex(response.index()); // don't think this is needed | |
} | |
}, context.threadContext()); | |
} | |
Index: protocols/primary-backup/src/main/java/io/atomix/protocols/backup/roles/AsynchronousReplicator.java | |
IDEA additional info: | |
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP | |
<+>UTF-8 | |
=================================================================== | |
--- protocols/primary-backup/src/main/java/io/atomix/protocols/backup/roles/AsynchronousReplicator.java (revision b3b1a3d28536f477ef155e3a96ce0a750da5fbda) | |
+++ protocols/primary-backup/src/main/java/io/atomix/protocols/backup/roles/AsynchronousReplicator.java (revision ) | |
@@ -20,10 +20,12 @@ | |
import io.atomix.protocols.backup.protocol.BackupOperation; | |
import io.atomix.protocols.backup.protocol.BackupRequest; | |
import io.atomix.protocols.backup.service.impl.PrimaryBackupServiceContext; | |
+import io.atomix.utils.concurrent.Futures; | |
import io.atomix.utils.concurrent.Scheduled; | |
import org.slf4j.Logger; | |
import java.time.Duration; | |
+import java.util.ArrayList; | |
import java.util.HashMap; | |
import java.util.LinkedList; | |
import java.util.List; | |
@@ -49,11 +51,22 @@ | |
@Override | |
public CompletableFuture<Void> replicate(BackupOperation operation) { | |
+ List<CompletableFuture<Void>> futures = new ArrayList<>(context.backups().size()); | |
for (NodeId backup : context.backups()) { | |
- queues.computeIfAbsent(backup, BackupQueue::new).add(operation); | |
+ futures.add(queues.computeIfAbsent(backup, BackupQueue::new).add(operation)); | |
} | |
- context.setCommitIndex(operation.index()); | |
- return CompletableFuture.completedFuture(null); | |
+ return Futures.allOf(futures).thenApply(v -> { | |
+ context.setCommitIndex(operation.index()); | |
+ return null; | |
+ }); | |
+ } | |
+ | |
+ @Override | |
+ public void removePreviousOperation(NodeId nodeId, long endIndex) { | |
+ queues.computeIfPresent(nodeId, (node, queue) -> { | |
+ queue.clear(endIndex); | |
+ return queue; | |
+ }); | |
} | |
@Override | |
@@ -68,6 +81,7 @@ | |
private final Queue<BackupOperation> operations = new LinkedList<>(); | |
private final NodeId nodeId; | |
private final Scheduled backupTimer; | |
+ private CompletableFuture<Void> backupFuture = CompletableFuture.completedFuture(null); | |
private long lastSent; | |
BackupQueue(NodeId nodeId) { | |
@@ -81,11 +95,12 @@ | |
* | |
* @param operation the operation to add | |
*/ | |
- void add(BackupOperation operation) { | |
+ CompletableFuture<Void> add(BackupOperation operation) { | |
operations.add(operation); | |
if (operations.size() >= MAX_BATCH_SIZE) { | |
- backup(); | |
+ backupFuture = backupFuture.thenCompose(v -> backup()); | |
} | |
+ return backupFuture; | |
} | |
/** | |
@@ -93,14 +108,14 @@ | |
*/ | |
private void maybeBackup() { | |
if (System.currentTimeMillis() - lastSent > MAX_BATCH_TIME && !operations.isEmpty()) { | |
- backup(); | |
+ backupFuture = backupFuture.thenCompose(v -> backup()); | |
} | |
} | |
/** | |
* Sends the next batch to the backup. | |
*/ | |
- private void backup() { | |
+ private CompletableFuture<Void> backup() { | |
List<BackupOperation> batch = ImmutableList.copyOf(operations); | |
operations.clear(); | |
BackupRequest request = BackupRequest.request( | |
@@ -110,8 +125,12 @@ | |
context.currentIndex(), | |
batch); | |
log.trace("Sending {} to {}", request, nodeId); | |
- context.protocol().backup(nodeId, request); | |
- lastSent = System.currentTimeMillis(); | |
+ return context.protocol().backup(nodeId, request).whenComplete((response, throwable) -> { | |
+ if (throwable != null) { | |
+ log.debug("Failed sending batch to backup", throwable); | |
+ } | |
+ lastSent = System.currentTimeMillis(); | |
+ }).thenApply(v -> null); | |
} | |
/** | |
@@ -120,5 +139,16 @@ | |
void close() { | |
backupTimer.cancel(); | |
} | |
+ | |
+ /** | |
+ * Clears the queue. | |
+ */ | |
+ void clear(long index) { | |
+ BackupOperation op = operations.peek(); | |
+ while (op != null && op.index() <= index) { | |
+ operations.remove(); | |
+ op = operations.peek(); | |
+ } | |
+ } | |
} | |
} | |
Index: protocols/primary-backup/src/main/java/io/atomix/protocols/backup/roles/SynchronousReplicator.java | |
IDEA additional info: | |
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP | |
<+>UTF-8 | |
=================================================================== | |
--- protocols/primary-backup/src/main/java/io/atomix/protocols/backup/roles/SynchronousReplicator.java (revision b3b1a3d28536f477ef155e3a96ce0a750da5fbda) | |
+++ protocols/primary-backup/src/main/java/io/atomix/protocols/backup/roles/SynchronousReplicator.java (revision ) | |
@@ -58,6 +58,15 @@ | |
return future; | |
} | |
+ @Override | |
+ public void removePreviousOperation(NodeId nodeId, long endIndex) { | |
+ queues.computeIfPresent(nodeId, (node, queue) -> { | |
+ queue.clear(endIndex); | |
+ return queue; | |
+ }); | |
+ } | |
+ | |
+ | |
/** | |
* Completes futures. | |
*/ | |
@@ -151,5 +160,16 @@ | |
}, context.threadContext()); | |
operations.clear(); | |
} | |
+ | |
+ /** | |
+ * Clears the queue. | |
+ */ | |
+ void clear(long index) { | |
+ BackupOperation op = operations.peek(); | |
+ while (op != null && op.index() <= index) { | |
+ operations.remove(); | |
+ op = operations.peek(); | |
+ } | |
+ } | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment