Created
May 1, 2019 16:23
-
-
Save JaiHirsch/deeea82c73284d63e210508da66b85fa to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package scanner; | |
import com.mongodb.client.*; | |
import org.bson.Document; | |
import org.bson.conversions.Bson; | |
import org.bson.types.ObjectId; | |
import java.util.ArrayList; | |
import java.util.Arrays; | |
import java.util.Collections; | |
import java.util.List; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.Future; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import java.util.function.Consumer; | |
import static com.mongodb.client.model.Filters.*; | |
public class SplitVectorScanThreaded { | |
public static AtomicInteger count = new AtomicInteger(); | |
private static final Long PARTITION_DENOMINATOR = 5L; | |
public static void main(String[] args) throws Throwable { | |
try (MongoClient client = MongoClients.create()) { | |
MongoDatabase db = client.getDatabase("test"); | |
List<FindIterable> splitVectorCursors = createSplitVectorCursors(db); | |
List<Future> futures = new ArrayList<>(); | |
ExecutorService findIterableExecutorService = Executors.newFixedThreadPool(splitVectorCursors.size()); | |
splitVectorCursors.forEach(findIterable -> futures.add(findIterableExecutorService.submit(simpleMongoIter(findIterable)))); | |
futures.forEach(future -> { | |
try { | |
future.get(); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
}); | |
findIterableExecutorService.shutdown(); | |
} | |
System.out.println("Iterated through: "+count.get()); | |
} | |
private static List<FindIterable> createSplitVectorCursors(MongoDatabase db) { | |
List<FindIterable> iterables = new ArrayList<>(); | |
List<ObjectId> splitKeys = prepareListOfSplitKeys(db); | |
MongoCollection<Document> collection = db.getCollection("test"); | |
for (int i = 0; i < splitKeys.size(); i++) { | |
Bson query; | |
ObjectId minKey = splitKeys.get(i); | |
if (i < splitKeys.size() - 1) { | |
ObjectId maxKey = splitKeys.get(i + 1); | |
query = and(Arrays.asList(gte("_id", minKey), lt("_id", maxKey))); | |
} else { | |
query = gte("_id", minKey); | |
} | |
iterables.add(collection.find(query)); | |
} | |
return iterables; | |
} | |
private static List<ObjectId> prepareListOfSplitKeys(MongoDatabase db) { | |
List<ObjectId> splitKeys = new ArrayList<>(); | |
getSplitVectorResults(db).forEach(doc -> splitKeys.add(doc.getObjectId("_id"))); | |
Collections.sort(splitKeys); | |
splitKeys.add(0, new ObjectId("000000000000000000000000")); | |
return splitKeys; | |
} | |
private static List<Document> getSplitVectorResults(MongoDatabase db) { | |
Document collStats = db.runCommand(new Document("collStats", "test").append("scale", 1)); | |
long partitionKeyCount = Long.valueOf(collStats.getInteger("count")) / PARTITION_DENOMINATOR; | |
long avgObjSize = Long.valueOf(collStats.getInteger("avgObjSize")); | |
return ((List<Document>) db.runCommand(new Document("splitVector", "test.test").append("keyPattern", new Document("_id", 1)).append("maxChunkSizeBytes", (partitionKeyCount * avgObjSize))).get("splitKeys")); | |
} | |
// This is where you act on the documents in the iterable | |
public static Runnable simpleMongoIter(FindIterable<Document> iterable) { | |
return () -> iterable.forEach((Consumer<Document>) document -> SplitVectorScanThreaded.count.incrementAndGet()); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment