Last active
May 6, 2024 13:57
-
-
Save jyemin/385f03966bde6d89aef7e7b82622b74b to your computer and use it in GitHub Desktop.
Sketch of KAFKA-374 algorithm based on BaseCluster#selectServer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.mongodb; | |
import com.mongodb.MongoClientSettings; | |
import com.mongodb.client.MongoClient; | |
import com.mongodb.client.MongoClients; | |
import com.mongodb.connection.ClusterDescription; | |
import com.mongodb.event.ClusterDescriptionChangedEvent; | |
import com.mongodb.event.ClusterListener; | |
import org.bson.Document; | |
import java.util.List; | |
import java.util.concurrent.CountDownLatch; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.atomic.AtomicReference; | |
public class KAFKA374 { | |
private final AtomicReference<CountDownLatch> phase = new AtomicReference<>(new CountDownLatch(1)); | |
private final MongoClient client = MongoClients.create(MongoClientSettings.builder() | |
.applyToClusterSettings(builder -> | |
builder.addClusterListener(new SynchronizingClusterListener())) | |
.build()); | |
private volatile ClusterDescription clusterDescription = client.getClusterDescription(); | |
private final ExecutorService executorService = Executors.newSingleThreadExecutor(); | |
public static void main(String[] args) throws InterruptedException { | |
var test = new KAFKA374(); | |
//noinspection InfiniteLoopStatement | |
while (true) { | |
test.write(List.of(new Document())); | |
//noinspection BusyWait | |
Thread.sleep(1000); | |
} | |
} | |
private void write(List<Document> documentList) throws InterruptedException { | |
CountDownLatch currentPhase = phase.get(); | |
ClusterDescription curDescription = clusterDescription; | |
boolean hasKickedMonitors = false; | |
while (!curDescription.hasWritableServer()) { | |
// HERE: throw RetryingException if server selection timeout exceeded | |
// encourage the server monitors to connect to a writeable server so that this loop doesn't block for | |
// longer than necessary | |
if (!hasKickedMonitors) { | |
executorService.submit(() -> { | |
client.getDatabase("admin").runCommand(new Document("ping", 1)); | |
}); | |
hasKickedMonitors = true; | |
} | |
currentPhase.await(/* remaining timeout */); | |
currentPhase = phase.get(); | |
curDescription = clusterDescription; | |
} | |
try { | |
// Simulate what the Sink Connector does | |
client.getDatabase("test").getCollection("test").insertMany(documentList); | |
} catch (Exception e) { | |
// write all documents to DLQ | |
} | |
} | |
private class SynchronizingClusterListener implements ClusterListener { | |
@Override | |
public void clusterDescriptionChanged(ClusterDescriptionChangedEvent event) { | |
clusterDescription = event.getNewDescription(); | |
phase.getAndSet(new CountDownLatch(1)).countDown(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment