Skip to content

Instantly share code, notes, and snippets.

@johnou
Last active April 25, 2018 19:47
Show Gist options
  • Save johnou/55bf838142b38695830f7941368ce9fb to your computer and use it in GitHub Desktop.
Save johnou/55bf838142b38695830f7941368ce9fb to your computer and use it in GitHub Desktop.
Index: protocols/primary-backup/src/main/java/io/atomix/protocols/backup/protocol/RestoreRequest.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- protocols/primary-backup/src/main/java/io/atomix/protocols/backup/protocol/RestoreRequest.java (revision b3b1a3d28536f477ef155e3a96ce0a750da5fbda)
+++ protocols/primary-backup/src/main/java/io/atomix/protocols/backup/protocol/RestoreRequest.java (revision )
@@ -15,6 +15,8 @@
*/
package io.atomix.protocols.backup.protocol;
+import io.atomix.cluster.NodeId;
+
import static com.google.common.base.MoreObjects.toStringHelper;
/**
@@ -22,25 +24,32 @@
*/
public class RestoreRequest extends PrimitiveRequest {
- public static RestoreRequest request(PrimitiveDescriptor primitive, long term) {
- return new RestoreRequest(primitive, term);
+ public static RestoreRequest request(PrimitiveDescriptor primitive, NodeId nodeId, long term) {
+ return new RestoreRequest(primitive, nodeId, term);
}
private final long term;
+ private final NodeId nodeId;
- public RestoreRequest(PrimitiveDescriptor primitive, long term) {
+ public RestoreRequest(PrimitiveDescriptor primitive, NodeId nodeId, long term) {
super(primitive);
this.term = term;
+ this.nodeId = nodeId;
}
public long term() {
return term;
}
+ public NodeId nodeId() {
+ return nodeId;
+ }
+
@Override
public String toString() {
return toStringHelper(this)
.add("primitive", primitive())
+ .add("nodeId", nodeId())
.add("term", term())
.toString();
}
Index: protocols/primary-backup/src/main/java/io/atomix/protocols/backup/roles/PrimaryRole.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- protocols/primary-backup/src/main/java/io/atomix/protocols/backup/roles/PrimaryRole.java (revision b3b1a3d28536f477ef155e3a96ce0a750da5fbda)
+++ protocols/primary-backup/src/main/java/io/atomix/protocols/backup/roles/PrimaryRole.java (revision )
@@ -19,7 +19,6 @@
import io.atomix.primitive.operation.OperationType;
import io.atomix.primitive.service.impl.DefaultCommit;
import io.atomix.primitive.session.Session;
-import io.atomix.primitive.session.Sessions;
import io.atomix.protocols.backup.PrimaryBackupServer.Role;
import io.atomix.protocols.backup.impl.PrimaryBackupSession;
import io.atomix.protocols.backup.protocol.CloseOperation;
@@ -171,6 +170,9 @@
context.service().backup(buffer);
buffer.flip();
byte[] bytes = buffer.readBytes(buffer.remaining());
+
+ replicator.removePreviousOperation(request.nodeId(), context.currentIndex());
+
return CompletableFuture.completedFuture(
RestoreResponse.ok(context.currentIndex(), context.currentTimestamp(), bytes))
.thenApply(this::logResponse);
Index: protocols/primary-backup/src/main/java/io/atomix/protocols/backup/roles/Replicator.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- protocols/primary-backup/src/main/java/io/atomix/protocols/backup/roles/Replicator.java (revision b3b1a3d28536f477ef155e3a96ce0a750da5fbda)
+++ protocols/primary-backup/src/main/java/io/atomix/protocols/backup/roles/Replicator.java (revision )
@@ -15,6 +15,7 @@
*/
package io.atomix.protocols.backup.roles;
+import io.atomix.cluster.NodeId;
import io.atomix.protocols.backup.protocol.BackupOperation;
import java.util.concurrent.CompletableFuture;
@@ -32,6 +33,14 @@
*/
CompletableFuture<Void> replicate(BackupOperation operation);
+ /**
+ * Clear backup queue of given node.
+ *
+ * @param nodeId node id
+ * @param endIndex remove all operations that index less than endIndex
+ */
+ void removePreviousOperation(NodeId nodeId, long endIndex);
+
/**
* Closes the replicator.
*/
Index: protocols/primary-backup/src/main/java/io/atomix/protocols/backup/roles/BackupRole.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- protocols/primary-backup/src/main/java/io/atomix/protocols/backup/roles/BackupRole.java (revision b3b1a3d28536f477ef155e3a96ce0a750da5fbda)
+++ protocols/primary-backup/src/main/java/io/atomix/protocols/backup/roles/BackupRole.java (revision )
@@ -154,7 +154,7 @@
* Requests a restore from the primary.
*/
private void requestRestore(NodeId primary) {
- context.protocol().restore(primary, RestoreRequest.request(context.descriptor(), context.currentTerm()))
+ context.protocol().restore(primary, RestoreRequest.request(context.descriptor(), context.nodeId(), context.currentTerm()))
.whenCompleteAsync((response, error) -> {
if (error == null && response.status() == PrimaryBackupResponse.Status.OK) {
context.resetIndex(response.index(), response.timestamp());
@@ -167,6 +167,7 @@
context.service().restore(buffer);
operations.clear();
+ //context.setCommitIndex(response.index()); // don't think this is needed
}
}, context.threadContext());
}
Index: protocols/primary-backup/src/main/java/io/atomix/protocols/backup/roles/AsynchronousReplicator.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- protocols/primary-backup/src/main/java/io/atomix/protocols/backup/roles/AsynchronousReplicator.java (revision b3b1a3d28536f477ef155e3a96ce0a750da5fbda)
+++ protocols/primary-backup/src/main/java/io/atomix/protocols/backup/roles/AsynchronousReplicator.java (revision )
@@ -20,10 +20,12 @@
import io.atomix.protocols.backup.protocol.BackupOperation;
import io.atomix.protocols.backup.protocol.BackupRequest;
import io.atomix.protocols.backup.service.impl.PrimaryBackupServiceContext;
+import io.atomix.utils.concurrent.Futures;
import io.atomix.utils.concurrent.Scheduled;
import org.slf4j.Logger;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -49,11 +51,22 @@
@Override
public CompletableFuture<Void> replicate(BackupOperation operation) {
+ List<CompletableFuture<Void>> futures = new ArrayList<>(context.backups().size());
for (NodeId backup : context.backups()) {
- queues.computeIfAbsent(backup, BackupQueue::new).add(operation);
+ futures.add(queues.computeIfAbsent(backup, BackupQueue::new).add(operation));
}
- context.setCommitIndex(operation.index());
- return CompletableFuture.completedFuture(null);
+ return Futures.allOf(futures).thenApply(v -> {
+ context.setCommitIndex(operation.index());
+ return null;
+ });
+ }
+
+ @Override
+ public void removePreviousOperation(NodeId nodeId, long endIndex) {
+ queues.computeIfPresent(nodeId, (node, queue) -> {
+ queue.clear(endIndex);
+ return queue;
+ });
}
@Override
@@ -68,6 +81,7 @@
private final Queue<BackupOperation> operations = new LinkedList<>();
private final NodeId nodeId;
private final Scheduled backupTimer;
+ private CompletableFuture<Void> backupFuture = CompletableFuture.completedFuture(null);
private long lastSent;
BackupQueue(NodeId nodeId) {
@@ -81,11 +95,12 @@
*
* @param operation the operation to add
*/
- void add(BackupOperation operation) {
+ CompletableFuture<Void> add(BackupOperation operation) {
operations.add(operation);
if (operations.size() >= MAX_BATCH_SIZE) {
- backup();
+ backupFuture = backupFuture.thenCompose(v -> backup());
}
+ return backupFuture;
}
/**
@@ -93,14 +108,14 @@
*/
private void maybeBackup() {
if (System.currentTimeMillis() - lastSent > MAX_BATCH_TIME && !operations.isEmpty()) {
- backup();
+ backupFuture = backupFuture.thenCompose(v -> backup());
}
}
/**
* Sends the next batch to the backup.
*/
- private void backup() {
+ private CompletableFuture<Void> backup() {
List<BackupOperation> batch = ImmutableList.copyOf(operations);
operations.clear();
BackupRequest request = BackupRequest.request(
@@ -110,8 +125,12 @@
context.currentIndex(),
batch);
log.trace("Sending {} to {}", request, nodeId);
- context.protocol().backup(nodeId, request);
- lastSent = System.currentTimeMillis();
+ return context.protocol().backup(nodeId, request).whenComplete((response, throwable) -> {
+ if (throwable != null) {
+ log.debug("Failed sending batch to backup", throwable);
+ }
+ lastSent = System.currentTimeMillis();
+ }).thenApply(v -> null);
}
/**
@@ -120,5 +139,16 @@
void close() {
backupTimer.cancel();
}
+
+ /**
+ * Clears the queue.
+ */
+ void clear(long index) {
+ BackupOperation op = operations.peek();
+ while (op != null && op.index() <= index) {
+ operations.remove();
+ op = operations.peek();
+ }
+ }
}
}
Index: protocols/primary-backup/src/main/java/io/atomix/protocols/backup/roles/SynchronousReplicator.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- protocols/primary-backup/src/main/java/io/atomix/protocols/backup/roles/SynchronousReplicator.java (revision b3b1a3d28536f477ef155e3a96ce0a750da5fbda)
+++ protocols/primary-backup/src/main/java/io/atomix/protocols/backup/roles/SynchronousReplicator.java (revision )
@@ -58,6 +58,15 @@
return future;
}
+ @Override
+ public void removePreviousOperation(NodeId nodeId, long endIndex) {
+ queues.computeIfPresent(nodeId, (node, queue) -> {
+ queue.clear(endIndex);
+ return queue;
+ });
+ }
+
+
/**
* Completes futures.
*/
@@ -151,5 +160,16 @@
}, context.threadContext());
operations.clear();
}
+
+ /**
+ * Clears the queue.
+ */
+ void clear(long index) {
+ BackupOperation op = operations.peek();
+ while (op != null && op.index() <= index) {
+ operations.remove();
+ op = operations.peek();
+ }
+ }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment