Created
December 30, 2023 23:24
-
-
Save arjunsk/5524ab3bcdce32417fba798aeb1b1de7 to your computer and use it in GitHub Desktop.
Ways of Building Query Plan
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// LogicalPlan is a logical representation of a query. Each LogicalPlan is a | |
// sub-tree of the query. It is built recursively. | |
type LogicalPlan struct { | |
Input *LogicalPlan | |
// Each LogicalPlan struct must only have one of the following. | |
SchemaScan *SchemaScan | |
TableScan *TableScan | |
Filter *Filter | |
Distinct *Distinct | |
Projection *Projection | |
Aggregation *Aggregation | |
} | |
..... | |
func (p *PhysicalProjectionPushDown) optimize(plan *LogicalPlan, columnsUsedExprs []Expr) { | |
switch { | |
case plan.SchemaScan != nil: | |
plan.SchemaScan.PhysicalProjection = append(p.defaultProjections, columnsUsedExprs...) | |
case plan.TableScan != nil: | |
plan.TableScan.PhysicalProjection = append(p.defaultProjections, columnsUsedExprs...) | |
case plan.Filter != nil: | |
p.defaultProjections = []Expr{} | |
columnsUsedExprs = append(columnsUsedExprs, plan.Filter.Expr.ColumnsUsedExprs()...) | |
case plan.Distinct != nil: | |
p.defaultProjections = []Expr{} | |
for _, expr := range plan.Distinct.Exprs { | |
columnsUsedExprs = append(columnsUsedExprs, expr.ColumnsUsedExprs()...) | |
} | |
case plan.Projection != nil: | |
p.defaultProjections = []Expr{} | |
for _, expr := range plan.Projection.Exprs { | |
columnsUsedExprs = append(columnsUsedExprs, expr.ColumnsUsedExprs()...) | |
} | |
case plan.Aggregation != nil: | |
for _, expr := range plan.Aggregation.GroupExprs { | |
columnsUsedExprs = append(columnsUsedExprs, expr.ColumnsUsedExprs()...) | |
} | |
for _, expr := range plan.Aggregation.AggExprs { | |
columnsUsedExprs = append(columnsUsedExprs, expr.ColumnsUsedExprs()...) | |
} | |
p.defaultProjections = []Expr{} | |
columnsUsedExprs = append(columnsUsedExprs, DynCol(hashedMatch)) | |
} | |
if plan.Input != nil { | |
p.optimize(plan.Input, columnsUsedExprs) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Plan wraps exactly one kind of statement (see the oneof) together with a
// little execution metadata.
message Plan {
  // The statement carried by this plan; only one branch may be set.
  // Note: the transaction-control type is spelled `TransationControl` in the
  // schema. It is left as-is because renaming it would break references to it.
  oneof plan {
    Query             query = 1;  // query statement
    TransationControl tcl   = 2;  // transaction control
    DataDefinition    ddl   = 3;  // data definition
    DataControl       dcl   = 4;  // data control
  }

  // Presumably the number of times execution has been (re)tried — confirm.
  int32 try_run_times = 5;
  // Presumably set when the plan belongs to a prepared statement — confirm.
  bool is_prepare = 6;
}
// Query describes a statement of one of the StatementType kinds as a DAG of
// plan nodes.
message Query {
  // Kind of SQL statement the query was built from.
  enum StatementType {
    UNKNOWN = 0;
    SELECT  = 1;
    INSERT  = 2;
    REPLACE = 3;
    DELETE  = 4;
    UPDATE  = 5;
    MERGE   = 6;
  }

  StatementType stmt_type = 1;

  // Step roots. A query may be executed in several steps. This is not
  // strictly necessary, but it is often convenient and can enable better
  // optimization. For example, a non-correlated scalar subquery can run
  // first, and its value can be fed back into the optimizer to produce a
  // better plan for the rest of the query.
  // Each step is a single root node (presumably an index into `nodes` —
  // confirm). A root references other nodes as children, so each step
  // forms a DAG.
  repeated int32 steps = 2;

  // All nodes of the query. Dangling nodes are allowed; only nodes reachable
  // from a step root are executed.
  repeated Node nodes = 3;

  // Bound parameters of the query.
  repeated Expr params = 4;

  // Headings of the returned columns.
  repeated string headings = 5;

  // Load tag. Presumably marks a LOAD statement — confirm. The camelCase
  // field name is kept because renaming it changes generated accessors in
  // some languages.
  bool loadTag = 6;
}
type QueryBuilder struct { | |
qry *plan.Query | |
compCtx CompilerContext | |
ctxByNode []*BindContext | |
nameByColRef map[[2]int32]string | |
tag2Table map[int32]*TableDef | |
nextTag int32 | |
isPrepareStatement bool | |
mysqlCompatible bool | |
haveOnDuplicateKey bool // if it's a plan contain onduplicate key node, we can not use some optmize rule | |
isForUpdate bool // if it's a query plan for update | |
deleteNode map[uint64]int32 //delete node in this query. key is tableId, value is the nodeId of sinkScan node in the delete plan | |
} | |
// Nodes are operators: each appendNode call adds one operator node to the
// builder and returns its node ID, which later nodes use in Children.

// Join the centroids scan with the metadata table scan (SINGLE join).
joinMetaAndCentroidsId := builder.appendNode(&plan.Node{
	Children:    []int32{centroidsScanId, metaTableScanId},
	NodeType:    plan.Node_JOIN,
	JoinType:    plan.Node_SINGLE,
	ProjectList: joinProjections,
}, bindCtx)

// Filter the join output on whereCentroidVersionEqCurrVersion, keeping only
// the first three projected columns. NOTE(review): the [:3] sub-slice shares
// prevProjections' backing array.
filterCentroidsForCurrVersionId := builder.appendNode(&plan.Node{
	Children:    []int32{joinMetaAndCentroidsId},
	NodeType:    plan.Node_FILTER,
	ProjectList: prevProjections[:3],
	FilterList:  []*Expr{whereCentroidVersionEqCurrVersion},
}, bindCtx)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class Plan | |
{ | |
private final PlanNode root; | |
private final TypeProvider types; | |
private final StatsAndCosts statsAndCosts; | |
public Plan(PlanNode root, TypeProvider types, StatsAndCosts statsAndCosts) | |
{ | |
this.root = requireNonNull(root, "root is null"); | |
this.types = requireNonNull(types, "types is null"); | |
this.statsAndCosts = requireNonNull(statsAndCosts, "statsAndCosts is null"); | |
} | |
public PlanNode getRoot() | |
{ | |
return root; | |
} | |
public TypeProvider getTypes() | |
{ | |
return types; | |
} | |
public StatsAndCosts getStatsAndCosts() | |
{ | |
return statsAndCosts; | |
} | |
public Map<PlanNodeId, PlanNode> getPlanIdNodeMap() | |
{ | |
Iterable<PlanNode> planIterator = Traverser.forTree(PlanNode::getSources) | |
.depthFirstPreOrder(root); | |
ImmutableMap.Builder<PlanNodeId, PlanNode> planNodeMap = ImmutableMap.builder(); | |
for (PlanNode node : planIterator) { | |
planNodeMap.put(node.getId(), node); | |
} | |
return planNodeMap.build(); | |
} | |
} | |
/** | |
* The basic component of a Presto IR (logic plan). | |
* An IR is a tree structure with each PlanNode performing a specific operation. | |
*/ | |
@JsonTypeInfo(use = JsonTypeInfo.Id.MINIMAL_CLASS, property = "@type") | |
public abstract class PlanNode | |
{ | |
private final Optional<SourceLocation> sourceLocation; | |
private final PlanNodeId id; | |
/** | |
* A statistically equivalent version of plan node, i.e. number of output rows/size remains similar. | |
* This is assigned by Presto optimizer. | |
* Once assigned by the planner, further optimizer rules should respect this id when changing the plan. | |
* | |
* For example, when doing pushdown: Filter(TableScan()) -> TableScan(), | |
* output TableScan should have same statsEquivalentPlanNode as input Filter node | |
*/ | |
private final Optional<PlanNode> statsEquivalentPlanNode; | |
protected PlanNode(Optional<SourceLocation> sourceLocation, PlanNodeId id, Optional<PlanNode> statsEquivalentPlanNode) | |
{ | |
this.sourceLocation = sourceLocation; | |
this.id = requireNonNull(id, "id is null"); | |
this.statsEquivalentPlanNode = requireNonNull(statsEquivalentPlanNode, "statsEquivalentPlanNode is null"); | |
} | |
@JsonProperty("id") | |
public PlanNodeId getId() | |
{ | |
return id; | |
} | |
@JsonProperty("sourceLocation") | |
public Optional<SourceLocation> getSourceLocation() | |
{ | |
return sourceLocation; | |
} | |
public Optional<PlanNode> getStatsEquivalentPlanNode() | |
{ | |
return statsEquivalentPlanNode; | |
} | |
/** | |
* Get the upstream PlanNodes (i.e., children) of the current PlanNode. | |
*/ | |
public abstract List<PlanNode> getSources(); | |
/** | |
* Logical properties are a function of source properties and the operation performed by the plan node | |
*/ | |
public LogicalProperties computeLogicalProperties(LogicalPropertiesProvider logicalPropertiesProvider) | |
{ | |
requireNonNull(logicalPropertiesProvider, "logicalPropertiesProvider cannot be null."); | |
return logicalPropertiesProvider.getDefaultProperties(); | |
} | |
/** | |
* The output from the upstream PlanNodes. | |
* It should serve as the input for the current PlanNode. | |
*/ | |
public abstract List<VariableReferenceExpression> getOutputVariables(); | |
/** | |
* Alter the upstream PlanNodes of the current PlanNode. | |
*/ | |
public abstract PlanNode replaceChildren(List<PlanNode> newChildren); | |
/** | |
* A visitor pattern interface to operate on IR. | |
*/ | |
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) | |
{ | |
return visitor.visitPlan(this, context); | |
} | |
/** | |
* Assigns statsEquivalentPlanNode to the plan node | |
*/ | |
public abstract PlanNode assignStatsEquivalentPlanNode(Optional<PlanNode> statsEquivalentPlanNode); | |
} | |
@Immutable | |
public final class FilterNode | |
extends PlanNode | |
{ | |
private final PlanNode source; | |
private final RowExpression predicate; | |
@JsonCreator | |
public FilterNode( | |
Optional<SourceLocation> sourceLocation, | |
@JsonProperty("id") PlanNodeId id, | |
@JsonProperty("source") PlanNode source, | |
@JsonProperty("predicate") RowExpression predicate) | |
{ | |
this(sourceLocation, id, Optional.empty(), source, predicate); | |
} | |
public FilterNode( | |
Optional<SourceLocation> sourceLocation, | |
PlanNodeId id, | |
Optional<PlanNode> statsEquivalentPlanNode, | |
PlanNode source, | |
RowExpression predicate) | |
{ | |
super(sourceLocation, id, statsEquivalentPlanNode); | |
this.source = source; | |
this.predicate = predicate; | |
} | |
/** | |
* Get the predicate (a RowExpression of boolean type) of the FilterNode. | |
* It serves as the criteria to determine whether the incoming rows should be filtered out or not. | |
*/ | |
@JsonProperty | |
public RowExpression getPredicate() | |
{ | |
return predicate; | |
} | |
/** | |
* FilterNode only expects a single upstream PlanNode. | |
*/ | |
@JsonProperty("source") | |
public PlanNode getSource() | |
{ | |
return source; | |
} | |
@Override | |
public List<VariableReferenceExpression> getOutputVariables() | |
{ | |
return source.getOutputVariables(); | |
} | |
@Override | |
public List<PlanNode> getSources() | |
{ | |
return unmodifiableList(singletonList(source)); | |
} | |
@Override | |
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) | |
{ | |
return visitor.visitFilter(this, context); | |
} | |
@Override | |
public PlanNode assignStatsEquivalentPlanNode(Optional<PlanNode> statsEquivalentPlanNode) | |
{ | |
return new FilterNode(getSourceLocation(), getId(), statsEquivalentPlanNode, source, predicate); | |
} | |
@Override | |
public PlanNode replaceChildren(List<PlanNode> newChildren) | |
{ | |
// FilterNode only expects a single upstream PlanNode | |
if (newChildren == null || newChildren.size() != 1) { | |
throw new IllegalArgumentException("Expect exactly one child to replace"); | |
} | |
return new FilterNode(getSourceLocation(), getId(), getStatsEquivalentPlanNode(), newChildren.get(0), predicate); | |
} | |
@Override | |
public LogicalProperties computeLogicalProperties(LogicalPropertiesProvider logicalPropertiesProvider) | |
{ | |
requireNonNull(logicalPropertiesProvider, "logicalPropertiesProvider cannot be null."); | |
return logicalPropertiesProvider.getFilterProperties(this); | |
} | |
@Override | |
public boolean equals(Object o) | |
{ | |
if (this == o) { | |
return true; | |
} | |
if (o == null || getClass() != o.getClass()) { | |
return false; | |
} | |
FilterNode that = (FilterNode) o; | |
return Objects.equals(source, that.source) && | |
Objects.equals(predicate, that.predicate); | |
} | |
@Override | |
public int hashCode() | |
{ | |
return Objects.hash(source, predicate); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Plan is the description of an execution flow. | |
// It is created from ast.Node first, then optimized by the optimizer, | |
// finally used by the executor to create a Cursor which executes the statement. | |
type Plan interface { | |
// Get the schema. | |
Schema() *expression.Schema | |
// Get the ID. | |
ID() int | |
// TP get the plan type. | |
TP() string | |
// Get the ID in explain statement | |
ExplainID() fmt.Stringer | |
// ExplainInfo returns operator information to be explained. | |
ExplainInfo() string | |
// replaceExprColumns replace all the column reference in the plan's expression node. | |
replaceExprColumns(replace map[string]*expression.Column) | |
SCtx() sessionctx.Context | |
// property.StatsInfo will return the property.StatsInfo for this plan. | |
statsInfo() *property.StatsInfo | |
// OutputNames returns the outputting names of each column. | |
OutputNames() types.NameSlice | |
// SetOutputNames sets the outputting name by the given slice. | |
SetOutputNames(names types.NameSlice) | |
} | |
// LogicalPlan is a tree of logical operators. | |
// We can do a lot of logical optimizations to it, like predicate pushdown and column pruning. | |
type LogicalPlan interface { | |
Plan | |
// PredicatePushDown pushes down the predicates in the where/on/having clauses as deeply as possible. | |
// It will accept a predicate that is an expression slice, and return the expressions that can't be pushed. | |
// Because it might change the root if the having clause exists, we need to return a plan that represents a new root. | |
PredicatePushDown([]expression.Expression) ([]expression.Expression, LogicalPlan) | |
// PruneColumns prunes the unused columns. | |
PruneColumns([]*expression.Column) error | |
// findBestTask converts the logical plan to the physical plan. It's a new interface. | |
// It is called recursively from the parent to the children to create the result physical plan. | |
// Some logical plans will convert the children to the physical plans in different ways, and return the one | |
// with the lowest cost. | |
findBestTask(prop *property.PhysicalProperty) (task, error) | |
// BuildKeyInfo will collect the information of unique keys into schema. | |
// Because this method is also used in cascades planner, we cannot use | |
// things like `p.schema` or `p.children` inside it. We should use the `selfSchema` | |
// and `childSchema` instead. | |
BuildKeyInfo(selfSchema *expression.Schema, childSchema []*expression.Schema) | |
// pushDownTopN will push down the topN or limit operator during logical optimization. | |
pushDownTopN(topN *LogicalTopN) LogicalPlan | |
// recursiveDeriveStats derives statistic info between plans. | |
recursiveDeriveStats() (*property.StatsInfo, error) | |
// DeriveStats derives statistic info for current plan node given child stats. | |
// We need selfSchema, childSchema here because it makes this method can be used in | |
// cascades planner, where LogicalPlan might not record its children or schema. | |
DeriveStats(childStats []*property.StatsInfo, selfSchema *expression.Schema, childSchema []*expression.Schema) (*property.StatsInfo, error) | |
// PreparePossibleProperties is only used for join and aggregation. Like group by a,b,c, all permutation of (a,b,c) is | |
// valid, but the ordered indices in leaf plan is limited. So we can get all possible order properties by a pre-walking. | |
PreparePossibleProperties(schema *expression.Schema, childrenProperties ...[][]*expression.Column) [][]*expression.Column | |
// exhaustPhysicalPlans generates all possible plans that can match the required property. | |
exhaustPhysicalPlans(*property.PhysicalProperty) []PhysicalPlan | |
// Get all the children. | |
Children() []LogicalPlan | |
// SetChildren sets the children for the plan. | |
SetChildren(...LogicalPlan) | |
// SetChild sets the ith child for the plan. | |
SetChild(i int, child LogicalPlan) | |
} | |
type baseLogicalPlan struct { | |
basePlan | |
taskMap map[string]task | |
self LogicalPlan | |
children []LogicalPlan | |
} | |
// LogicalSelection represents a where or having predicate. | |
type LogicalSelection struct { | |
baseLogicalPlan | |
// Originally the WHERE or ON condition is parsed into a single expression, | |
// but after we converted to CNF(Conjunctive normal form), it can be | |
// split into a list of AND conditions. | |
Conditions []expression.Expression | |
} | |
// PhysicalPlan is a tree of the physical operators. | |
type PhysicalPlan interface { | |
Plan | |
// attach2Task makes the current physical plan as the father of task's physicalPlan and updates the cost of | |
// current task. If the child's task is cop task, some operator may close this task and return a new rootTask. | |
attach2Task(...task) task | |
// ToPB converts physical plan to tipb executor. | |
ToPB(ctx sessionctx.Context) (*tipb.Executor, error) | |
// getChildReqProps gets the required property by child index. | |
GetChildReqProps(idx int) *property.PhysicalProperty | |
// StatsCount returns the count of property.StatsInfo for this plan. | |
StatsCount() float64 | |
// Get all the children. | |
Children() []PhysicalPlan | |
// SetChildren sets the children for the plan. | |
SetChildren(...PhysicalPlan) | |
// SetChild sets the ith child for the plan. | |
SetChild(i int, child PhysicalPlan) | |
// ResolveIndices resolves the indices for columns. After doing this, the columns can evaluate the rows by their indices. | |
ResolveIndices() error | |
// Stats returns the StatsInfo of the plan. | |
Stats() *property.StatsInfo | |
// ExplainNormalizedInfo returns operator normalized information for generating digest. | |
ExplainNormalizedInfo() string | |
} | |
type basePhysicalPlan struct { | |
basePlan | |
childrenReqProps []*property.PhysicalProperty | |
self PhysicalPlan | |
children []PhysicalPlan | |
} | |
// PhysicalLimit is the physical operator of Limit. | |
type PhysicalLimit struct { | |
basePhysicalPlan | |
Offset uint64 | |
Count uint64 | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment