/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */ private Object[] references;
/* 007 */ private scala.collection.Iterator[] inputs;
/* 008 */ private scala.collection.Iterator inputadapter_input;
/* 009 */ private org.apache.spark.sql.execution.metric.SQLMetric filter_numOutputRows;
/* 010 */ private UnsafeRow filter_result;
/* 011 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder filter_holder;
/* 012 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter filter_rowWriter;
/* 013 */ private org.apache.spark.broadcast.TorrentBroadcast bhj_broadcast;
/* 014 */ private org.apache.spark.sql.execution.joins.UnsafeHashedRelation bhj_relation;
/* 015 */ private UnsafeRow bhj_result;
/* 016 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder bhj_holder;
/* 017 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter bhj_rowWriter;
/* 018 */ private org.apache.spark.sql.execution.metric.SQLMetric bhj_numOutputRows;
/* 019 */ private UnsafeRow bhj_result1;
/* 020 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder bhj_holder1;
/* 021 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter bhj_rowWriter1;
/* 022 */
/* 023 */ public GeneratedIterator(Object[] references) {
/* 024 */ this.references = references;
/* 025 */ }
/* 026 */
/* 027 */ public void init(int index, scala.collection.Iterator[] inputs) {
/* 028 */ partitionIndex = index; /* This shows that the join is evaluated once per partition of the larger (streamed) Dataset */
/* 029 */ this.inputs = inputs;
/* 030 */ wholestagecodegen_init_0();
/* 031 */ wholestagecodegen_init_1();
/* 032 */
/* 033 */ }
/* 034 */
/* 035 */ private void wholestagecodegen_init_0() {
/* 036 */ inputadapter_input = inputs[0];
/* 037 */ this.filter_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
/* 038 */ filter_result = new UnsafeRow(3);
/* 039 */ this.filter_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(filter_result, 96);
/* 040 */ this.filter_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(filter_holder, 3);
/* 041 */ this.bhj_broadcast = (org.apache.spark.broadcast.TorrentBroadcast) references[1];
/* 042 */ /* The broadcast table is read as an instance of UnsafeHashedRelation. This happens to be our Department table, since it has the smaller estimated physical size */
/* 043 */ bhj_relation = ((org.apache.spark.sql.execution.joins.UnsafeHashedRelation) bhj_broadcast.value()).asReadOnlyCopy();
/* 044 */ incPeakExecutionMemory(bhj_relation.estimatedSize());
/* 045 */
/* 046 */ bhj_result = new UnsafeRow(1);
/* 047 */ this.bhj_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(bhj_result, 32);
/* 048 */ this.bhj_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(bhj_holder, 1);
/* 049 */ this.bhj_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[2];
/* 050 */ bhj_result1 = new UnsafeRow(5);
/* 051 */ this.bhj_holder1 = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(bhj_result1, 160);
/* 052 */
/* 053 */ }
/* 054 */
/* 055 */ private void wholestagecodegen_init_1() {
/* 056 */ this.bhj_rowWriter1 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(bhj_holder1, 5);
/* 057 */
/* 058 */ }
/* 059 */
/* 060 */ protected void processNext() throws java.io.IOException {
/* 061 */ while (inputadapter_input.hasNext() && !stopEarly()) {
/* 062 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 063 */ boolean inputadapter_isNull2 = inputadapter_row.isNullAt(2);
/* 064 */ UTF8String inputadapter_value2 = inputadapter_isNull2 ? null : (inputadapter_row.getUTF8String(2));
/* 065 */
/* 066 */ if (!(!(inputadapter_isNull2))) continue;
/* 067 */
/* 068 */ filter_numOutputRows.add(1);
/* 069 */
/* 070 */ /* generate the join key for the stream side, a.k.a. the larger Dataset */
/* 071 */
/* 072 */ bhj_holder.reset();
/* 073 */
/* 074 */ bhj_rowWriter.write(0, inputadapter_value2);
/* 075 */ bhj_result.setTotalSize(bhj_holder.totalSize());
/* 076 */
/* 077 */ /* find matches from HashedRelation */
/* 078 */ UnsafeRow bhj_matched = bhj_result.anyNull() ? null: (UnsafeRow)bhj_relation.getValue(bhj_result);
/* 079 */ if (bhj_matched == null) continue;
/* 080 */
/* 081 */ bhj_numOutputRows.add(1);
/* 082 */
/* 083 */ boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 084 */ UTF8String inputadapter_value = inputadapter_isNull ? null : (inputadapter_row.getUTF8String(0));
/* 085 */ boolean inputadapter_isNull1 = inputadapter_row.isNullAt(1);
/* 086 */ UTF8String inputadapter_value1 = inputadapter_isNull1 ? null : (inputadapter_row.getUTF8String(1));
/* 087 */ UTF8String bhj_value1 = bhj_matched.getUTF8String(0);
/* 088 */ boolean bhj_isNull2 = bhj_matched.isNullAt(1);
/* 089 */ UTF8String bhj_value2 = bhj_isNull2 ? null : (bhj_matched.getUTF8String(1));
/* 090 */ bhj_holder1.reset();
/* 091 */
/* 092 */ bhj_rowWriter1.zeroOutNullBytes();
/* 093 */
/* 094 */ if (inputadapter_isNull) {
/* 095 */ bhj_rowWriter1.setNullAt(0);
/* 096 */ } else {
/* 097 */ bhj_rowWriter1.write(0, inputadapter_value);
/* 098 */ }
/* 099 */
/* 100 */ if (inputadapter_isNull1) {
/* 101 */ bhj_rowWriter1.setNullAt(1);
/* 102 */ } else {
/* 103 */ bhj_rowWriter1.write(1, inputadapter_value1);
/* 104 */ }
/* 105 */
/* 106 */ bhj_rowWriter1.write(2, inputadapter_value2);
/* 107 */
/* 108 */ bhj_rowWriter1.write(3, bhj_value1);
/* 109 */
/* 110 */ if (bhj_isNull2) {
/* 111 */ bhj_rowWriter1.setNullAt(4);
/* 112 */ } else {
/* 113 */ bhj_rowWriter1.write(4, bhj_value2);
/* 114 */ }
/* 115 */ bhj_result1.setTotalSize(bhj_holder1.totalSize());
/* 116 */ append(bhj_result1);
/* 117 */ if (shouldStop()) return;
/* 118 */ }
/* 119 */ }
/* 120 */ }
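
The dump above can be reproduced from the Spark shell. A minimal sketch, assuming the widetable Dataset defined in the snippets further down, is to use the debug helpers in org.apache.spark.sql.execution.debug, which print the generated code for each whole-stage codegen subtree:

scala> import org.apache.spark.sql.execution.debug._
scala> // prints the GeneratedIterator source shown above for every WholeStageCodegen subtree
scala> widetable.debugCodegen()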
Spark SQL Joins: Code Snippets
scala> case class Employee(id: String, name: String, did: String)
defined class Employee
scala> case class Department(id: String, name: String)
defined class Department
scala> val joe = Employee("1", "Joe", "1")
joe: Employee = Employee(1,Joe,1)
scala> val sam = Employee("2", "Sam", "2")
sam: Employee = Employee(2,Sam,2)
scala> val it = Department("1", "IT")
it: Department = Department(1,IT)
scala> val sales = Department("2", "Sales")
sales: Department = Department(2,Sales)
scala> val employees = spark.createDataset(List(joe, sam))
employees: org.apache.spark.sql.Dataset[Employee] = [id: string, name: string ... 1 more field]
scala> val departments = spark.createDataset(List(it, sales))
departments: org.apache.spark.sql.Dataset[Department] = [id: string, name: string]
scala> val widetable = employees.join(departments, employees("did")===departments("id"))
scala> sc.setLogLevel("TRACE")
scala> widetable.show()
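
To confirm that Spark plans a broadcast hash join here, with Department on the build side, one quick check (not part of the original session, and shown only as a sketch) is to print the physical plan and inspect the size threshold that drives the broadcast decision:

scala> // the physical plan should contain a BroadcastHashJoin node, with the
scala> // smaller Department side wrapped in a BroadcastExchange
scala> widetable.explain()
scala> // tables whose estimated size is below this threshold (default 10 MB in Spark 2.x)
scala> // are candidates for broadcasting
scala> spark.conf.get("spark.sql.autoBroadcastJoinThreshold")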