Created
March 18, 2021 16:08
-
-
Save junwen12221/904f3daad528c0250662156e8c8e7ad6 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Copyright (C) <2020> <chen junwen> | |
* <p> | |
* This program is free software: you can redistribute it and/or modify it under the terms of the | |
* GNU General Public License as published by the Free Software Foundation, either version 3 of the | |
* License, or (at your option) any later version. | |
* <p> | |
* This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without | |
* even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |
* General Public License for more details. | |
* <p> | |
* You should have received a copy of the GNU General Public License along with this program. If | |
* not, see <http://www.gnu.org/licenses/>. | |
*/ | |
package io.mycat; | |
import cn.mycat.vertx.xa.XaSqlConnection; | |
import com.alibaba.druid.sql.SQLUtils; | |
import com.alibaba.druid.sql.ast.SQLStatement; | |
import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr; | |
import com.alibaba.druid.sql.ast.expr.SQLVariantRefExpr; | |
import com.alibaba.druid.sql.ast.statement.SQLExprTableSource; | |
import com.alibaba.druid.sql.ast.statement.SQLSelectStatement; | |
import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlDeleteStatement; | |
import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlInsertStatement; | |
import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlUpdateStatement; | |
import com.alibaba.druid.sql.dialect.mysql.visitor.MySqlASTVisitorAdapter; | |
import com.alibaba.druid.sql.dialect.mysql.visitor.MySqlExportParameterVisitor; | |
import com.google.common.collect.ImmutableList; | |
import com.google.common.collect.ImmutableSet; | |
import io.mycat.api.collector.RowBaseIterator; | |
import io.mycat.beans.mycat.ResultSetBuilder; | |
import io.mycat.beans.mysql.MySQLErrorCode; | |
import io.mycat.calcite.*; | |
import io.mycat.calcite.executor.MycatPreparedStatementUtil; | |
import io.mycat.calcite.logical.MycatView; | |
import io.mycat.calcite.physical.MycatInsertRel; | |
import io.mycat.calcite.physical.MycatUpdateRel; | |
import io.mycat.calcite.plan.ObservablePlanImplementorImpl; | |
import io.mycat.calcite.plan.PlanImplementor; | |
import io.mycat.calcite.rewriter.Distribution; | |
import io.mycat.calcite.rewriter.MatierialRewriter; | |
import io.mycat.calcite.rewriter.OptimizationContext; | |
import io.mycat.calcite.rewriter.SQLRBORewriter; | |
import io.mycat.calcite.rules.MycatTablePhyViewRule; | |
import io.mycat.calcite.rules.MycatViewToIndexViewRule; | |
import io.mycat.calcite.spm.Plan; | |
import io.mycat.calcite.spm.PlanCache; | |
import io.mycat.calcite.spm.PlanImpl; | |
import io.mycat.calcite.table.*; | |
import io.mycat.datasource.jdbc.datasource.JdbcConnectionManager; | |
import io.mycat.gsi.GSIService; | |
import io.mycat.hbt.HBTQueryConvertor; | |
import io.mycat.hbt.SchemaConvertor; | |
import io.mycat.hbt.ast.base.Schema; | |
import io.mycat.hbt.parser.HBTParser; | |
import io.mycat.hbt.parser.ParseNode; | |
import io.mycat.router.CustomRuleFunction; | |
import io.mycat.router.ShardingTableHandler; | |
import io.mycat.util.VertxUtil; | |
import io.vertx.core.Future; | |
import io.vertx.core.impl.future.PromiseInternal; | |
import lombok.SneakyThrows; | |
import org.apache.calcite.adapter.enumerable.EnumerableInterpretable; | |
import org.apache.calcite.adapter.enumerable.EnumerableRel; | |
import org.apache.calcite.avatica.SqlType; | |
import org.apache.calcite.config.CalciteSystemProperty; | |
import org.apache.calcite.jdbc.CalciteSchema; | |
import org.apache.calcite.linq4j.Enumerable; | |
import org.apache.calcite.linq4j.tree.ClassDeclaration; | |
import org.apache.calcite.linq4j.tree.Expressions; | |
import org.apache.calcite.plan.*; | |
import org.apache.calcite.plan.hep.HepMatchOrder; | |
import org.apache.calcite.plan.hep.HepPlanner; | |
import org.apache.calcite.plan.hep.HepProgram; | |
import org.apache.calcite.plan.hep.HepProgramBuilder; | |
import org.apache.calcite.plan.volcano.VolcanoPlanner; | |
import org.apache.calcite.prepare.CalciteCatalogReader; | |
import org.apache.calcite.rel.*; | |
import org.apache.calcite.rel.core.Filter; | |
import org.apache.calcite.rel.core.TableModify; | |
import org.apache.calcite.rel.core.TableScan; | |
import org.apache.calcite.rel.externalize.RelWriterImpl; | |
import org.apache.calcite.rel.logical.LogicalProject; | |
import org.apache.calcite.rel.logical.LogicalTableModify; | |
import org.apache.calcite.rel.logical.LogicalTableScan; | |
import org.apache.calcite.rel.rules.CoreRules; | |
import org.apache.calcite.rel.rules.JoinToMultiJoinRule; | |
import org.apache.calcite.rel.type.RelDataType; | |
import org.apache.calcite.rex.RexNode; | |
import org.apache.calcite.runtime.ArrayBindable; | |
import org.apache.calcite.runtime.CalciteException; | |
import org.apache.calcite.runtime.NewMycatDataContext; | |
import org.apache.calcite.runtime.Utilities; | |
import org.apache.calcite.schema.SchemaPlus; | |
import org.apache.calcite.schema.Table; | |
import org.apache.calcite.sql.*; | |
import org.apache.calcite.sql.fun.SqlStdOperatorTable; | |
import org.apache.calcite.sql.parser.SqlParserPos; | |
import org.apache.calcite.sql.type.SqlTypeName; | |
import org.apache.calcite.sql.type.SqlTypeUtil; | |
import org.apache.calcite.sql.util.SqlOperatorTables; | |
import org.apache.calcite.sql.validate.SelectScope; | |
import org.apache.calcite.sql.validate.SqlValidator; | |
import org.apache.calcite.sql.validate.SqlValidatorImpl; | |
import org.apache.calcite.sql.validate.SqlValidatorScope; | |
import org.apache.calcite.sql2rel.RelDecorrelator; | |
import org.apache.calcite.sql2rel.SqlToRelConverter; | |
import org.apache.calcite.tools.RelBuilder; | |
import org.apache.calcite.util.Pair; | |
import org.codehaus.commons.compiler.CompilerFactoryFactory; | |
import org.codehaus.commons.compiler.IClassBodyEvaluator; | |
import org.codehaus.commons.compiler.ICompilerFactory; | |
import org.jetbrains.annotations.NotNull; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import javax.annotation.Nonnull; | |
import java.io.PrintWriter; | |
import java.io.StringReader; | |
import java.io.StringWriter; | |
import java.sql.JDBCType; | |
import java.util.*; | |
import java.util.concurrent.CompletableFuture; | |
import java.util.stream.Collectors; | |
import static org.apache.calcite.rel.rules.CoreRules.JOIN_PUSH_EXPRESSIONS; | |
import static org.apache.calcite.rel.rules.CoreRules.JOIN_PUSH_TRANSITIVE_PREDICATES; | |
public class DrdsRunner { | |
final static Logger log = LoggerFactory.getLogger(DrdsRunner.class); | |
private final SchemaPlus schemas; | |
DrdsConst config; | |
PlanCache planCache; | |
public DrdsRunner(DrdsConst config, PlanCache planCache) { | |
this.config = config; | |
this.planCache = planCache; | |
this.schemas = convertRoSchemaPlus(config); | |
} | |
@SneakyThrows | |
public MycatRel doHbt(String hbtText) { | |
log.debug("reveice hbt"); | |
log.debug(hbtText); | |
HBTParser hbtParser = new HBTParser(hbtText); | |
ParseNode statement = hbtParser.statement(); | |
SchemaConvertor schemaConvertor = new SchemaConvertor(); | |
Schema originSchema = schemaConvertor.transforSchema(statement); | |
SchemaPlus plus = this.schemas; | |
CalciteCatalogReader catalogReader = new CalciteCatalogReader(CalciteSchema | |
.from(plus), | |
ImmutableList.of(), | |
MycatCalciteSupport.TypeFactory, | |
MycatCalciteSupport.INSTANCE.getCalciteConnectionConfig()); | |
RelOptCluster cluster = newCluster(); | |
RelBuilder relBuilder = MycatCalciteSupport.relBuilderFactory.create(cluster, catalogReader); | |
HBTQueryConvertor hbtQueryConvertor = new HBTQueryConvertor(Collections.emptyList(), relBuilder); | |
RelNode relNode = hbtQueryConvertor.complie(originSchema); | |
HepProgramBuilder hepProgramBuilder = new HepProgramBuilder(); | |
hepProgramBuilder.addRuleInstance(CoreRules.AGGREGATE_REDUCE_FUNCTIONS); | |
HepProgram hepProgram = hepProgramBuilder.build(); | |
HepPlanner hepPlanner = new HepPlanner(hepProgram); | |
hepPlanner.setRoot(relNode); | |
RelNode bestExp = hepPlanner.findBestExp(); | |
bestExp = bestExp.accept(new RelShuttleImpl() { | |
@Override | |
public RelNode visit(TableScan scan) { | |
AbstractMycatTable table = scan.getTable().unwrap(AbstractMycatTable.class); | |
if (table != null) { | |
if (table instanceof MycatPhysicalTable) { | |
DataNode dataNode = ((MycatPhysicalTable) table).getDataNode(); | |
MycatPhysicalTable mycatPhysicalTable = (MycatPhysicalTable) table; | |
return new MycatTransientSQLTableScan(cluster, | |
mycatPhysicalTable.getRowType(), | |
dataNode.getTargetName(), | |
MycatCalciteSupport.INSTANCE.convertToSql( | |
scan, | |
MycatCalciteSupport.INSTANCE.getSqlDialectByTargetName(dataNode.getTargetName()), | |
false | |
).getSql()); | |
} | |
} | |
return super.visit(scan); | |
} | |
}); | |
return optimizeWithCBO(bestExp, Collections.emptyList()); | |
} | |
public DrdsSql preParse(String sqlStatement) { | |
return preParse(SQLUtils.parseSingleMysqlStatement(sqlStatement)); | |
} | |
public DrdsSql preParse(SQLStatement sqlStatement) { | |
List<Object> params = new ArrayList<>(); | |
StringBuilder sb = new StringBuilder(); | |
MycatPreparedStatementUtil.outputToParameterized(sqlStatement, sb, Collections.emptyList(), params); | |
String string = sb.toString(); | |
sqlStatement = SQLUtils.parseSingleMysqlStatement(string); | |
return DrdsSql.of(sqlStatement, string, params); | |
} | |
@SneakyThrows | |
public DrdsSql convertToMycatRel(DrdsSql stmt, MycatDataContext dataContext, OptimizationContext optimizationContext) { | |
SchemaPlus plus = this.schemas; | |
return convertToMycatRel(planCache, stmt, plus, dataContext, optimizationContext); | |
} | |
public SchemaPlus convertRoSchemaPlus(DrdsConst config) { | |
SchemaPlus plus = CalciteSchema.createRootSchema(false).plus(); | |
List<MycatSchema> schemas = new ArrayList<>(); | |
for (Map.Entry<String, SchemaHandler> entry : config.schemas().entrySet()) { | |
String schemaName = entry.getKey(); | |
SchemaHandler schemaHandler = entry.getValue(); | |
Map<String, Table> logicTableMap = new HashMap<>(); | |
for (TableHandler tableHandler : schemaHandler.logicTables().values()) { | |
MycatLogicTable logicTable = new MycatLogicTable(tableHandler); | |
logicTableMap.put(logicTable.getTable().getTableName(), logicTable); | |
} | |
MycatSchema schema = MycatSchema.create(config, schemaName, logicTableMap); | |
plus.add(schemaName, schema); | |
schemas.add(schema); | |
} | |
return plus; | |
} | |
private RowBaseIterator doExplain(MycatRel relNode1) { | |
final StringWriter sw = new StringWriter(); | |
final RelWriter planWriter = new RelWriterImpl(new PrintWriter(sw), SqlExplainLevel.ALL_ATTRIBUTES, false); | |
relNode1.explain(planWriter); | |
String[] split = sw.getBuffer().toString().split("\n"); | |
ResultSetBuilder builder = ResultSetBuilder.create(); | |
builder.addColumnInfo("explain", JDBCType.VARCHAR); | |
for (String s : split) { | |
builder.addObjectRowPayload(Arrays.asList(s)); | |
} | |
return builder.build(); | |
} | |
public DrdsSql convertToMycatRel(PlanCache planCache, | |
DrdsSql drdsSql, | |
SchemaPlus plus, | |
MycatDataContext dataContext, | |
OptimizationContext optimizationContext) { | |
MycatRel rel; | |
Plan minCostPlan = planCache.getMinCostPlan(drdsSql.getParameterizedString(), drdsSql.getTypes()); | |
if (minCostPlan != null) { | |
switch (minCostPlan.getType()) { | |
case PHYSICAL: | |
drdsSql.setRelNode((MycatRel) minCostPlan.getPhysical()); | |
break; | |
case UPDATE: | |
case INSERT: | |
drdsSql.setRelNode(minCostPlan.getPhysical()); | |
} | |
} else { | |
rel = dispatch(optimizationContext, drdsSql, plus, dataContext); | |
drdsSql.setRelNode(rel); | |
} | |
Objects.requireNonNull(drdsSql.getRelNode()); | |
return drdsSql; | |
} | |
public MycatRel dispatch(OptimizationContext optimizationContext, | |
DrdsSql drdsSql, | |
SchemaPlus plus, MycatDataContext dataContext) { | |
SQLStatement sqlStatement = drdsSql.getSqlStatement(); | |
if (sqlStatement instanceof SQLSelectStatement) { | |
return compileQuery(dataContext.getDefaultSchema(), optimizationContext, plus, drdsSql); | |
} | |
MetadataManager metadataManager = MetaClusterCurrent.wrapper(MetadataManager.class); | |
metadataManager.resolveMetadata(sqlStatement); | |
String defaultSchema = dataContext.getDefaultSchema(); | |
if (sqlStatement instanceof MySqlInsertStatement) { | |
MySqlInsertStatement insertStatement = (MySqlInsertStatement) sqlStatement; | |
String schema = SQLUtils.normalize(Optional.ofNullable(insertStatement.getTableSource()).map(i -> i.getSchema()).orElse(defaultSchema)); | |
String tableName = SQLUtils.normalize(insertStatement.getTableName().getSimpleName()); | |
TableHandler logicTable = metadataManager.getTable(schema, tableName); | |
switch (logicTable.getType()) { | |
case SHARDING: | |
return compileInsert((ShardingTable) logicTable, dataContext, drdsSql, optimizationContext); | |
case GLOBAL: | |
return complieGlobalUpdate(optimizationContext, drdsSql, sqlStatement, (GlobalTable) logicTable); | |
case NORMAL: | |
return complieNormalUpdate(optimizationContext, drdsSql, sqlStatement, (NormalTable) logicTable); | |
case CUSTOM: | |
throw new UnsupportedOperationException(); | |
} | |
} else if (sqlStatement instanceof MySqlUpdateStatement) { | |
SQLExprTableSource tableSource = (SQLExprTableSource) ((MySqlUpdateStatement) sqlStatement).getTableSource(); | |
String schema = SQLUtils.normalize(Optional.ofNullable(tableSource).map(i -> i.getSchema()).orElse(defaultSchema)); | |
String tableName = SQLUtils.normalize(((MySqlUpdateStatement) sqlStatement).getTableName().getSimpleName()); | |
TableHandler logicTable = metadataManager.getTable(schema, tableName); | |
switch (logicTable.getType()) { | |
case SHARDING: | |
return compileUpdate(logicTable, dataContext, optimizationContext, drdsSql, plus); | |
case GLOBAL: { | |
return complieGlobalUpdate(optimizationContext, drdsSql, sqlStatement, (GlobalTable) logicTable); | |
} | |
case NORMAL: { | |
return complieNormalUpdate(optimizationContext, drdsSql, sqlStatement, (NormalTable) logicTable); | |
} | |
case CUSTOM: | |
throw new UnsupportedOperationException(); | |
} | |
} else if (sqlStatement instanceof MySqlDeleteStatement) { | |
SQLExprTableSource tableSource = (SQLExprTableSource) ((MySqlDeleteStatement) sqlStatement).getTableSource(); | |
String schema = SQLUtils.normalize(Optional.ofNullable(tableSource).map(i -> i.getSchema()).orElse(defaultSchema)); | |
String tableName = SQLUtils.normalize(((MySqlDeleteStatement) sqlStatement).getTableName().getSimpleName()); | |
TableHandler logicTable = metadataManager.getTable(schema, tableName); | |
switch (logicTable.getType()) { | |
case SHARDING: | |
return compileDelete(logicTable, dataContext, optimizationContext, drdsSql, plus); | |
case GLOBAL: { | |
return complieGlobalUpdate(optimizationContext, drdsSql, sqlStatement, (GlobalTable) logicTable); | |
} | |
case NORMAL: { | |
return complieNormalUpdate(optimizationContext, drdsSql, sqlStatement, (NormalTable) logicTable); | |
} | |
case CUSTOM: | |
throw new UnsupportedOperationException(); | |
} | |
} | |
return null; | |
} | |
@NotNull | |
private MycatRel complieGlobalUpdate(OptimizationContext optimizationContext, DrdsSql drdsSql, SQLStatement sqlStatement, GlobalTable logicTable) { | |
Distribution distribution = Distribution.of(logicTable); | |
MycatUpdateRel mycatUpdateRel = new MycatUpdateRel(distribution, sqlStatement, true); | |
optimizationContext.saveAlways(); | |
return mycatUpdateRel; | |
} | |
@NotNull | |
private MycatRel complieNormalUpdate(OptimizationContext optimizationContext, DrdsSql drdsSql, SQLStatement sqlStatement, NormalTable logicTable) { | |
Distribution distribution = Distribution.of(logicTable); | |
MycatUpdateRel mycatUpdateRel = new MycatUpdateRel(distribution, sqlStatement, false); | |
optimizationContext.saveAlways(); | |
return mycatUpdateRel; | |
} | |
private MycatRel compileDelete(TableHandler logicTable, MycatDataContext dataContext, OptimizationContext optimizationContext, DrdsSql drdsSql, SchemaPlus plus) { | |
return compileQuery(dataContext.getDefaultSchema(), optimizationContext, plus, drdsSql); | |
} | |
private MycatRel compileUpdate(TableHandler logicTable, MycatDataContext dataContext, OptimizationContext optimizationContext, DrdsSql drdsSql, SchemaPlus plus) { | |
return compileQuery(dataContext.getDefaultSchema(), optimizationContext, plus, drdsSql); | |
} | |
private MycatRel compileInsert(ShardingTableHandler logicTable, | |
MycatDataContext dataContext, | |
DrdsSql drdsSql, | |
OptimizationContext optimizationContext) { | |
MySqlInsertStatement mySqlInsertStatement = drdsSql.getSqlStatement(); | |
List<SQLIdentifierExpr> columnsTmp = (List) mySqlInsertStatement.getColumns(); | |
boolean autoIncrement = logicTable.isAutoIncrement(); | |
int autoIncrementIndexTmp = -1; | |
ArrayList<Integer> shardingKeys = new ArrayList<>(); | |
CustomRuleFunction function = logicTable.function(); | |
List<SimpleColumnInfo> metaColumns; | |
if (columnsTmp.isEmpty()) {//fill columns | |
int index = 0; | |
for (SimpleColumnInfo column : metaColumns = logicTable.getColumns()) { | |
if (autoIncrement && logicTable.getAutoIncrementColumn() == column) { | |
autoIncrementIndexTmp = index; | |
} | |
if (function.isShardingKey(column.getColumnName())) { | |
shardingKeys.add(index); | |
} | |
mySqlInsertStatement.addColumn(new SQLIdentifierExpr(column.getColumnName())); | |
index++; | |
} | |
} else { | |
int index = 0; | |
metaColumns = new ArrayList<>(); | |
for (SQLIdentifierExpr column : columnsTmp) { | |
SimpleColumnInfo simpleColumnInfo = logicTable.getColumnByName(SQLUtils.normalize(column.getName())); | |
metaColumns.add(simpleColumnInfo); | |
if (autoIncrement && logicTable.getAutoIncrementColumn() == simpleColumnInfo) { | |
autoIncrementIndexTmp = index; | |
} | |
if (function.isShardingKey(simpleColumnInfo.getColumnName())) { | |
shardingKeys.add(index); | |
} | |
index++; | |
} | |
if (autoIncrement && autoIncrementIndexTmp == -1) { | |
SimpleColumnInfo autoIncrementColumn = logicTable.getAutoIncrementColumn(); | |
if (function.isShardingKey(autoIncrementColumn.getColumnName())) { | |
shardingKeys.add(index); | |
} | |
metaColumns.add(autoIncrementColumn); | |
mySqlInsertStatement.addColumn(new SQLIdentifierExpr(autoIncrementColumn.getColumnName())); | |
class CountIndex extends MySqlASTVisitorAdapter { | |
int currentIndex = -1; | |
@Override | |
public void endVisit(SQLVariantRefExpr x) { | |
currentIndex = Math.max(x.getIndex(), currentIndex); | |
super.endVisit(x); | |
} | |
} | |
CountIndex countIndex = new CountIndex(); | |
mySqlInsertStatement.accept(countIndex); | |
} | |
} | |
final int finalAutoIncrementIndex = autoIncrementIndexTmp; | |
MycatInsertRel mycatInsertRel = MycatInsertRel.create(finalAutoIncrementIndex, shardingKeys, mySqlInsertStatement, logicTable); | |
optimizationContext.saveParameterized(); | |
return mycatInsertRel; | |
} | |
private MycatRel compileQuery(String defaultSchemaName, | |
OptimizationContext optimizationContext, | |
SchemaPlus plus, | |
DrdsSql drdsSql) { | |
RelNode logPlan; | |
if (drdsSql.getRelNode() != null) { | |
if (drdsSql.getRelNode() instanceof MycatRel) { | |
return (MycatRel) drdsSql.getRelNode(); | |
} | |
logPlan = drdsSql.getRelNode(); | |
} else { | |
logPlan = getRelRoot(defaultSchemaName, plus, drdsSql); | |
} | |
if (logPlan instanceof TableModify) { | |
LogicalTableModify tableModify = (LogicalTableModify) logPlan; | |
switch (tableModify.getOperation()) { | |
case DELETE: | |
case UPDATE: | |
return planUpdate(tableModify, drdsSql, optimizationContext); | |
default: | |
throw new UnsupportedOperationException("unsupported DML operation " + tableModify.getOperation()); | |
} | |
} | |
RelNode rboLogPlan = optimizeWithRBO(logPlan, drdsSql, optimizationContext); | |
Collection<RelOptRule> rboInCbo; | |
if (MetaClusterCurrent.exist(GSIService.class)) { | |
rboInCbo = Collections.singletonList( | |
new MycatViewToIndexViewRule(optimizationContext, drdsSql.getParams()) | |
); | |
} else { | |
rboInCbo = Collections.emptyList(); | |
} | |
MycatRel mycatRel = optimizeWithCBO(rboLogPlan, rboInCbo); | |
mycatRel = (MycatRel) mycatRel.accept(new MatierialRewriter()); | |
return mycatRel; | |
} | |
private RelNode getRelRoot(String defaultSchemaName, | |
SchemaPlus plus, DrdsSql drdsSql) { | |
SQLStatement sqlStatement = drdsSql.getSqlStatement(); | |
List<Object> params = drdsSql.getParams(); | |
MycatCalciteMySqlNodeVisitor mycatCalciteMySqlNodeVisitor = new MycatCalciteMySqlNodeVisitor(); | |
sqlStatement.accept(mycatCalciteMySqlNodeVisitor); | |
SqlNode sqlNode = mycatCalciteMySqlNodeVisitor.getSqlNode(); | |
CalciteCatalogReader catalogReader = new CalciteCatalogReader(CalciteSchema | |
.from(plus), | |
defaultSchemaName != null ? ImmutableList.of(defaultSchemaName) : ImmutableList.of(), | |
MycatCalciteSupport.TypeFactory, | |
MycatCalciteSupport.INSTANCE.getCalciteConnectionConfig()); | |
SqlValidator validator = | |
new SqlValidatorImpl(SqlOperatorTables.chain(catalogReader, MycatCalciteSupport.config.getOperatorTable()), catalogReader, MycatCalciteSupport.TypeFactory, | |
MycatCalciteSupport.INSTANCE.getValidatorConfig()) { | |
@Override | |
protected void inferUnknownTypes(@Nonnull RelDataType inferredType, @Nonnull SqlValidatorScope scope, @Nonnull SqlNode node) { | |
if (node != null && node instanceof SqlDynamicParam) { | |
RelDataType relDataType = deriveType(scope, node); | |
return; | |
} | |
super.inferUnknownTypes(inferredType, scope, node); | |
} | |
@Override | |
public RelDataType getUnknownType() { | |
return super.getUnknownType(); | |
} | |
@Override | |
public RelDataType deriveType(SqlValidatorScope scope, SqlNode expr) { | |
RelDataType res = resolveDynamicParam(expr); | |
if (res == null) { | |
return super.deriveType(scope, expr); | |
} else { | |
return res; | |
} | |
} | |
@Override | |
public void validateLiteral(SqlLiteral literal) { | |
if (literal.getTypeName() == SqlTypeName.DECIMAL) { | |
return; | |
} | |
super.validateLiteral(literal); | |
} | |
private RelDataType resolveDynamicParam(SqlNode expr) { | |
if (expr != null && expr instanceof SqlDynamicParam) { | |
int index = ((SqlDynamicParam) expr).getIndex(); | |
if (index < params.size()) { | |
Object o = params.get(index); | |
if (o == null) { | |
return super.typeFactory.createUnknownType(); | |
} else { | |
SqlTypeName type = null; | |
if (o instanceof String) { | |
type = SqlTypeName.VARCHAR; | |
} else if (o instanceof Number) { | |
type = SqlTypeName.DECIMAL; | |
} else { | |
Class<?> aClass = o.getClass(); | |
for (SqlType value : SqlType.values()) { | |
if (value.clazz == aClass) { | |
type = SqlTypeName.getNameForJdbcType(value.id); | |
} | |
} | |
} | |
Objects.requireNonNull(type, () -> "unknown type:" + o.getClass()); | |
return super.typeFactory.createSqlType(type); | |
} | |
} | |
} | |
return null; | |
} | |
@Override | |
public RelDataType getValidatedNodeType(SqlNode node) { | |
RelDataType relDataType = resolveDynamicParam(node); | |
if (relDataType == null) { | |
return super.getValidatedNodeType(node); | |
} else { | |
return relDataType; | |
} | |
} | |
@Override | |
public CalciteException handleUnresolvedFunction(SqlCall call, SqlFunction unresolvedFunction, List<RelDataType> argTypes, List<String> argNames) { | |
return super.handleUnresolvedFunction(call, unresolvedFunction, argTypes, argNames); | |
} | |
@Override | |
protected void addToSelectList(List<SqlNode> list, Set<String> aliases, List<Map.Entry<String, RelDataType>> fieldList, SqlNode exp, SelectScope scope, boolean includeSystemVars) { | |
super.addToSelectList(list, aliases, fieldList, exp, scope, includeSystemVars); | |
} | |
@Override | |
protected void validateWhereOrOn(SqlValidatorScope scope, SqlNode condition, String clause) { | |
if (!condition.getKind().belongsTo(SqlKind.COMPARISON)) { | |
condition = SqlStdOperatorTable.CAST.createCall(SqlParserPos.ZERO, | |
condition, SqlTypeUtil.convertTypeToSpec(typeFactory.createSqlType(SqlTypeName.BOOLEAN))); | |
} | |
super.validateWhereOrOn(scope, condition, clause); | |
} | |
}; | |
SqlNode validated; | |
validated = validator.validate(sqlNode); | |
RelOptCluster cluster = newCluster(); | |
RelBuilder relBuilder = MycatCalciteSupport.relBuilderFactory.create(cluster, catalogReader); | |
SqlToRelConverter sqlToRelConverter = new SqlToRelConverter( | |
NOOP_EXPANDER, | |
validator, | |
catalogReader, | |
cluster, | |
MycatCalciteSupport.config.getConvertletTable(), | |
MycatCalciteSupport.sqlToRelConverterConfig); | |
RelRoot root = sqlToRelConverter.convertQuery(validated, false, true); | |
drdsSql.setAliasList( | |
root.fields.stream() | |
.map(Pair::getValue).collect(Collectors.toList())); | |
final RelRoot root2 = | |
root.withRel(sqlToRelConverter.flattenTypes(root.rel, true)); | |
return root2.withRel( | |
RelDecorrelator.decorrelateQuery(root.rel, relBuilder)).project(); | |
} | |
private MycatRel planUpdate(LogicalTableModify tableModify, | |
DrdsSql drdsSql, OptimizationContext optimizationContext) { | |
RelNode input = tableModify.getInput(); | |
if (input instanceof LogicalProject) { | |
input = ((LogicalProject) input).getInput(); | |
} | |
if (input instanceof Filter && ((Filter) input).getInput() instanceof LogicalTableScan) { | |
AbstractMycatTable mycatTable = tableModify.getTable().unwrap(AbstractMycatTable.class); | |
RexNode condition = ((Filter) input).getCondition(); | |
Distribution distribution = mycatTable.createDistribution(); | |
MycatUpdateRel mycatUpdateRel = MycatUpdateRel.create(tableModify.getCluster(), | |
distribution, | |
Collections.singletonList(condition), | |
drdsSql.getSqlStatement()); | |
optimizationContext.saveParameterized(); | |
return mycatUpdateRel; | |
} | |
AbstractMycatTable mycatTable = tableModify.getTable().unwrap(AbstractMycatTable.class); | |
Distribution distribution = mycatTable.createDistribution(); | |
MycatUpdateRel mycatUpdateRel = new MycatUpdateRel( | |
distribution, | |
drdsSql.getSqlStatement(), mycatTable.isBroadCast()); | |
optimizationContext.saveAlways(); | |
return mycatUpdateRel; | |
} | |
public Plan convertToExecuter(DrdsSql drdsSql, MycatDataContext dataContext, OptimizationContext optimizationContext) { | |
Plan minCostPlan = planCache.getMinCostPlan(drdsSql.getParameterizedString(), drdsSql.getTypes()); | |
if (minCostPlan != null) { | |
switch (minCostPlan.getType()) { | |
case PHYSICAL: | |
return minCostPlan; | |
} | |
} | |
drdsSql = convertToMycatRel(drdsSql, dataContext, optimizationContext); | |
MycatRel mycatRel = (MycatRel) drdsSql.getRelNode(); | |
if (mycatRel instanceof MycatUpdateRel) { | |
return new PlanImpl((MycatUpdateRel) mycatRel); | |
} else if (mycatRel instanceof MycatInsertRel) { | |
return new PlanImpl((MycatInsertRel) mycatRel); | |
} | |
CodeExecuterContext codeExecuterContext = getCodeExecuterContext(Objects.requireNonNull(mycatRel), drdsSql.isForUpdate()); | |
return new PlanImpl(mycatRel, codeExecuterContext, drdsSql.isForUpdate()); | |
} | |
@NotNull | |
public static CodeExecuterContext getCodeExecuterContext(MycatRel relNode, boolean forUpdate) { | |
int fieldCount = relNode.getRowType().getFieldCount(); | |
HashMap<String, Object> context = new HashMap<>(2); | |
StreamMycatEnumerableRelImplementor mycatEnumerableRelImplementor = new StreamMycatEnumerableRelImplementor(context); | |
relNode.accept(new RelShuttleImpl() { | |
@Override | |
public RelNode visit(RelNode other) { | |
if (other instanceof MycatView) { | |
mycatEnumerableRelImplementor.collectLeafRelNode(other); | |
} else if (other instanceof MycatTransientSQLTableScan) { | |
mycatEnumerableRelImplementor.collectLeafRelNode(other); | |
} | |
return super.visit(other); | |
} | |
}); | |
ClassDeclaration classDeclaration = mycatEnumerableRelImplementor.implementHybridRoot(relNode, EnumerableRel.Prefer.ARRAY); | |
String code = Expressions.toString(classDeclaration.memberDeclarations, "\n", false); | |
if (log.isDebugEnabled()) { | |
log.debug("----------------------------------------code----------------------------------------"); | |
log.debug(code); | |
} | |
return CodeExecuterContext.of(mycatEnumerableRelImplementor.getLeafRelNodes(), context, | |
asObjectArray(getBindable(classDeclaration, code, fieldCount)), | |
code, forUpdate); | |
} | |
@NotNull | |
private static ArrayBindable asObjectArray(ArrayBindable bindable) { | |
if (bindable.getElementType().isArray()) { | |
return bindable; | |
} | |
return new ArrayBindable() { | |
@Override | |
public Class<Object[]> getElementType() { | |
return Object[].class; | |
} | |
@Override | |
public Enumerable<Object[]> bind(NewMycatDataContext dataContext) { | |
Enumerable enumerable = bindable.bind(dataContext); | |
return enumerable.select(e -> { | |
return new Object[]{e}; | |
}); | |
} | |
}; | |
} | |
@SneakyThrows | |
static ArrayBindable getBindable(ClassDeclaration expr, String s, int fieldCount) { | |
ICompilerFactory compilerFactory; | |
try { | |
compilerFactory = CompilerFactoryFactory.getDefaultCompilerFactory(); | |
} catch (Exception e) { | |
throw new IllegalStateException( | |
"Unable to instantiate java compiler", e); | |
} | |
final IClassBodyEvaluator cbe = compilerFactory.newClassBodyEvaluator(); | |
cbe.setClassName(expr.name); | |
cbe.setExtendedClass(Utilities.class); | |
cbe.setImplementedInterfaces(new Class[]{ArrayBindable.class}); | |
cbe.setParentClassLoader(EnumerableInterpretable.class.getClassLoader()); | |
if (CalciteSystemProperty.DEBUG.value()) { | |
// Add line numbers to the generated janino class | |
cbe.setDebuggingInformation(true, true, true); | |
} | |
return (ArrayBindable) cbe.createInstance(new StringReader(s)); | |
} | |
public MycatRel optimizeWithCBO(RelNode logPlan, Collection<RelOptRule> relOptRules) { | |
if (logPlan instanceof MycatRel) { | |
return (MycatRel) logPlan; | |
} else { | |
RelOptCluster cluster = logPlan.getCluster(); | |
RelOptPlanner planner = cluster.getPlanner(); | |
planner.clear(); | |
MycatConvention.INSTANCE.register(planner); | |
RelOptUtil.registerDefaultRules(planner,false,false); | |
planner.addRule(CoreRules.PROJECT_TO_CALC); | |
planner.addRule(CoreRules.FILTER_TO_CALC); | |
planner.addRule(CoreRules.CALC_MERGE); | |
if (relOptRules != null) { | |
for (RelOptRule relOptRule : relOptRules) { | |
planner.addRule(relOptRule); | |
} | |
} | |
logPlan = planner.changeTraits(logPlan, cluster.traitSetOf(MycatConvention.INSTANCE)); | |
planner.setRoot(logPlan); | |
return (MycatRel) planner.findBestExp(); | |
} | |
} | |
static final ImmutableSet<RelOptRule> FILTER = ImmutableSet.of( | |
JOIN_PUSH_TRANSITIVE_PREDICATES, | |
CoreRules.JOIN_SUB_QUERY_TO_CORRELATE, | |
CoreRules.FILTER_SUB_QUERY_TO_CORRELATE, | |
CoreRules.PROJECT_SUB_QUERY_TO_CORRELATE, | |
CoreRules.FILTER_INTO_JOIN, | |
// CoreRules.FILTER_INTO_JOIN_DUMB, | |
CoreRules.JOIN_CONDITION_PUSH, | |
CoreRules.SORT_JOIN_TRANSPOSE, | |
CoreRules.FILTER_CORRELATE, | |
CoreRules.PROJECT_CORRELATE_TRANSPOSE, | |
CoreRules.FILTER_AGGREGATE_TRANSPOSE, | |
CoreRules.FILTER_MULTI_JOIN_MERGE, | |
CoreRules.FILTER_PROJECT_TRANSPOSE, | |
CoreRules.FILTER_SET_OP_TRANSPOSE, | |
CoreRules.FILTER_PROJECT_TRANSPOSE, | |
CoreRules.SEMI_JOIN_FILTER_TRANSPOSE, | |
CoreRules.PROJECT_FILTER_TRANSPOSE, | |
CoreRules.FILTER_REDUCE_EXPRESSIONS, | |
CoreRules.JOIN_REDUCE_EXPRESSIONS, | |
CoreRules.PROJECT_REDUCE_EXPRESSIONS, | |
CoreRules.FILTER_MERGE, | |
CoreRules.JOIN_PUSH_EXPRESSIONS, | |
CoreRules.JOIN_PUSH_TRANSITIVE_PREDICATES | |
// CoreRules.PROJECT_CALC_MERGE, | |
// CoreRules.FILTER_CALC_MERGE, | |
// CoreRules.FILTER_TO_CALC, | |
// CoreRules.PROJECT_TO_CALC, | |
// CoreRules.CALC_REMOVE, | |
// CoreRules.CALC_MERGE | |
); | |
private static RelNode optimizeWithRBO(RelNode logPlan, DrdsSql drdsSql, OptimizationContext optimizationContext) { | |
// builder.addRuleCollection(ImmutableList.of( | |
// new MycatFilterViewRule(optimizationContext), | |
// MycatProjectViewRule.INSTANCE, | |
// MycatJoinViewRule.INSTANCE, | |
// MycatAggregateViewRule.INSTANCE, | |
// MycatSortViewRule.INSTANCE, | |
// CoreRules.AGGREGATE_REDUCE_FUNCTIONS | |
// )); | |
// builder.addRuleInstance(CoreRules.PROJECT_TO_CALC); | |
// builder.addRuleInstance(CoreRules.FILTER_TO_CALC); | |
// builder.addRuleInstance(CoreRules.CALC_MERGE); | |
// builder.addRuleInstance(CoreRules.FILTER_CALC_MERGE); | |
// builder.addRuleInstance(CoreRules.PROJECT_CALC_MERGE); | |
RelOptPlanner relOptPlanner = logPlan.getCluster().getPlanner(); | |
relOptPlanner.addRule(CoreRules.PROJECT_JOIN_TRANSPOSE); | |
FILTER.forEach(c->relOptPlanner.addRule(c)); | |
relOptPlanner.addRule(CoreRules.AGGREGATE_REDUCE_FUNCTIONS); | |
// logPlan.getCluster().traitSetOf(MycatConvention.INSTANCE)) | |
MycatConvention.INSTANCE.register(relOptPlanner); | |
relOptPlanner.setRoot((logPlan)); | |
relOptPlanner.addRule(new RelOptRule(RelOptRule.operand(RelNode.class,RelOptRule.any()),MycatCalciteSupport.relBuilderFactory,"333333333") { | |
@Override | |
public void onMatch(RelOptRuleCall call) { | |
call. | |
} | |
}); | |
RelNode bestExp = relOptPlanner.findBestExp(); | |
SQLRBORewriter sqlrboRewriter = new SQLRBORewriter(); | |
RelNode accept = bestExp.accept(sqlrboRewriter); | |
return accept; | |
} | |
public static RelOptCluster newCluster() { | |
RelOptPlanner planner = new VolcanoPlanner(); | |
ImmutableList<RelTraitDef> TRAITS = ImmutableList.of(ConventionTraitDef.INSTANCE, RelCollationTraitDef.INSTANCE); | |
for (RelTraitDef i : TRAITS) { | |
planner.addRelTraitDef(i); | |
} | |
return RelOptCluster.create(planner, MycatCalciteSupport.RexBuilder); | |
} | |
private static final RelOptTable.ViewExpander NOOP_EXPANDER = (rowType, queryString, schemaPath, viewPath) -> null; | |
private static class Parameterized { | |
private String parameterizedString; | |
private List<Object> parameters; | |
public String getParameterizedString() { | |
return parameterizedString; | |
} | |
public List<Object> getParameters() { | |
return parameters; | |
} | |
public static Parameterized invoke(SQLStatement mySqlInsertStatement) { | |
Parameterized parameterized = new Parameterized(); | |
StringBuilder sb = new StringBuilder(); | |
List<Object> params = new ArrayList<>(); | |
MySqlExportParameterVisitor parameterVisitor = new MySqlExportParameterVisitor(params, sb, true); | |
mySqlInsertStatement.accept(parameterVisitor); | |
parameterized.parameterizedString = sb.toString(); | |
parameterized.parameters = params; | |
return parameterized; | |
} | |
} | |
public PlanCache getPlanCache() { | |
return planCache; | |
} | |
@SneakyThrows | |
public Future<Void> runOnDrds(MycatDataContext dataContext, | |
SQLStatement statement, Response response) { | |
DrdsSql drdsSql = this.preParse(statement); | |
Plan plan = getPlan(dataContext, drdsSql); | |
XaSqlConnection transactionSession = (XaSqlConnection) dataContext.getTransactionSession(); | |
List<Object> params = drdsSql.getParams(); | |
PlanImplementor planImplementor = new ObservablePlanImplementorImpl( | |
transactionSession, | |
dataContext, params, response); | |
return impl(plan, planImplementor); | |
} | |
private Future<Void> impl(Plan plan, PlanImplementor planImplementor) { | |
switch (plan.getType()) { | |
case PHYSICAL: | |
return planImplementor.execute(plan); | |
case UPDATE: | |
return planImplementor.execute((MycatUpdateRel) Objects.requireNonNull(plan.getPhysical())); | |
case INSERT: | |
return planImplementor.execute((MycatInsertRel) Objects.requireNonNull(plan.getPhysical())); | |
default: { | |
return VertxUtil.newFailPromise(new MycatException(MySQLErrorCode.ER_NOT_SUPPORTED_YET, "不支持的执行计划")); | |
} | |
} | |
} | |
public Plan getPlan(MycatDataContext dataContext, DrdsSql drdsSql) { | |
OptimizationContext optimizationContext = new OptimizationContext(); | |
Plan plan = this.convertToExecuter(drdsSql, dataContext, optimizationContext); | |
getPlanCache().put(drdsSql.getParameterizedString(), drdsSql.getTypes(), plan); | |
return plan; | |
} | |
public Future<Void> runHbtOnDrds(MycatDataContext dataContext, String statement, Response response) { | |
XaSqlConnection transactionSession = (XaSqlConnection) dataContext.getTransactionSession(); | |
List<Object> params = Collections.emptyList(); | |
PlanImplementor planImplementor = new ObservablePlanImplementorImpl( | |
transactionSession, | |
dataContext, params, response); | |
PlanImpl hbtPlan = getHbtPlan(dataContext, statement); | |
return planImplementor.execute(hbtPlan); | |
} | |
public PlanImpl getHbtPlan(MycatDataContext dataContext, String statement) { | |
DrdsRunner drdsRunners = this; | |
MycatRel mycatRel = drdsRunners.doHbt(statement); | |
CodeExecuterContext codeExecuterContext = getCodeExecuterContext(mycatRel, false); | |
return new PlanImpl(mycatRel, codeExecuterContext, false); | |
} | |
// @NotNull | |
// public static CompletableFuture<Enumerable<Object[]>> getJdbcExecuter(Plan plan, MycatDataContext context, List<Object> params) { | |
// CodeExecuterContext codeExecuterContext = plan.getCodeExecuterContext(); | |
// JdbcConnectionUsage connectionUsage = JdbcConnectionUsage.computeJdbcTargetConnection(context, params, codeExecuterContext); | |
// CompletableFuture<IdentityHashMap<RelNode, List<Enumerable<Object[]>>>> collect = connectionUsage.collect(MetaClusterCurrent.wrapper(JdbcConnectionManager.class), params); | |
// return collect.thenCompose(map -> { | |
// JdbcMycatDataContextImpl jdbcMycatDataContext = new JdbcMycatDataContextImpl(context, codeExecuterContext, map, params, plan.forUpdate()); | |
// ArrayBindable bindable = codeExecuterContext.getBindable(); | |
// Enumerable<Object[]> bindObservable = bindable.bind(jdbcMycatDataContext); | |
// CalciteRowMetaData metaData = plan.getMetaData(); | |
// return CompletableFuture.completedFuture(bindObservable); | |
// }); | |
// } | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment