Spark SQL Joins: Code Snippets

Physical plan of the join between sales_fact_1998 and customer. (The REPL session that builds the `widetable` DataFrame appears further down.)

scala> widetable.explain()
== Physical Plan ==
*SortMergeJoin [customer_id#2], [customer_id#17], Inner
:- *Sort [customer_id#2 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(customer_id#2, 200)
: +- *Scan JDBCRelation(foodmart.sales_fact_1998) [numPartitions=1] [product_id#0,time_id#1,customer_id#2,promotion_id#3,store_id#4,store_sales#5,store_cost#6,unit_sales#7] ReadSchema: struct<product_id:int,time_id:int,customer_id:int,promotion_id:int,store_id:int,store_sales:decim...
+- *Sort [customer_id#17 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(customer_id#17, 200)
+- *Scan JDBCRelation(foodmart.customer) [numPartitions=1] [customer_id#17,fullname#45] ReadSchema: struct<customer_id:int,fullname:string>
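In the plan above, each side of the join is hash-partitioned on customer_id into 200 partitions (the Exchange nodes) and sorted before the SortMergeJoin; 200 is simply the default value of spark.sql.shuffle.partitions. As an aside, not part of the original session, a minimal sketch of changing that number:

// Lower the shuffle parallelism; the Exchange nodes would then read
// hashpartitioning(customer_id, 50) instead of (..., 200).
spark.conf.set("spark.sql.shuffle.partitions", "50")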
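The listing below shows all four plan stages (parsed, analyzed, optimized and physical) for the same query. This is what the extended form of explain prints; a minimal sketch of the call, assuming the same widetable DataFrame as in the session further down:

// explain() alone prints only the physical plan (as above);
// explain(true) also prints the parsed, analyzed and optimized logical plans.
widetable.explain(true)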
== Parsed Logical Plan ==
'Project [*]
+- 'Join Inner, ('s.customer_id = 'c.customer_id)
:- 'SubqueryAlias s
: +- 'UnresolvedRelation `sales_fact_1998`
+- 'SubqueryAlias c
+- 'UnresolvedRelation `customer`
== Analyzed Logical Plan ==
product_id: int, time_id: int, customer_id: int, promotion_id: int, store_id: int, store_sales: decimal(10,4), store_cost: decimal(10,4), unit_sales: decimal(10,4), customer_id: int, fullname: string
Project [product_id#0, time_id#1, customer_id#2, promotion_id#3, store_id#4, store_sales#5, store_cost#6, unit_sales#7, customer_id#17, fullname#45]
+- Join Inner, (customer_id#2 = customer_id#17)
:- SubqueryAlias s
: +- SubqueryAlias sales_fact_1998
: +- Relation[product_id#0,time_id#1,customer_id#2,promotion_id#3,store_id#4,store_sales#5,store_cost#6,unit_sales#7] JDBCRelation(foodmart.sales_fact_1998) [numPartitions=1]
+- SubqueryAlias c
+- SubqueryAlias customer
+- Project [customer_id#17, fullname#45]
+- Relation[customer_id#17,account_num#18L,lname#19,fname#20,mi#21,address1#22,address2#23,address3#24,address4#25,city#26,state_province#27,postal_code#28,country#29,customer_region_id#30,phone1#31,phone2#32,birthdate#33,marital_status#34,yearly_income#35,gender#36,total_children#37,num_children_at_home#38,education#39,date_accnt_opened#40,member_card#41,occupation#42,houseowner#43,num_cars_owned#44,fullname#45] JDBCRelation(foodmart.customer) [numPartitions=1]
== Optimized Logical Plan ==
Join Inner, (customer_id#2 = customer_id#17)
:- Relation[product_id#0,time_id#1,customer_id#2,promotion_id#3,store_id#4,store_sales#5,store_cost#6,unit_sales#7] JDBCRelation(foodmart.sales_fact_1998) [numPartitions=1]
+- Project [customer_id#17, fullname#45]
+- Relation[customer_id#17,account_num#18L,lname#19,fname#20,mi#21,address1#22,address2#23,address3#24,address4#25,city#26,state_province#27,postal_code#28,country#29,customer_region_id#30,phone1#31,phone2#32,birthdate#33,marital_status#34,yearly_income#35,gender#36,total_children#37,num_children_at_home#38,education#39,date_accnt_opened#40,member_card#41,occupation#42,houseowner#43,num_cars_owned#44,fullname#45] JDBCRelation(foodmart.customer) [numPartitions=1]
== Physical Plan ==
*SortMergeJoin [customer_id#2], [customer_id#17], Inner
:- *Sort [customer_id#2 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(customer_id#2, 200)
: +- *Scan JDBCRelation(foodmart.sales_fact_1998) [numPartitions=1] [product_id#0,time_id#1,customer_id#2,promotion_id#3,store_id#4,store_sales#5,store_cost#6,unit_sales#7] ReadSchema: struct<product_id:int,time_id:int,customer_id:int,promotion_id:int,store_id:int,store_sales:decim...
+- *Sort [customer_id#17 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(customer_id#17, 200)
+- *Scan JDBCRelation(foodmart.customer) [numPartitions=1] [customer_id#17,fullname#45] ReadSchema: struct<customer_id:int,fullname:string>
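Spark settled on a SortMergeJoin here because neither JDBC-backed relation is known, statistically, to be small enough to broadcast. For contrast, a broadcast hash join can be requested explicitly with the broadcast hint; a minimal sketch (not part of the original session), assuming the salesFact and customer DataFrames defined below:

// Hint that `customer` is small enough to ship to every executor;
// the physical plan then shows BroadcastHashJoin instead of SortMergeJoin.
import org.apache.spark.sql.functions.broadcast
val hinted = salesFact.join(broadcast(customer), "customer_id")
hinted.explain()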
scala> :paste
// Entering paste mode (ctrl-D to finish)
val salesFact = spark
.read
.format("jdbc")
.option("url", "jdbc:mysql://localhost:3306/foodmart")
.option("user", "username")
.option("password", "password")
.option("driver", "com.mysql.jdbc.Driver")
.option("dbtable", "foodmart.sales_fact_1998")
.load()
// Exiting paste mode, now interpreting.
salesFact: org.apache.spark.sql.DataFrame = [product_id: int, time_id: int ... 6 more fields]
scala> val customer = spark
.read
.format("jdbc")
.option("url", "jdbc:mysql://localhost:3306/foodmart")
.option("user", "username")
.option("password", "password")
.option("driver", "com.mysql.jdbc.Driver")
.option("dbtable", "foodmart.customer")
.load()
.select("customer_id", "fullname")
customer: org.apache.spark.sql.DataFrame = [customer_id: int, fullname: string]
scala> salesFact.createOrReplaceTempView("sales_fact_1998")
scala> customer.createOrReplaceTempView("customer")
scala> val widetable = spark.sql("select * from sales_fact_1998 s join customer c on s.customer_id = c.customer_id")
widetable: org.apache.spark.sql.DataFrame = [product_id: int, time_id: int ... 8 more fields]
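For reference, the same join can be expressed directly with the DataFrame API instead of registering temp views and running SQL; a minimal equivalent (not part of the original session) that yields the same SortMergeJoin plan:

// DataFrame-API form of the "select * ... join" query above.
val widetableDF = salesFact.join(customer, salesFact("customer_id") === customer("customer_id"))
widetableDF.explain()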
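What follows is the whole-stage-codegen output for the join: the Java source Spark generates for each WholeStageCodegen subtree of the physical plan (the listings are excerpts; they start at generated line 005). explain() does not print this; presumably it was produced with the codegen debug helper, sketched here:

// Prints the generated Java source of every WholeStageCodegen subtree,
// including the GeneratedIterator classes listed below.
import org.apache.spark.sql.execution.debug._
widetable.debugCodegen()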
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */ private Object[] references;
/* 007 */ private scala.collection.Iterator[] inputs;
/* 008 */ private scala.collection.Iterator smj_leftInput;
/* 009 */ private scala.collection.Iterator smj_rightInput;
/* 010 */ private InternalRow smj_leftRow;
/* 011 */ private InternalRow smj_rightRow;
/* 012 */ private int smj_value2;
/* 013 */ private org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray smj_matches;
/* 014 */ private int smj_value3;
/* 015 */ private int smj_value4;
/* 016 */ private int smj_value5;
/* 017 */ private int smj_value6;
/* 018 */ private int smj_value7;
/* 019 */ private int smj_value8;
/* 020 */ private Decimal smj_value9;
/* 021 */ private Decimal smj_value10;
/* 022 */ private Decimal smj_value11;
/* 023 */ private org.apache.spark.sql.execution.metric.SQLMetric smj_numOutputRows;
/* 024 */ private UnsafeRow smj_result;
/* 025 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder smj_holder;
/* 026 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter smj_rowWriter;
/* 027 */
/* 028 */ public GeneratedIterator(Object[] references) {
/* 029 */ this.references = references;
/* 030 */ }
/* 031 */
/* 032 */ public void init(int index, scala.collection.Iterator[] inputs) {
/* 033 */ partitionIndex = index;
/* 034 */ this.inputs = inputs;
/* 035 */ smj_leftInput = inputs[0];
/* 036 */ smj_rightInput = inputs[1];
/* 037 */
/* 038 */ smj_rightRow = null;
/* 039 */
/* 040 */ smj_matches = new org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray(2147483647);
/* 041 */
/* 042 */ this.smj_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
/* 043 */ smj_result = new UnsafeRow(10);
/* 044 */ this.smj_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(smj_result, 32);
/* 045 */ this.smj_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(smj_holder, 10);
/* 046 */
/* 047 */ }
/* 048 */
/* 049 */ private boolean findNextInnerJoinRows(
/* 050 */ scala.collection.Iterator leftIter,
/* 051 */ scala.collection.Iterator rightIter) {
/* 052 */ smj_leftRow = null;
/* 053 */ int comp = 0;
/* 054 */ while (smj_leftRow == null) {
/* 055 */ if (!leftIter.hasNext()) return false;
/* 056 */ smj_leftRow = (InternalRow) leftIter.next();
/* 057 */
/* 058 */ int smj_value = smj_leftRow.getInt(2);
/* 059 */ if (false) {
/* 060 */ smj_leftRow = null;
/* 061 */ continue;
/* 062 */ }
/* 063 */ if (!smj_matches.isEmpty()) {
/* 064 */ comp = 0;
/* 065 */ if (comp == 0) {
/* 066 */ comp = (smj_value > smj_value3 ? 1 : smj_value < smj_value3 ? -1 : 0);
/* 067 */ }
/* 068 */
/* 069 */ if (comp == 0) {
/* 070 */ return true;
/* 071 */ }
/* 072 */ smj_matches.clear();
/* 073 */ }
/* 074 */
/* 075 */ do {
/* 076 */ if (smj_rightRow == null) {
/* 077 */ if (!rightIter.hasNext()) {
/* 078 */ smj_value3 = smj_value;
/* 079 */ return !smj_matches.isEmpty();
/* 080 */ }
/* 081 */ smj_rightRow = (InternalRow) rightIter.next();
/* 082 */
/* 083 */ int smj_value1 = smj_rightRow.getInt(0);
/* 084 */ if (false) {
/* 085 */ smj_rightRow = null;
/* 086 */ continue;
/* 087 */ }
/* 088 */ smj_value2 = smj_value1;
/* 089 */ }
/* 090 */
/* 091 */ comp = 0;
/* 092 */ if (comp == 0) {
/* 093 */ comp = (smj_value > smj_value2 ? 1 : smj_value < smj_value2 ? -1 : 0);
/* 094 */ }
/* 095 */
/* 096 */ if (comp > 0) {
/* 097 */ smj_rightRow = null;
/* 098 */ } else if (comp < 0) {
/* 099 */ if (!smj_matches.isEmpty()) {
/* 100 */ smj_value3 = smj_value;
/* 101 */ return true;
/* 102 */ }
/* 103 */ smj_leftRow = null;
/* 104 */ } else {
/* 105 */ smj_matches.add((UnsafeRow) smj_rightRow);
/* 106 */ smj_rightRow = null;;
/* 107 */ }
/* 108 */ } while (smj_leftRow != null);
/* 109 */ }
/* 110 */ return false; // unreachable
/* 111 */ }
/* 112 */
/* 113 */ protected void processNext() throws java.io.IOException {
/* 114 */ while (findNextInnerJoinRows(smj_leftInput, smj_rightInput)) {
/* 115 */ smj_value4 = smj_leftRow.getInt(0);
/* 116 */ smj_value5 = smj_leftRow.getInt(1);
/* 117 */ smj_value6 = smj_leftRow.getInt(2);
/* 118 */ smj_value7 = smj_leftRow.getInt(3);
/* 119 */ smj_value8 = smj_leftRow.getInt(4);
/* 120 */ smj_value9 = smj_leftRow.getDecimal(5, 10, 4);
/* 121 */ smj_value10 = smj_leftRow.getDecimal(6, 10, 4);
/* 122 */ smj_value11 = smj_leftRow.getDecimal(7, 10, 4);
/* 123 */ scala.collection.Iterator<UnsafeRow> smj_iterator = smj_matches.generateIterator();
/* 124 */ while (smj_iterator.hasNext()) {
/* 125 */ InternalRow smj_rightRow1 = (InternalRow) smj_iterator.next();
/* 126 */
/* 127 */ smj_numOutputRows.add(1);
/* 128 */
/* 129 */ int smj_value12 = smj_rightRow1.getInt(0);
/* 130 */ UTF8String smj_value13 = smj_rightRow1.getUTF8String(1);
/* 131 */ smj_holder.reset();
/* 132 */
/* 133 */ smj_rowWriter.write(0, smj_value4);
/* 134 */
/* 135 */ smj_rowWriter.write(1, smj_value5);
/* 136 */
/* 137 */ smj_rowWriter.write(2, smj_value6);
/* 138 */
/* 139 */ smj_rowWriter.write(3, smj_value7);
/* 140 */
/* 141 */ smj_rowWriter.write(4, smj_value8);
/* 142 */
/* 143 */ smj_rowWriter.write(5, smj_value9, 10, 4);
/* 144 */
/* 145 */ smj_rowWriter.write(6, smj_value10, 10, 4);
/* 146 */
/* 147 */ smj_rowWriter.write(7, smj_value11, 10, 4);
/* 148 */
/* 149 */ smj_rowWriter.write(8, smj_value12);
/* 150 */
/* 151 */ smj_rowWriter.write(9, smj_value13);
/* 152 */ smj_result.setTotalSize(smj_holder.totalSize());
/* 153 */ append(smj_result.copy());
/* 154 */
/* 155 */ }
/* 156 */ if (shouldStop()) return;
/* 157 */ }
/* 158 */ }
/* 159 */ }
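The second WholeStageCodegen subtree is generated as a separate class: it implements the Sort stage that feeds the join (note the SortExec reference held in sort_plan).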
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */ private Object[] references;
/* 007 */ private scala.collection.Iterator[] inputs;
/* 008 */ private boolean sort_needToSort;
/* 009 */ private org.apache.spark.sql.execution.SortExec sort_plan;
/* 010 */ private org.apache.spark.sql.execution.UnsafeExternalRowSorter sort_sorter;
/* 011 */ private org.apache.spark.executor.TaskMetrics sort_metrics;
/* 012 */ private scala.collection.Iterator<UnsafeRow> sort_sortedIter;
/* 013 */ private scala.collection.Iterator inputadapter_input;
/* 014 */ private org.apache.spark.sql.execution.metric.SQLMetric sort_peakMemory;
/* 015 */ private org.apache.spark.sql.execution.metric.SQLMetric sort_spillSize;
/* 016 */ private org.apache.spark.sql.execution.metric.SQLMetric sort_sortTime;
/* 017 */
/* 018 */ public GeneratedIterator(Object[] references) {
/* 019 */ this.references = references;
/* 020 */ }
/* 021 */
/* 022 */ public void init(int index, scala.collection.Iterator[] inputs) {
/* 023 */ partitionIndex = index;
/* 024 */ this.inputs = inputs;
/* 025 */ sort_needToSort = true;
/* 026 */ this.sort_plan = (org.apache.spark.sql.execution.SortExec) references[0];
/* 027 */ sort_sorter = sort_plan.createSorter();
/* 028 */ sort_metrics = org.apache.spark.TaskContext.get().taskMetrics();
/* 029 */
/* 030 */ inputadapter_input = inputs[0];
/* 031 */ this.sort_peakMemory = (org.apache.spark.sql.execution.metric.SQLMetric) references[1];
/* 032 */ this.sort_spillSize = (org.apache.spark.sql.execution.metric.SQLMetric) references[2];
/* 033 */ this.sort_sortTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[3];
/* 034 */
/* 035 */ }
/* 036 */
/* 037 */ private void sort_addToSorter() throws java.io.IOException {
/* 038 */ while (inputadapter_input.hasNext() && !stopEarly()) {
/* 039 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 040 */ sort_sorter.insertRow((UnsafeRow)inputadapter_row);
/* 041 */ if (shouldStop()) return;
/* 042 */ }
/* 043 */
/* 044 */ }
/* 045 */
/* 046 */ protected void processNext() throws java.io.IOException {
/* 047 */ if (sort_needToSort) {
/* 048 */ long sort_spillSizeBefore = sort_metrics.memoryBytesSpilled();
/* 049 */ sort_addToSorter();
/* 050 */ sort_sortedIter = sort_sorter.sort();
/* 051 */ sort_sortTime.add(sort_sorter.getSortTimeNanos() / 1000000);
/* 052 */ sort_peakMemory.add(sort_sorter.getPeakMemoryUsage());
/* 053 */ sort_spillSize.add(sort_metrics.memoryBytesSpilled() - sort_spillSizeBefore);
/* 054 */ sort_metrics.incPeakExecutionMemory(sort_sorter.getPeakMemoryUsage());
/* 055 */ sort_needToSort = false;
/* 056 */ }
/* 057 */
/* 058 */ while (sort_sortedIter.hasNext()) {
/* 059 */ UnsafeRow sort_outputRow = (UnsafeRow)sort_sortedIter.next();
/* 060 */
/* 061 */ append(sort_outputRow);
/* 062 */
/* 063 */ if (shouldStop()) return;
/* 064 */ }
/* 065 */ }
/* 066 */ }