import json
from typing import Any, Dict, List, Optional

from pymongo import MongoClient


class IndexRecommender:
    """Suggest MongoDB indexes for an aggregation pipeline.

    Two sources of information are combined:

    * the server's ``explain`` output, whose ``$cursor`` stage shows how the
      initial read of the collection was planned and executed;
    * the pipeline stages themselves (``$match``, ``$sort``, ``$lookup``,
      ``$group``), which name the fields an index could serve.

    Every recommendation is a plain dict with the keys ``index_type``,
    ``fields``, ``collection`` and ``reason`` (plus ``condition`` for partial
    indexes). Identical recommendations are reported only once.
    """

    # Comparison operators that turn a $match predicate into a range predicate.
    _RANGE_OPERATORS = ("$gte", "$lte", "$gt", "$lt")
    # $group accumulators for which the original code suggests an index.
    _ORDERED_ACCUMULATORS = ("$min", "$max", "$first", "$last")
    # Arbitrary threshold: documents examined per document returned.
    _INEFFICIENCY_RATIO = 10

    def __init__(self, collection: Any) -> None:
        """Store the collection (a ``pymongo`` collection) to analyze."""
        self.collection = collection

    def recommend_indexes(self, pipeline: List[Dict[str, Any]]) -> str:
        """Explain *pipeline* on the server and return recommendations as JSON.

        ``Collection.aggregate`` does not return an explain document, so the
        ``explain`` database command is issued directly. The
        ``executionStats`` verbosity is what populates ``totalDocsExamined``
        and ``nReturned``; note that it makes the server execute the pipeline.
        """
        # The command name must be the first key of the command document;
        # dict literals keep insertion order.
        explain_command = {
            "aggregate": self.collection.name,
            "pipeline": pipeline,
            "cursor": {},
        }
        explain_output = self.collection.database.command(
            "explain", explain_command, verbosity="executionStats"
        )
        return self.analyze_execution_stats(explain_output, pipeline)

    def analyze_execution_stats(
        self,
        explain_output: Dict[str, Any],
        pipeline: Optional[List[Dict[str, Any]]] = None,
    ) -> str:
        """Turn an explain document into a JSON list of index recommendations.

        Args:
            explain_output: Result of explaining an aggregation. Either a
                document with a ``stages`` list, or (when the whole pipeline
                was pushed down to the query layer) a find-style document
                with a top-level ``queryPlanner``.
            pipeline: The pipeline that was explained. When given, its stages
                are inspected instead of the explain stages, because the
                optimizer folds leading ``$match``/``$sort`` stages into
                ``$cursor`` and they would otherwise never be seen.

        Returns:
            The recommendations serialized with ``json.dumps(indent=4)``.
        """
        recommendations: List[Dict[str, Any]] = []
        compound_fields: List[str] = []
        partial_index_conditions: Dict[str, Any] = {}

        explain_stages = explain_output.get("stages")
        if explain_stages is None:
            # No "stages": the document itself has the shape of a $cursor body.
            explain_stages = (
                [{"$cursor": explain_output}]
                if "queryPlanner" in explain_output
                else []
            )

        for stage in explain_stages:
            if "$cursor" in stage:
                self._analyze_cursor(stage["$cursor"], recommendations)

        logical_stages = explain_stages if pipeline is None else pipeline
        for stage in logical_stages:
            if "$match" in stage:
                self.process_match_stage(
                    stage["$match"],
                    recommendations,
                    compound_fields,
                    partial_index_conditions,
                )
            if "$sort" in stage:
                self.process_sort_stage(
                    stage["$sort"], recommendations, compound_fields
                )
            if "$lookup" in stage:
                self.process_lookup_stage(stage["$lookup"], recommendations)
            if "$group" in stage:
                self.process_group_stage(stage["$group"], recommendations)

        # The collection name comes from the collection object: a staged
        # explain has no top-level "queryPlanner", and splitting the namespace
        # on "." would truncate collection names that contain dots.
        if len(compound_fields) > 1:
            self._add_recommendation(recommendations, {
                "index_type": "compound",
                "fields": compound_fields,
                "collection": self.collection.name,
                "reason": "Optimizing query with multiple filter/sort fields",
            })

        if partial_index_conditions:
            self._add_recommendation(recommendations, {
                "index_type": "partial",
                "fields": list(partial_index_conditions),
                "collection": self.collection.name,
                "condition": partial_index_conditions,
                "reason": "Filtering on specific document conditions",
            })

        # Filter values may be BSON types (ObjectId, datetime) that json cannot
        # encode; they are deliberately rendered through str() so that a
        # recommendation is still produced.
        return json.dumps(recommendations, indent=4, default=str)

    def process_match_stage(
        self,
        match_stage: Dict[str, Any],
        recommendations: List[Dict[str, Any]],
        compound_fields: List[str],
        partial_index_conditions: Dict[str, Any],
    ) -> None:
        """Process the $match stage and suggest an index.

        Equality predicates (a plain value or ``{"$eq": value}``) and range
        predicates each yield a single-field recommendation. The field is
        added once to *compound_fields*, and equality values are recorded in
        *partial_index_conditions*. ``$and`` clauses are processed as part of
        the same filter; ``$or`` clauses only yield single-field
        recommendations. Other ``$``-prefixed top-level operators are ignored.
        """
        for field, condition in match_stage.items():
            if field in ("$and", "$or"):
                clauses = condition if isinstance(condition, list) else []
                for clause in clauses:
                    if not isinstance(clause, dict):
                        continue
                    if field == "$and":
                        self.process_match_stage(
                            clause,
                            recommendations,
                            compound_fields,
                            partial_index_conditions,
                        )
                    else:
                        # Branches of $or are alternatives, so their fields
                        # are kept out of the compound/partial suggestions.
                        self.process_match_stage(clause, recommendations, [], {})
                continue
            if field.startswith("$"):
                continue  # e.g. $expr, $text: not a field name

            is_operator_doc = isinstance(condition, dict) and any(
                str(key).startswith("$") for key in condition
            )
            if not is_operator_doc:
                # Plain value (or embedded document): an equality match.
                partial_index_conditions[field] = condition
                reason = "Equality query optimization"
            else:
                has_equality = "$eq" in condition
                if has_equality:
                    partial_index_conditions[field] = condition["$eq"]
                if any(op in condition for op in self._RANGE_OPERATORS):
                    reason = "Range query optimization"
                elif has_equality:
                    reason = "Equality query optimization"
                else:
                    continue  # operators such as $ne/$exists: no suggestion

            self._add_recommendation(recommendations, {
                "index_type": "single",
                "fields": [field],
                "collection": self.collection.name,
                "reason": reason,
            })
            if field not in compound_fields:
                compound_fields.append(field)

    def process_sort_stage(
        self,
        sort_stage: Dict[str, Any],
        recommendations: List[Dict[str, Any]],
        compound_fields: List[str],
    ) -> None:
        """Process the $sort stage and suggest an index.

        Accepts the pipeline form (``{"field": 1}``) and the explain form,
        where the specification is nested under ``sortKey``.
        """
        sort_spec = sort_stage.get("sortKey")
        if not isinstance(sort_spec, dict):
            sort_spec = sort_stage

        for field, direction in sort_spec.items():
            if isinstance(direction, dict):
                continue  # e.g. {"$meta": "textScore"}: a computed sort key
            self._add_recommendation(recommendations, {
                "index_type": "single",
                "fields": [field],
                "collection": self.collection.name,
                "reason": "Sorting optimization to avoid in-memory sorting",
            })
            if field not in compound_fields:
                compound_fields.append(field)

    def process_lookup_stage(
        self,
        lookup_stage: Dict[str, Any],
        recommendations: List[Dict[str, Any]],
    ) -> None:
        """Process the $lookup stage and suggest an index for the foreign collection.

        A ``$lookup`` written with ``let``/``pipeline`` has no
        ``foreignField``; such stages are skipped instead of raising
        ``KeyError``.
        """
        foreign_field = lookup_stage.get("foreignField")
        foreign_collection = lookup_stage.get("from")
        if foreign_field is None or foreign_collection is None:
            return
        self._add_recommendation(recommendations, {
            "index_type": "single",
            "fields": [foreign_field],
            "collection": foreign_collection,  # Foreign collection
            "reason": "Optimize $lookup with indexed foreignField",
        })

    def process_group_stage(
        self,
        group_stage: Dict[str, Any],
        recommendations: List[Dict[str, Any]],
    ) -> None:
        """Process the $group stage and suggest an index for min/max/first/last accumulators.

        Only accumulators whose operand is a plain field path (``"$field"``)
        are considered. The leading ``$`` is removed so the recommendation
        names the field rather than the expression.
        """
        for accumulator in group_stage.values():
            if not isinstance(accumulator, dict):
                continue
            for op in self._ORDERED_ACCUMULATORS:
                operand = accumulator.get(op)
                is_field_path = (
                    isinstance(operand, str)
                    and operand.startswith("$")
                    and not operand.startswith("$$")  # $$ROOT, $$var, ...
                )
                if not is_field_path:
                    continue
                self._add_recommendation(recommendations, {
                    "index_type": "single",
                    "fields": [operand[1:]],
                    "collection": self.collection.name,
                    "reason": f"Optimize $group stage accumulator ({op})",
                })

    def suggest_index(
        self, query_plan: Dict[str, Any], index_type: str = "single"
    ) -> Optional[Dict[str, Any]]:
        """Build a recommendation for the filter fields found in *query_plan*.

        Returns ``None`` when the plan has no filter fields.
        """
        filter_fields = self.get_filter_fields(query_plan)
        if not filter_fields:
            return None
        return {
            "index_type": index_type,
            "fields": filter_fields,
            "collection": self.collection.name,
            "reason": "Optimization for filtered query",
        }

    def get_filter_fields(self, query_plan: Dict[str, Any]) -> List[str]:
        """Return the field names used in ``query_plan["parsedQuery"]``.

        Logical operators (``$and``, ``$or``, ``$nor``) are descended into,
        so operator names are never reported as fields. Each field appears
        once, in first-seen order.
        """
        fields: List[str] = []
        self._collect_filter_fields(query_plan.get("parsedQuery", {}), fields)
        return fields

    def _analyze_cursor(
        self, cursor: Dict[str, Any], recommendations: List[Dict[str, Any]]
    ) -> None:
        """Recommend an index when the $cursor read scans or over-examines."""
        # "parsedQuery" sits next to "winningPlan" under "queryPlanner", so
        # the whole planner document is handed to suggest_index.
        query_planner = cursor.get("queryPlanner", {})
        exec_stats = cursor.get("executionStats", {})
        docs_examined = exec_stats.get("totalDocsExamined", 0)
        returned = exec_stats.get("nReturned", 0)

        plan_stages = self._plan_stage_names(query_planner.get("winningPlan", {}))
        scans_collection = "COLLSCAN" in plan_stages
        # max(..., 1) also flags reads that examine many documents and
        # return none.
        examines_too_much = (
            docs_examined > max(returned, 1) * self._INEFFICIENCY_RATIO
        )
        if not (scans_collection or examines_too_much):
            return

        field_count = len(self.get_filter_fields(query_planner))
        index_type = "compound" if field_count > 1 else "single"
        self._add_recommendation(
            recommendations,
            self.suggest_index(query_planner, index_type=index_type),
        )

    @staticmethod
    def _plan_stage_names(plan: Any) -> List[str]:
        """Return the ``stage`` names of every node in a winning-plan tree."""
        names: List[str] = []
        pending = [plan]
        while pending:
            node = pending.pop()
            if not isinstance(node, dict):
                continue
            if "stage" in node:
                names.append(node["stage"])
            # "queryPlan" wraps the tree in slot-based-engine explain output.
            for child_key in ("queryPlan", "inputStage"):
                if child_key in node:
                    pending.append(node[child_key])
            children = node.get("inputStages")
            if isinstance(children, list):
                pending.extend(children)
        return names

    def _collect_filter_fields(self, expression: Any, fields: List[str]) -> None:
        """Append the field names referenced by a parsed query to *fields*."""
        if not isinstance(expression, dict):
            return
        for key, value in expression.items():
            if key in ("$and", "$or", "$nor"):
                if isinstance(value, list):
                    for clause in value:
                        self._collect_filter_fields(clause, fields)
            elif not key.startswith("$") and key not in fields:
                fields.append(key)

    @staticmethod
    def _add_recommendation(
        recommendations: List[Dict[str, Any]],
        recommendation: Optional[Dict[str, Any]],
    ) -> None:
        """Append *recommendation* unless it is ``None`` or already present."""
        if recommendation is not None and recommendation not in recommendations:
            recommendations.append(recommendation)


# Example Usage
def main():
    """Print index recommendations for a sample pipeline on a local MongoDB.

    Connects to ``mongodb://localhost:27017/`` and analyzes
    ``example_db.example_collection``.
    """
    # Example aggregation pipeline
    pipeline = [
        {"$match": {"status": "processing", "orderDate": {"$gte": "2023-01-01"}}},
        {"$sort": {"orderDate": -1}},
        {"$group": {"_id": "$customerId", "total": {"$sum": "$amount"}}},
        # After $group the customer id lives in "_id"; "customerId" no longer
        # exists on the documents, so the join must use "_id" as localField.
        {"$lookup": {"from": "customers", "localField": "_id", "foreignField": "_id", "as": "customerInfo"}},
    ]

    # The client is used as a context manager so its connections are closed
    # even if the explain command raises.
    with MongoClient("mongodb://localhost:27017/") as client:
        collection = client["example_db"]["example_collection"]

        # Get index recommendations for the aggregation pipeline
        recommender = IndexRecommender(collection)
        recommendations = recommender.recommend_indexes(pipeline)

    # Print recommendations
    print(recommendations)


# Run the example only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()


'''
Example output for the pipeline in main() (illustrative; the exact entries and
their order depend on the explain plan of the collection):

[
    {
        "index_type": "single",
        "fields": [
            "status"
        ],
        "collection": "example_collection",
        "reason": "Equality query optimization"
    },
    {
        "index_type": "single",
        "fields": [
            "orderDate"
        ],
        "collection": "example_collection",
        "reason": "Range query optimization"
    },
    {
        "index_type": "single",
        "fields": [
            "orderDate"
        ],
        "collection": "example_collection",
        "reason": "Sorting optimization to avoid in-memory sorting"
    },
    {
        "index_type": "compound",
        "fields": [
            "status",
            "orderDate"
        ],
        "collection": "example_collection",
        "reason": "Optimizing query with multiple filter/sort fields"
    },
    {
        "index_type": "single",
        "fields": [
            "_id"
        ],
        "collection": "customers",
        "reason": "Optimize $lookup with indexed foreignField"
    },
    {
        "index_type": "partial",
        "fields": [
            "status"
        ],
        "collection": "example_collection",
        "condition": {
            "status": "processing"
        },
        "reason": "Filtering on specific document conditions"
    }
]
'''