diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java
index 315c2659..a699e0c8 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.exception.HoodieKeyException;
import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.Row;
import java.util.Arrays;
import java.util.List;
@@ -39,15 +40,14 @@ public class GlobalDeleteKeyGenerator extends KeyGenerator {
private static final String NULL_RECORDKEY_PLACEHOLDER = "__null__";
private static final String EMPTY_RECORDKEY_PLACEHOLDER = "__empty__";
- protected final List<String> recordKeyFields;
-
public GlobalDeleteKeyGenerator(TypedProperties config) {
super(config);
- this.recordKeyFields = Arrays.asList(config.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()).split(","));
+ this.setRecordKeyFields(Arrays.asList(config.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()).split(",")));
}
@Override
public HoodieKey getKey(GenericRecord record) {
+ List<String> recordKeyFields = getRecordKeyFields();
if (recordKeyFields == null) {
throw new HoodieKeyException("Unable to find field names for record key or partition path in cfg");
}
@@ -73,4 +73,17 @@ public class GlobalDeleteKeyGenerator extends KeyGenerator {
return new HoodieKey(recordKey.toString(), EMPTY_PARTITION);
}
+
+ public boolean isRowKeyExtractionSupported() {
+ // key-generator implementations that inherit from this class need to implement this method
+ return this.getClass().equals(GlobalDeleteKeyGenerator.class);
+ }
+
+ public String getRecordKeyFromRow(Row row) {
+ return RowKeyGeneratorHelper.getRecordKeyFromRow(row, getRecordKeyFields(), getRowKeyFieldsPos());
+ }
+
+ public String getPartitionPathFromRow(Row row) {
+ return EMPTY_PARTITION;
+ }
}
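
Note (not part of the patch): the hunk above routes GlobalDeleteKeyGenerator through the new Row-based key extraction. Below is a minimal, hypothetical driver sketching how the Row path could be exercised; it assumes the record-key field positions behind getRowKeyFieldsPos() have already been resolved against the Row's schema (that setter is outside this diff), and it spells out the literal config key that DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY() is believed to resolve to.

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.keygen.GlobalDeleteKeyGenerator;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Illustrative driver: exercises the Row-based key extraction added above.
public class GlobalDeleteRowSketch {
  public static void main(String[] args) {
    TypedProperties props = new TypedProperties();
    // Assumed literal value of DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY().
    props.setProperty("hoodie.datasource.write.recordkey.field", "uuid");
    GlobalDeleteKeyGenerator keyGen = new GlobalDeleteKeyGenerator(props);

    StructType schema = new StructType().add("uuid", DataTypes.StringType);
    Row row = new GenericRowWithSchema(new Object[] {"key-001"}, schema);

    if (keyGen.isRowKeyExtractionSupported()) {
      // Assumes getRowKeyFieldsPos() has been populated for this schema.
      System.out.println(keyGen.getRecordKeyFromRow(row));     // "<pos>:key-001" style key
      System.out.println(keyGen.getPartitionPathFromRow(row)); // EMPTY_PARTITION
    }
  }
}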
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/KeyGenerator.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/KeyGenerator.java
index a9b1b6ec..5edbc540 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/KeyGenerator.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/KeyGenerator.java
@@ -20,17 +20,25 @@ package org.apache.hudi.keygen;
import java.util.List;
import java.util.stream.Collectors;
+
+import org.apache.hudi.AvroConversionHelper;
+import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieKey;
+import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import java.io.Serializable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
+import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.types.StructType;
+import scala.Function1;
+
/**
 * Abstract class to extend for plugging in extraction of {@link HoodieKey} from an Avro record.
 */
@@ -54,6 +62,18 @@ public abstract class KeyGenerator implements Serializable {
 */
public abstract HoodieKey getKey(GenericRecord record);
+ public String getRecordKey(Row row, StructType schema, Schema avroSchema) {
+ Function1<Object, Object> converterFn = AvroConversionHelper.createConverterToRow(avroSchema, schema);
+ GenericRecord genericRecord = (GenericRecord) converterFn.apply(row);
+ return getKey(genericRecord).getRecordKey();
+ }
+
+ public String getPartitionPath(Row row, StructType schema, Schema avroSchema) {
+ Function1<Object, Object> converterFn = AvroConversionHelper.createConverterToRow(avroSchema, schema);
+ GenericRecord genericRecord = (GenericRecord) converterFn.apply(row);
+ return getKey(genericRecord).getPartitionPath();
+ }
+
public boolean isRowKeyExtractionSupported() {
return false;
}
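
Note (not part of the patch): the base-class getRecordKey/getPartitionPath above give every generator a fallback that converts the Row back to an Avro GenericRecord and reuses getKey(). A sketch of how a row writer might dispatch between the direct Row path and this fallback; the concrete generator type is pinned to GlobalDeleteKeyGenerator so the Row methods shown earlier in this diff are known to exist.

import org.apache.avro.Schema;
import org.apache.hudi.keygen.GlobalDeleteKeyGenerator;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;

// Illustrative dispatch: prefer the per-row fast path when the generator
// supports it; otherwise pay the Row -> GenericRecord conversion per record.
final class KeyDispatchSketch {
  static String recordKeyFor(GlobalDeleteKeyGenerator keyGen, Row row,
      StructType structType, Schema avroSchema) {
    if (keyGen.isRowKeyExtractionSupported()) {
      return keyGen.getRecordKeyFromRow(row); // no Avro conversion
    }
    // Fallback added in this patch: converts inside getRecordKey().
    return keyGen.getRecordKey(row, structType, avroSchema);
  }
}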
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
index fd146b17..580e4439 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
@@ -51,6 +51,10 @@ public class NonpartitionedKeyGenerator extends SimpleKeyGenerator {
return this.getClass().equals(NonpartitionedKeyGenerator.class);
}
+ public String getRecordKeyFromRow(Row row) {
+ return super.getRecordKeyFromRow(row);
+ }
+
public String getPartitionPathFromRow(Row row) {
return EMPTY_PARTITION;
}
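
Note (not part of the patch): the this.getClass().equals(...) check above is an opt-in guard. A hypothetical subclass illustrates the consequence: its runtime class differs, so it reports no Row support and the writer falls back to the Avro path until the subclass overrides the Row methods itself.

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.keygen.NonpartitionedKeyGenerator;

// Hypothetical subclass: getClass() is MyKeyGen.class, so
// isRowKeyExtractionSupported() returns false here by design,
// forcing the safe Avro fallback until MyKeyGen opts in.
public class MyKeyGen extends NonpartitionedKeyGenerator {
  public MyKeyGen(TypedProperties config) {
    super(config);
  }
}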
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java
index 1f6de357..bad717fc 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java
@@ -18,10 +18,15 @@
package org.apache.hudi.keygen;
+import org.apache.hudi.exception.HoodieKeyException;
+
+import org.apache.spark.sql.Row;
+
+import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-import org.apache.spark.sql.Row;
public class RowKeyGeneratorHelper {
@@ -32,7 +37,8 @@ public class RowKeyGeneratorHelper {
protected static final String EMPTY_RECORDKEY_PLACEHOLDER = "__empty__";
public static String getRecordKeyFromRow(Row row, List<String> recordKeyFields, List<Integer> recordKeyFieldsPos) {
- return IntStream.range(0, recordKeyFields.size()).mapToObj(idx -> {
+ AtomicBoolean keyIsNullOrEmpty = new AtomicBoolean(true);
+ String toReturn = IntStream.range(0, recordKeyFields.size()).mapToObj(idx -> {
String field = recordKeyFields.get(idx);
Integer fieldPos = recordKeyFieldsPos.get(idx);
if (row.isNullAt(fieldPos)) {
@@ -42,8 +48,13 @@ public class RowKeyGeneratorHelper {
if (val.isEmpty()) {
return fieldPos + ":" + EMPTY_RECORDKEY_PLACEHOLDER;
}
+ keyIsNullOrEmpty.set(false);
return fieldPos + ":" + val;
}).collect(Collectors.joining(","));
+ if (keyIsNullOrEmpty.get()) {
+ throw new HoodieKeyException("recordKey value: \"" + toReturn + "\" for fields: \"" + Arrays.toString(recordKeyFields.toArray()) + "\" cannot be null or empty.");
+ }
+ return toReturn;
}
public static String getPartitionPathFromRow(Row row, List<String> partitionPathFields,
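
Note (not part of the patch): the AtomicBoolean above flips to false the moment any key part is a real value; if it never flips, every part was a null/empty placeholder and the composite key is rejected. A self-contained sketch of the same guard, substituting a plain value list for the Spark Row and IllegalStateException for HoodieKeyException:

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class NullEmptyKeyGuardSketch {
  static final String NULL_PLACEHOLDER = "__null__";
  static final String EMPTY_PLACEHOLDER = "__empty__";

  static String composeKey(List<String> fields, List<Object> values) {
    AtomicBoolean keyIsNullOrEmpty = new AtomicBoolean(true);
    String key = IntStream.range(0, fields.size()).mapToObj(idx -> {
      Object raw = values.get(idx);
      if (raw == null) {
        return idx + ":" + NULL_PLACEHOLDER;
      }
      String val = raw.toString();
      if (val.isEmpty()) {
        return idx + ":" + EMPTY_PLACEHOLDER;
      }
      keyIsNullOrEmpty.set(false); // at least one real key part seen
      return idx + ":" + val;
    }).collect(Collectors.joining(","));
    if (keyIsNullOrEmpty.get()) {
      // The patch throws HoodieKeyException here instead.
      throw new IllegalStateException("recordKey value: \"" + key + "\" for fields: "
          + fields + " cannot be null or empty.");
    }
    return key;
  }

  public static void main(String[] args) {
    // Valid: one real part keeps the key alive despite a null sibling.
    System.out.println(composeKey(Arrays.asList("id", "ts"), Arrays.<Object>asList("u1", null))); // 0:u1,1:__null__
    // Throws: every part is a placeholder.
    composeKey(Arrays.asList("id"), Arrays.<Object>asList((Object) null));
  }
}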
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
index 5d9d568a..14661582 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
@@ -18,7 +18,6 @@
package org.apache.hudi.keygen;
-import java.util.Arrays;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.config.TypedProperties;
@@ -28,6 +27,8 @@ import org.apache.hudi.exception.HoodieKeyException;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.sql.Row;
+import java.util.Arrays;
+
/**
 * Simple key generator, which takes names of fields to be used for recordKey and partitionPath as configs.
 */
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/keygen/TimestampBasedKeyGenerator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/keygen/TimestampBasedKeyGenerator.java
index 213a6bf1..0b90250a 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/keygen/TimestampBasedKeyGenerator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/keygen/TimestampBasedKeyGenerator.java
@@ -18,18 +18,20 @@
package org.apache.hudi.utilities.keygen;
-import java.sql.Timestamp;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.exception.HoodieKeyException;
import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hudi.keygen.RowKeyGeneratorHelper;
import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException;
import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.Row;
import java.io.Serializable;
+import java.sql.Timestamp;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
@@ -37,7 +39,6 @@ import java.util.Collections;
import java.util.Date;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
-import org.apache.spark.sql.Row;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
@@ -77,7 +78,7 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
private static final String TIMESTAMP_OUTPUT_DATE_FORMAT_PROP =
"hoodie.deltastreamer.keygen.timebased.output.dateformat";
private static final String TIMESTAMP_TIMEZONE_FORMAT_PROP =
- "hoodie.deltastreamer.keygen.timebased.timezone";
+ "hoodie.deltastreamer.keygen.timebased.timezone";
}
public TimestampBasedKeyGenerator(TypedProperties config) {
@@ -132,7 +133,7 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
timeMs = inputDateFormat.parse(partitionVal.toString()).getTime();
} else {
throw new HoodieNotSupportedException(
- "Unexpected type for partition field: " + partitionVal.getClass().getName());
+ "Unexpected type for partition field: " + partitionVal.getClass().getName());
}
Date timestamp = new Date(timeMs);
String recordKey = DataSourceUtils.getNestedFieldValAsString(record, recordKeyField, true);
@@ -141,7 +142,7 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
}
String partitionPath = hiveStylePartitioning ? partitionPathField + "=" + partitionPathFormat.format(timestamp)
- : partitionPathFormat.format(timestamp);
+ : partitionPathFormat.format(timestamp);
return new HoodieKey(recordKey, partitionPath);
} catch (ParseException pe) {
throw new HoodieDeltaStreamerException("Unable to parse input partition field :" + partitionVal, pe);
@@ -162,13 +163,21 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
return this.getClass().equals(TimestampBasedKeyGenerator.class);
}
+ public String getRecordKeyFromRow(Row row) {
+ return RowKeyGeneratorHelper.getRecordKeyFromRow(row, getRecordKeyFields(), getRowKeyFieldsPos());
+ }
+
public String getPartitionPathFromRow(Row row) {
- Timestamp fieldVal = row.getAs(partitionPathField);
- if (fieldVal == null) {
+ Timestamp fieldVal = null;
+ if (row.isNullAt(getRowPartitionPathFieldsPos().get(0))) {
fieldVal = new Timestamp(1L);
+ } else {
+ fieldVal = row.getAs(getPartitionPathFields().get(0));
}
+
SimpleDateFormat partitionPathFormat = new SimpleDateFormat(outputDateFormat);
partitionPathFormat.setTimeZone(timeZone);
- return partitionPathFormat.format(fieldVal);
+ return hiveStylePartitioning ? getPartitionPathFields().get(0) + "=" + partitionPathFormat.format(fieldVal)
+ : partitionPathFormat.format(fieldVal);
}
}
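
Note (not part of the patch): the rewritten getPartitionPathFromRow above null-checks the partition column by position, falls back to new Timestamp(1L) for nulls, and now honors hive-style partitioning. A standalone sketch of the resulting formatting behavior, with the format string, field name, and GMT timezone as assumed example values:

import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.TimeZone;

public class TimestampPartitionPathSketch {
  public static void main(String[] args) {
    String outputDateFormat = "yyyy/MM/dd"; // assumed output.dateformat config
    String partitionPathField = "event_ts"; // assumed partition field name
    boolean hiveStylePartitioning = true;

    Timestamp fieldVal = null; // simulate a null partition column
    if (fieldVal == null) {
      fieldVal = new Timestamp(1L); // same fallback as the patch
    }

    SimpleDateFormat partitionPathFormat = new SimpleDateFormat(outputDateFormat);
    partitionPathFormat.setTimeZone(TimeZone.getTimeZone("GMT"));
    String partitionPath = hiveStylePartitioning
        ? partitionPathField + "=" + partitionPathFormat.format(fieldVal)
        : partitionPathFormat.format(fieldVal);
    System.out.println(partitionPath); // event_ts=1970/01/01
  }
}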