Skip to content

Instantly share code, notes, and snippets.

@mutukrish
Last active October 16, 2024 03:54
Show Gist options
  • Save mutukrish/7f4a91691aa45e977b8102266eb94af4 to your computer and use it in GitHub Desktop.
Save mutukrish/7f4a91691aa45e977b8102266eb94af4 to your computer and use it in GitHub Desktop.
Explain Stats Index Recommendation MDB

How it works:

$match Stage: The system detects fields used in the $match stage and suggests creating indexes for equality matches and range queries ($gte, $lt, etc.).

$sort Stage:
For each field used in $sort, the system suggests creating an index to avoid costly in-memory sorting.

$lookup Stage: The system reads the stage's `from` (foreign) collection and its `foreignField`, and recommends creating an index on that field in the foreign collection to speed up the join.

$group Stage (with Min/Max/First/Last Accumulators): The system looks for $min, $max, $first, and $last accumulators and suggests creating an index on the field each of those accumulators reads, so that those values can be retrieved faster.

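Execution Stats Handling: The code also reads the executionStats from the aggregation explain output. If the winning plan is a collection scan (COLLSCAN), or if far more documents are examined than returned (more than 10 examined per returned document), it suggests an index on the filtered fields, because that pattern indicates inefficient filtering.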

from pymongo import MongoClient
import json
class IndexRecommender:
    """Suggest MongoDB indexes for an aggregation pipeline from its explain output.

    ``recommend_indexes`` runs the ``explain`` command (``executionStats``
    verbosity) for a pipeline. ``analyze_execution_stats`` walks the explain
    document and returns a JSON list of single-field, compound and partial
    index suggestions. The suggestions are heuristics to review, not a plan
    to apply blindly.
    """

    # Comparison operators that make a $match predicate a range query.
    _RANGE_OPS = ("$gte", "$lte", "$gt", "$lt")
    # Accumulators whose operand field is recommended for indexing.
    _GROUP_INDEX_OPS = ("$min", "$max", "$first", "$last")

    def __init__(self, collection, scan_ratio_threshold=10):
        """Create a recommender for *collection*.

        Args:
            collection: a PyMongo ``Collection``. Its ``name`` labels the
                recommendations and its ``database`` runs ``explain``.
            scan_ratio_threshold: an index is suggested when more than this
                many documents are examined per document returned.
        """
        self.collection = collection
        self.scan_ratio_threshold = scan_ratio_threshold

    def recommend_indexes(self, pipeline):
        """Explain *pipeline* with execution statistics and return JSON recommendations."""
        # Collection.aggregate() returns a CommandCursor, not the explain
        # document, so issue the explain command directly. "executionStats"
        # verbosity is needed for totalDocsExamined / nReturned.
        explain_output = self.collection.database.command(
            "explain",
            {"aggregate": self.collection.name, "pipeline": pipeline, "cursor": {}},
            verbosity="executionStats",
        )
        return self.analyze_execution_stats(explain_output)

    def analyze_execution_stats(self, explain_output):
        """Turn an aggregate explain document into a JSON list of index recommendations.

        Args:
            explain_output: the document returned by ``explain`` for an
                ``aggregate``. Both the staged form (``{"stages": [...]}``)
                and the pushed-down form (top-level ``queryPlanner``) are
                accepted.

        Returns:
            A JSON string (indent=4) holding a list of recommendation dicts
            with ``index_type``, ``fields``, ``collection`` and ``reason``
            keys (partial recommendations also carry ``condition``). Exact
            duplicates are removed; ``None`` is never included.
        """
        recommendations = []
        compound_fields = []
        partial_index_conditions = {}

        for stage in self._pipeline_stages(explain_output):
            if "$cursor" in stage:
                self._process_cursor_stage(stage["$cursor"], recommendations)
            if "$match" in stage:
                self.process_match_stage(
                    stage["$match"], recommendations, compound_fields, partial_index_conditions
                )
            if "$sort" in stage:
                self.process_sort_stage(stage["$sort"], recommendations, compound_fields)
            if "$lookup" in stage:
                self.process_lookup_stage(stage["$lookup"], recommendations)
            if "$group" in stage:
                self.process_group_stage(stage["$group"], recommendations)

        # A field can be collected more than once (e.g. range-filtered and then
        # sorted); a compound index may list each field only once.
        compound_fields = list(dict.fromkeys(compound_fields))
        if len(compound_fields) > 1:
            recommendations.append({
                "index_type": "compound",
                "fields": compound_fields,
                "collection": self.collection.name,
                "reason": "Optimizing query with multiple filter/sort fields",
            })
        if partial_index_conditions:
            recommendations.append({
                "index_type": "partial",
                "fields": list(partial_index_conditions.keys()),
                "collection": self.collection.name,
                "condition": partial_index_conditions,
                "reason": "Filtering on specific document conditions",
            })
        # default=str deliberately renders BSON values from the explain output
        # (datetime, ObjectId, ...) as text; json cannot serialize them natively.
        return json.dumps(self._dedupe(recommendations), indent=4, default=str)

    @staticmethod
    def _pipeline_stages(explain_output):
        """Return the explain stages; wrap a pushed-down plan as a single $cursor stage."""
        stages = explain_output.get("stages")
        if stages is not None:
            return stages
        # When the whole pipeline is pushed into the query layer there is no
        # "stages" array; the planner output sits at the top level instead.
        if "queryPlanner" in explain_output:
            return [{"$cursor": explain_output}]
        return []

    @staticmethod
    def _dedupe(recommendations):
        """Drop exact duplicate recommendations, keeping first-seen order."""
        seen = set()
        unique = []
        for rec in recommendations:
            key = json.dumps(rec, sort_keys=True, default=str)
            if key not in seen:
                seen.add(key)
                unique.append(rec)
        return unique

    def _process_cursor_stage(self, cursor, recommendations):
        """Recommend an index when the $cursor stage scans the collection or over-scans."""
        query_planner = cursor.get("queryPlanner", {})
        winning_plan = query_planner.get("winningPlan", {})
        # The slot-based engine nests the plan tree under "queryPlan".
        winning_plan = winning_plan.get("queryPlan", winning_plan)

        exec_stats = cursor.get("executionStats", {})
        total_docs_examined = exec_stats.get("totalDocsExamined", 0)
        n_returned = exec_stats.get("nReturned", 0)
        # max(..., 1) also flags scans that examine many documents but return none.
        over_scanned = total_docs_examined > max(n_returned, 1) * self.scan_ratio_threshold

        if self._plan_has_stage(winning_plan, "COLLSCAN") or over_scanned:
            # parsedQuery lives on queryPlanner, not on the winning plan.
            recommendation = self.suggest_index(query_planner, index_type="single")
            if recommendation is not None:
                recommendations.append(recommendation)

    @staticmethod
    def _plan_has_stage(plan, stage_name):
        """Return True if *stage_name* appears anywhere in the plan tree."""
        pending = [plan]
        while pending:
            node = pending.pop()
            if not isinstance(node, dict):
                continue
            if node.get("stage") == stage_name:
                return True
            if "inputStage" in node:
                pending.append(node["inputStage"])
            pending.extend(node.get("inputStages", []))
        return False

    def process_match_stage(self, match_stage, recommendations, compound_fields, partial_index_conditions):
        """Recommend indexes for the equality and range predicates of a $match.

        Appends to *recommendations*, adds indexed fields to *compound_fields*
        and records equality values in *partial_index_conditions*; returns
        None. ``$and`` clauses are processed recursively. Other top-level
        operators ($or, $expr, $text, ...) do not name one field and are
        skipped; an $or branch must not feed a single compound/partial index.
        """
        for field, condition in match_stage.items():
            if field == "$and" and isinstance(condition, list):
                for clause in condition:
                    if isinstance(clause, dict):
                        self.process_match_stage(
                            clause, recommendations, compound_fields, partial_index_conditions
                        )
                continue
            if field.startswith("$"):
                continue

            is_operator_doc = isinstance(condition, dict) and any(
                str(key).startswith("$") for key in condition
            )
            if not is_operator_doc:
                # Plain equality, including an exact embedded-document match.
                self._add_equality(
                    field, condition, recommendations, compound_fields, partial_index_conditions
                )
                continue

            if any(op in condition for op in self._RANGE_OPS):
                recommendations.append({
                    "index_type": "single",
                    "fields": [field],
                    "collection": self.collection.name,
                    "reason": "Range query optimization",
                })
                compound_fields.append(field)
            if "$eq" in condition:
                # {"f": {"$eq": v}} is the same query as {"f": v}.
                self._add_equality(
                    field, condition["$eq"], recommendations, compound_fields, partial_index_conditions
                )

    def _add_equality(self, field, value, recommendations, compound_fields, partial_index_conditions):
        """Record an equality predicate as a single-field, compound and partial candidate."""
        recommendations.append({
            "index_type": "single",
            "fields": [field],
            "collection": self.collection.name,
            "reason": "Equality query optimization",
        })
        compound_fields.append(field)
        partial_index_conditions[field] = value

    def process_sort_stage(self, sort_stage, recommendations, compound_fields):
        """Recommend a single-field index for each field of a $sort specification."""
        sort_key = sort_stage.get("sortKey")
        if isinstance(sort_key, dict) and "$meta" not in sort_key:
            # Explain output wraps the specification: {"$sort": {"sortKey": {...}}}.
            sort_stage = sort_key
        for field, direction in sort_stage.items():
            if isinstance(direction, dict):
                # e.g. {"$meta": "textScore"}: a sort on metadata, not on a field.
                continue
            recommendations.append({
                "index_type": "single",
                "fields": [field],
                "collection": self.collection.name,
                "reason": "Sorting optimization to avoid in-memory sorting",
            })
            compound_fields.append(field)

    def process_lookup_stage(self, lookup_stage, recommendations):
        """Recommend an index on the $lookup foreignField in the foreign collection."""
        foreign_field = lookup_stage.get("foreignField")
        if foreign_field is None:
            # Pipeline-style $lookup (let/pipeline) has no foreignField to index.
            return
        recommendations.append({
            "index_type": "single",
            "fields": [foreign_field],
            "collection": lookup_stage.get("from"),  # foreign collection
            "reason": "Optimize $lookup with indexed foreignField",
        })

    def process_group_stage(self, group_stage, recommendations):
        """Recommend an index on the field read by $min/$max/$first/$last accumulators."""
        for accumulator in group_stage.values():
            if not isinstance(accumulator, dict):
                continue
            for op in self._GROUP_INDEX_OPS:
                operand = accumulator.get(op)
                # Only a plain field path ("$price") names an indexable field;
                # "$$" variables and computed expressions do not.
                if isinstance(operand, str) and operand.startswith("$") and not operand.startswith("$$"):
                    recommendations.append({
                        "index_type": "single",
                        "fields": [operand[1:]],
                        "collection": self.collection.name,
                        "reason": f"Optimize $group stage accumulator ({op})",
                    })

    def suggest_index(self, query_plan, index_type="single"):
        """Build a recommendation from the fields *query_plan* filters on.

        Args:
            query_plan: a ``queryPlanner`` document (read via ``parsedQuery``)
                or a plan stage (read via ``filter``).
            index_type: label stored in the recommendation.

        Returns:
            A recommendation dict, or None when the plan filters on no field.
        """
        filter_fields = self.get_filter_fields(query_plan)
        if not filter_fields:
            return None
        return {
            "index_type": index_type,
            "fields": filter_fields,
            "collection": self.collection.name,
            "reason": "Optimization for filtered query",
        }

    def get_filter_fields(self, query_plan):
        """Return the document fields a plan filters on, in first-seen order."""
        parsed_query = query_plan.get("parsedQuery") or query_plan.get("filter") or {}
        return self._collect_fields(parsed_query)

    def _collect_fields(self, expression):
        """List field names in a match expression, descending into $and clauses."""
        fields = []
        for key, value in expression.items():
            if key == "$and" and isinstance(value, list):
                for clause in value:
                    if isinstance(clause, dict):
                        fields.extend(self._collect_fields(clause))
            elif not key.startswith("$"):
                fields.append(key)
        return list(dict.fromkeys(fields))
# Example Usage
def main():
    """Run the recommender against a local MongoDB and print its suggestions.

    Connects to mongodb://localhost:27017/ and analyzes an example pipeline on
    ``example_db.example_collection``.
    """
    # The context manager closes the client (and its connections) on exit.
    with MongoClient("mongodb://localhost:27017/") as client:
        collection = client["example_db"]["example_collection"]
        # Example aggregation pipeline
        pipeline = [
            {"$match": {"status": "processing", "orderDate": {"$gte": "2023-01-01"}}},
            {"$sort": {"orderDate": -1}},
            {"$group": {"_id": "$customerId", "total": {"$sum": "$amount"}}},
            # After $group the customer id is stored in "_id"; the original
            # "customerId" field no longer exists in the grouped documents.
            {"$lookup": {"from": "customers", "localField": "_id", "foreignField": "_id", "as": "customerInfo"}},
        ]
        recommender = IndexRecommender(collection)
        recommendations = recommender.recommend_indexes(pipeline)
    print(recommendations)


if __name__ == "__main__":
    main()
# Illustrative output only: the recommendations actually produced depend on
# the explain document returned by the server for this pipeline.
'''
Output will be
[
{
"index_type": "single",
"fields": [
"status"
],
"collection": "example_collection",
"reason": "Equality query optimization"
},
{
"index_type": "single",
"fields": [
"orderDate"
],
"collection": "example_collection",
"reason": "Range query optimization"
},
{
"index_type": "single",
"fields": [
"orderDate"
],
"collection": "example_collection",
"reason": "Sorting optimization to avoid in-memory sorting"
},
{
"index_type": "compound",
"fields": [
"status",
"orderDate"
],
"collection": "example_collection",
"reason": "Optimizing query with multiple filter/sort fields"
},
{
"index_type": "single",
"fields": [
"_id"
],
"collection": "customers",
"reason": "Optimize $lookup with indexed foreignField"
},
{
"index_type": "partial",
"fields": [
"status"
],
"collection": "example_collection",
"condition": {
"status": "processing"
},
"reason": "Filtering on specific document conditions"
}
]
'''
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment