Last active
June 2, 2016 07:20
-
-
Save jyates/f11eb44a44af715b483859f497b9ea89 to your computer and use it in GitHub Desktop.
Drill User List - rewriting table for joining logical partitions
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package io.fineo.read.drill.exec.store.rel; | |
import org.apache.calcite.plan.Convention; | |
import org.apache.calcite.plan.RelOptCluster; | |
import org.apache.calcite.plan.RelOptTable; | |
import org.apache.calcite.rel.RelNode; | |
import org.apache.calcite.rel.core.JoinRelType; | |
import org.apache.calcite.rel.logical.LogicalTableScan; | |
import org.apache.calcite.rel.type.RelDataType; | |
import org.apache.calcite.rel.type.RelDataTypeField; | |
import org.apache.calcite.rex.RexNode; | |
import org.apache.calcite.rex.RexUtil; | |
import org.apache.calcite.schema.TranslatableTable; | |
import org.apache.calcite.sql.fun.SqlStdOperatorTable; | |
import org.apache.calcite.tools.RelBuilder; | |
import org.apache.drill.exec.planner.logical.DynamicDrillTable; | |
import java.util.ArrayList; | |
import java.util.HashMap; | |
import java.util.List; | |
import java.util.Map; | |
import static com.google.common.collect.Lists.newArrayList; | |
public class MyTable extends DynamicDrillTable implements TranslatableTable { | |
private final SchemaStore schema; | |
public MyTable(MyPlugin plugin, | |
String storageEngineName, String userName, Object selection, SchemaStore store) { | |
super(plugin, storageEngineName, userName, selection); | |
this.schema = store; | |
} | |
@Override | |
public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) { | |
LogicalScanBuilder builder = new LogicalScanBuilder(context, relOptTable); | |
for (String sourcePath : newArrayList("/drill/table1.json", "/drill/table2.json")) { | |
builder.scan("dfs", sourcePath); | |
} | |
return builder.build(); | |
} | |
private static class LogicalScanBuilder { | |
private final RelBuilder builder; | |
private final RelOptTable relOptTable; | |
private final RelOptCluster cluster; | |
private int scanCount = 0; | |
public LogicalScanBuilder(RelOptTable.ToRelContext context, RelOptTable relOptTable) { | |
this.cluster = context.getCluster(); | |
this.relOptTable = relOptTable; | |
this.builder = RelBuilder.proto(cluster.getPlanner().getContext()) | |
.create(cluster, relOptTable.getRelOptSchema()); | |
} | |
/** | |
* Work around for {@link RelBuilder#scan(String)} not taking multiple String parts as in | |
* Calcite 1.8. Once Drill bumps up, we can replace with just using that | |
*/ | |
public LogicalScanBuilder scan(String... schemaAndTable) { | |
RelOptTable table = | |
relOptTable.getRelOptSchema().getTableForMember(newArrayList(schemaAndTable)); | |
LogicalTableScan scan = | |
new LogicalTableScan(cluster, cluster.traitSetOf(Convention.NONE), table); | |
builder.push(scan); | |
scanCount++; | |
return this; | |
} | |
public RelNode build() { | |
// join all the sub-tables together on the common keys | |
for (int i = 0; i < scanCount - 1; i++) { | |
// ideally do: | |
// builder.join(JoinRelType.FULL, "id") | |
// but seem to have to make our own version: | |
builder.join(JoinRelType.FULL, composeCondition("id")); | |
} | |
return builder.build(); | |
} | |
private RexNode composeCondition(String... fieldNames) { | |
RelNode table1 = builder.peek(0); | |
RelNode table2 = builder.peek(1); | |
// build the rex node for the two tables | |
final List<RexNode> conditions = new ArrayList<>(); | |
for (String fieldName : fieldNames) { | |
conditions.add( | |
builder.call(SqlStdOperatorTable.EQUALS, | |
field(table1, fieldName, 0), | |
field(table2, fieldName, 1))); | |
} | |
return RexUtil.composeConjunction(cluster.getRexBuilder(), conditions, false); | |
} | |
private RexNode field(RelNode table, String fieldName, int offset) { | |
RelDataType row = table.getRowType(); | |
RelDataTypeField field = row.getField(fieldName, true, false); | |
int index = field.getIndex(); | |
return cluster.getRexBuilder().makeInputRef(row, index + offset); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
2016-06-02 00:02:00 DEBUG o.a.d.e.p.s.h.DefaultSqlHandler[140] - HEP_BOTTOM_UP:Directory Prune Planning (7ms): | |
LogicalProject(*=[$0]): rowcount = 1500.0, cumulative cost = {3200.0 rows, 1702.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 16 | |
LogicalJoin(condition=[=($0, $1)], joinType=[inner]): rowcount = 1500.0, cumulative cost = {1700.0 rows, 202.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 14 | |
LogicalTableScan(table=[[dfs, /var/folders/43/tsp1ph8n5b96whkk0j_bkl540000gn/T/junit4857731164319877248/drill/test-2-513a7a13-0950-42c6-8265-765472451ff4.json]]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 3 | |
LogicalTableScan(table=[[dfs, /var/folders/43/tsp1ph8n5b96whkk0j_bkl540000gn/T/junit4857731164319877248/drill/test-1-f49716af-48c4-4338-9a0c-5155b6a0548a.json]]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 2 | |
2016-06-02 00:02:00 DEBUG o.a.d.e.p.s.h.DefaultSqlHandler[140] - HEP_BOTTOM_UP:LOPT Join Planning (17ms): | |
DrillProjectRel(*=[$0]): rowcount = 1.0, cumulative cost = {4.0 rows, 22.0 cpu, 0.0 io, 0.0 network, 17.6 memory}, id = 55 | |
DrillJoinRel(condition=[=($0, $1)], joinType=[inner]): rowcount = 1.0, cumulative cost = {4.0 rows, 22.0 cpu, 0.0 io, 0.0 network, 17.6 memory}, id = 60 | |
DrillScanRel(table=[[dfs, /var/folders/43/tsp1ph8n5b96whkk0j_bkl540000gn/T/junit4857731164319877248/drill/test-2-513a7a13-0950-42c6-8265-765472451ff4.json]], groupscan=[EasyGroupScan [selectionRoot=file:/var/folders/43/tsp1ph8n5b96whkk0j_bkl540000gn/T/junit4857731164319877248/drill/test-2-513a7a13-0950-42c6-8265-765472451ff4.json, numFiles=1, columns=[`*`], files=[file:/var/folders/43/tsp1ph8n5b96whkk0j_bkl540000gn/T/junit4857731164319877248/drill/test-2-513a7a13-0950-42c6-8265-765472451ff4.json]]]): rowcount = 1.0, cumulative cost = {1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 42 | |
DrillScanRel(table=[[dfs, /var/folders/43/tsp1ph8n5b96whkk0j_bkl540000gn/T/junit4857731164319877248/drill/test-1-f49716af-48c4-4338-9a0c-5155b6a0548a.json]], groupscan=[EasyGroupScan [selectionRoot=file:/var/folders/43/tsp1ph8n5b96whkk0j_bkl540000gn/T/junit4857731164319877248/drill/test-1-f49716af-48c4-4338-9a0c-5155b6a0548a.json, numFiles=1, columns=[`*`], files=[file:/var/folders/43/tsp1ph8n5b96whkk0j_bkl540000gn/T/junit4857731164319877248/drill/test-1-f49716af-48c4-4338-9a0c-5155b6a0548a.json]]]): rowcount = 1.0, cumulative cost = {1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 41 | |
2016-06-02 00:02:00 DEBUG o.a.d.e.p.s.h.DefaultSqlHandler[151] - Drill Physical: | |
00-00 Screen : rowType = RecordType(ANY *): rowcount = 1.0, cumulative cost = {6.1 rows, 10.1 cpu, 0.0 io, 0.0 network, 16.0 memory}, id = 624 | |
00-01 Project(*=[$0]) : rowType = RecordType(ANY *): rowcount = 1.0, cumulative cost = {6.0 rows, 10.0 cpu, 0.0 io, 0.0 network, 16.0 memory}, id = 623 | |
00-02 MergeJoin(condition=[=($0, $1)], joinType=[inner]) : rowType = RecordType(ANY id, ANY id0): rowcount = 1.0, cumulative cost = {6.0 rows, 10.0 cpu, 0.0 io, 0.0 network, 16.0 memory}, id = 622 | |
00-04 SelectionVectorRemover : rowType = (DrillRecordRow[id]): rowcount = 1.0, cumulative cost = {2.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 8.0 memory}, id = 617 | |
00-06 Sort(sort0=[$0], dir0=[ASC]) : rowType = (DrillRecordRow[id]): rowcount = 1.0, cumulative cost = {1.0 rows, 0.0 cpu, 0.0 io, 0.0 network, 8.0 memory}, id = 616 | |
00-08 Scan(groupscan=[EasyGroupScan [selectionRoot=file:/var/folders/43/tsp1ph8n5b96whkk0j_bkl540000gn/T/junit4857731164319877248/drill/test-2-513a7a13-0950-42c6-8265-765472451ff4.json, numFiles=1, columns=[`*`], files=[file:/var/folders/43/tsp1ph8n5b96whkk0j_bkl540000gn/T/junit4857731164319877248/drill/test-2-513a7a13-0950-42c6-8265-765472451ff4.json]]]) : rowType = (DrillRecordRow[id]): rowcount = 1.0, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 615 | |
00-03 Project(id0=[$0]) : rowType = RecordType(ANY id0): rowcount = 1.0, cumulative cost = {2.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 8.0 memory}, id = 621 | |
00-05 SelectionVectorRemover : rowType = (DrillRecordRow[id]): rowcount = 1.0, cumulative cost = {2.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 8.0 memory}, id = 620 | |
00-07 Sort(sort0=[$0], dir0=[ASC]) : rowType = (DrillRecordRow[id]): rowcount = 1.0, cumulative cost = {1.0 rows, 0.0 cpu, 0.0 io, 0.0 network, 8.0 memory}, id = 619 | |
00-09 Scan(groupscan=[EasyGroupScan [selectionRoot=file:/var/folders/43/tsp1ph8n5b96whkk0j_bkl540000gn/T/junit4857731164319877248/drill/test-1-f49716af-48c4-4338-9a0c-5155b6a0548a.json, numFiles=1, columns=[`*`], files=[file:/var/folders/43/tsp1ph8n5b96whkk0j_bkl540000gn/T/junit4857731164319877248/drill/test-1-f49716af-48c4-4338-9a0c-5155b6a0548a.json]]]) : rowType = (DrillRecordRow[id]): rowcount = 1.0, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 618 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
2016-06-02 00:09:29 DEBUG o.a.d.e.p.s.h.DefaultSqlHandler[140] - HEP_BOTTOM_UP:Directory Prune Planning (6ms): | |
LogicalProject(*=[$0], *0=[$2]): rowcount = 1500.0, cumulative cost = {3200.0 rows, 3202.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 19 | |
LogicalJoin(condition=[=($1, $3)], joinType=[full]): rowcount = 1500.0, cumulative cost = {1700.0 rows, 202.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 17 | |
EnumerableTableScan(table=[[dfs, /var/folders/43/tsp1ph8n5b96whkk0j_bkl540000gn/T/junit6653229530728832749/drill/test-1-0728a834-81bd-48f1-9f2e-dc3594d46eda.json]]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 5 | |
EnumerableTableScan(table=[[dfs, /var/folders/43/tsp1ph8n5b96whkk0j_bkl540000gn/T/junit6653229530728832749/drill/test-2-e0b457ae-798a-4fe7-8774-4d7286d18822.json]]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 6 | |
2016-06-02 00:09:29 DEBUG o.a.d.e.p.s.h.DefaultSqlHandler[140] - HEP_BOTTOM_UP:LOPT Join Planning (14ms): | |
DrillProjectRel(*=[$0], *0=[$2]): rowcount = 1.0, cumulative cost = {4.0 rows, 20020.0 cpu, 0.0 io, 0.0 network, 17.6 memory}, id = 54 | |
DrillJoinRel(condition=[=($1, $3)], joinType=[full]): rowcount = 1.0, cumulative cost = {4.0 rows, 20020.0 cpu, 0.0 io, 0.0 network, 17.6 memory}, id = 59 | |
DrillScanRel(table=[[dfs, /var/folders/43/tsp1ph8n5b96whkk0j_bkl540000gn/T/junit6653229530728832749/drill/test-1-0728a834-81bd-48f1-9f2e-dc3594d46eda.json]], groupscan=[EasyGroupScan [selectionRoot=file:/var/folders/43/tsp1ph8n5b96whkk0j_bkl540000gn/T/junit6653229530728832749/drill/test-1-0728a834-81bd-48f1-9f2e-dc3594d46eda.json, numFiles=1, columns=[`*`], files=[file:/var/folders/43/tsp1ph8n5b96whkk0j_bkl540000gn/T/junit6653229530728832749/drill/test-1-0728a834-81bd-48f1-9f2e-dc3594d46eda.json]]]): rowcount = 1.0, cumulative cost = {1.0 rows, 10000.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 41 | |
DrillScanRel(table=[[dfs, /var/folders/43/tsp1ph8n5b96whkk0j_bkl540000gn/T/junit6653229530728832749/drill/test-2-e0b457ae-798a-4fe7-8774-4d7286d18822.json]], groupscan=[EasyGroupScan [selectionRoot=file:/var/folders/43/tsp1ph8n5b96whkk0j_bkl540000gn/T/junit6653229530728832749/drill/test-2-e0b457ae-798a-4fe7-8774-4d7286d18822.json, numFiles=1, columns=[`*`], files=[file:/var/folders/43/tsp1ph8n5b96whkk0j_bkl540000gn/T/junit6653229530728832749/drill/test-2-e0b457ae-798a-4fe7-8774-4d7286d18822.json]]]): rowcount = 1.0, cumulative cost = {1.0 rows, 10000.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 40 | |
2016-06-02 00:03:50 DEBUG o.a.d.e.p.s.h.DefaultSqlHandler[151] - Drill Physical: | |
00-00 Screen : rowType = RecordType(ANY *, ANY *0): rowcount = 1.0, cumulative cost = {2.1 rows, 20.1 cpu, 0.0 io, 0.0 network, 17.6 memory}, id = 325 | |
00-01 ProjectAllowDup(*=[$0], *0=[$1]) : rowType = RecordType(ANY *, ANY *0): rowcount = 1.0, cumulative cost = {2.0 rows, 20.0 cpu, 0.0 io, 0.0 network, 17.6 memory}, id = 324 | |
00-02 Project(T0¦¦*=[$0], T1¦¦*=[$2]) : rowType = RecordType(ANY T0¦¦*, ANY T1¦¦*): rowcount = 1.0, cumulative cost = {2.0 rows, 20.0 cpu, 0.0 io, 0.0 network, 17.6 memory}, id = 323 | |
00-03 HashJoin(condition=[=($1, $3)], joinType=[full]) : rowType = RecordType(ANY T0¦¦*, ANY companykey, ANY T1¦¦*, ANY companykey0): rowcount = 1.0, cumulative cost = {2.0 rows, 20.0 cpu, 0.0 io, 0.0 network, 17.6 memory}, id = 322 | |
00-05 Project(T0¦¦*=[$0], companykey=[$1]) : rowType = RecordType(ANY T0¦¦*, ANY companykey): rowcount = 1.0, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 318 | |
00-07 Scan(groupscan=[EasyGroupScan [selectionRoot=file:/var/folders/43/tsp1ph8n5b96whkk0j_bkl540000gn/T/junit7587719243224120904/drill/test-1-bbebfbe4-baf0-4dc2-ae79-fae61c0da3fc.json, numFiles=1, columns=[`*`], files=[file:/var/folders/43/tsp1ph8n5b96whkk0j_bkl540000gn/T/junit7587719243224120904/drill/test-1-bbebfbe4-baf0-4dc2-ae79-fae61c0da3fc.json]]]) : rowType = (DrillRecordRow[*, companykey]): rowcount = 1.0, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 317 | |
00-04 Project(T1¦¦*=[$0], companykey0=[$1]) : rowType = RecordType(ANY T1¦¦*, ANY companykey0): rowcount = 1.0, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 321 | |
00-06 Project(T1¦¦*=[$0], companykey=[$1]) : rowType = RecordType(ANY T1¦¦*, ANY companykey): rowcount = 1.0, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 320 | |
00-08 Scan(groupscan=[EasyGroupScan [selectionRoot=file:/var/folders/43/tsp1ph8n5b96whkk0j_bkl540000gn/T/junit7587719243224120904/drill/test-2-7ce87e1f-9709-4417-88a9-6475e3a2517e.json, numFiles=1, columns=[`*`], files=[file:/var/folders/43/tsp1ph8n5b96whkk0j_bkl540000gn/T/junit7587719243224120904/drill/test-2-7ce87e1f-9709-4417-88a9-6475e3a2517e.json]]]) : rowType = (DrillRecordRow[*, companykey]): rowcount = 1.0, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 319 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment