@nsivabalan
Last active July 5, 2020 23:49
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java
index 315c2659..a699e0c8 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.exception.HoodieKeyException;
import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.Row;
import java.util.Arrays;
import java.util.List;
@@ -39,15 +40,14 @@ public class GlobalDeleteKeyGenerator extends KeyGenerator {
private static final String NULL_RECORDKEY_PLACEHOLDER = "__null__";
private static final String EMPTY_RECORDKEY_PLACEHOLDER = "__empty__";
- protected final List<String> recordKeyFields;
-
public GlobalDeleteKeyGenerator(TypedProperties config) {
super(config);
- this.recordKeyFields = Arrays.asList(config.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()).split(","));
+ this.setRecordKeyFields(Arrays.asList(config.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()).split(",")));
}
@Override
public HoodieKey getKey(GenericRecord record) {
+ List<String> recordKeyFields = getRecordKeyFields();
if (recordKeyFields == null) {
throw new HoodieKeyException("Unable to find field names for record key or partition path in cfg");
}
@@ -73,4 +73,17 @@ public class GlobalDeleteKeyGenerator extends KeyGenerator {
return new HoodieKey(recordKey.toString(), EMPTY_PARTITION);
}
+
+ public boolean isRowKeyExtractionSupported() {
+ // any key generator that subclasses this class needs to override this method to advertise row support
+ return this.getClass().equals(GlobalDeleteKeyGenerator.class);
+ }
+
+ public String getRecordKeyFromRow(Row row) {
+ return RowKeyGeneratorHelper.getRecordKeyFromRow(row, getRecordKeyFields(), getRowKeyFieldsPos());
+ }
+
+ public String getPartitionPathFromRow(Row row) {
+ return EMPTY_PARTITION;
+ }
}
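
For context, here is a minimal usage sketch of the new row path on GlobalDeleteKeyGenerator. The property key below is the value behind DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY; the Row (row) and the record-key field positions are assumed to be initialized by the surrounding write path, which this diff does not show.

// Illustrative sketch, not part of the patch.
TypedProperties props = new TypedProperties();
props.setProperty("hoodie.datasource.write.recordkey.field", "uuid");
GlobalDeleteKeyGenerator keyGen = new GlobalDeleteKeyGenerator(props);
if (keyGen.isRowKeyExtractionSupported()) {
  String recordKey = keyGen.getRecordKeyFromRow(row);          // composed from the configured key fields
  String partitionPath = keyGen.getPartitionPathFromRow(row);  // always EMPTY_PARTITION for global deletes
}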
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/KeyGenerator.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/KeyGenerator.java
index a9b1b6ec..5edbc540 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/KeyGenerator.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/KeyGenerator.java
@@ -20,17 +20,25 @@ package org.apache.hudi.keygen;
import java.util.List;
import java.util.stream.Collectors;
+
+import org.apache.hudi.AvroConversionHelper;
+import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieKey;
+import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import java.io.Serializable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
+import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.types.StructType;
+import scala.Function1;
+
/**
* Abstract class to extend for plugging in extraction of {@link HoodieKey} from an Avro record.
*/
@@ -54,6 +62,18 @@ public abstract class KeyGenerator implements Serializable {
*/
public abstract HoodieKey getKey(GenericRecord record);
+ public String getRecordKey(Row row, StructType schema, Schema avroSchema) {
+ // fallback: convert the Row back to an Avro GenericRecord, then reuse the Avro-based getKey()
+ Function1<Object, Object> converterFn = AvroConversionHelper.createConverterToAvro(schema, avroSchema.getName(), avroSchema.getNamespace());
+ GenericRecord genericRecord = (GenericRecord) converterFn.apply(row);
+ return getKey(genericRecord).getRecordKey();
+ }
+
+ public String getPartitionPath(Row row, StructType schema, Schema avroSchema) {
+ Function1<Object, Object> converterFn = AvroConversionHelper.createConverterToAvro(schema, avroSchema.getName(), avroSchema.getNamespace());
+ GenericRecord genericRecord = (GenericRecord) converterFn.apply(row);
+ return getKey(genericRecord).getPartitionPath();
+ }
+
public boolean isRowKeyExtractionSupported() {
return false;
}
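
The two methods above give every key generator an Avro-conversion fallback, so keys can still be extracted from a Row when a generator has no native row path. A hedged sketch of the caller-side dispatch this enables, assuming the base class also exposes getRecordKeyFromRow (the subclass overrides in this diff suggest it does) and that structType and avroSchema describe the same dataset schema:

// Illustrative caller-side dispatch.
String recordKey = keyGen.isRowKeyExtractionSupported()
    ? keyGen.getRecordKeyFromRow(row)                   // native: read fields straight off the Row
    : keyGen.getRecordKey(row, structType, avroSchema); // fallback: per-record Row-to-Avro conversion

The native path is the motivation for the change: it avoids converting every Row to Avro just to extract keys.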
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
index fd146b17..580e4439 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
@@ -51,6 +51,10 @@ public class NonpartitionedKeyGenerator extends SimpleKeyGenerator {
return this.getClass().equals(NonpartitionedKeyGenerator.class);
}
+ public String getRecordKeyFromRow(Row row) {
+ return super.getRecordKeyFromRow(row);
+ }
+
public String getPartitionPathFromRow(Row row) {
return EMPTY_PARTITION;
}
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java
index 1f6de357..bad717fc 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java
@@ -18,10 +18,15 @@
package org.apache.hudi.keygen;
+import org.apache.hudi.exception.HoodieKeyException;
+
+import org.apache.spark.sql.Row;
+
+import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-import org.apache.spark.sql.Row;
public class RowKeyGeneratorHelper {
@@ -32,7 +37,8 @@ public class RowKeyGeneratorHelper {
protected static final String EMPTY_RECORDKEY_PLACEHOLDER = "__empty__";
public static String getRecordKeyFromRow(Row row, List<String> recordKeyFields, List<Integer> recordKeyFieldsPos) {
- return IntStream.range(0, recordKeyFields.size()).mapToObj(idx -> {
+ AtomicBoolean keyIsNullOrEmpty = new AtomicBoolean(true);
+ String toReturn = IntStream.range(0, recordKeyFields.size()).mapToObj(idx -> {
String field = recordKeyFields.get(idx);
Integer fieldPos = recordKeyFieldsPos.get(idx);
if (row.isNullAt(fieldPos)) {
@@ -42,8 +48,13 @@ public class RowKeyGeneratorHelper {
if (val.isEmpty()) {
return fieldPos + ":" + EMPTY_RECORDKEY_PLACEHOLDER;
}
+ keyIsNullOrEmpty.set(false);
return fieldPos + ":" + val;
}).collect(Collectors.joining(","));
+ if (keyIsNullOrEmpty.get()) {
+ throw new HoodieKeyException("recordKey value: \"" + toReturn + "\" for fields: \"" + Arrays.toString(recordKeyFields.toArray()) + "\" cannot be null or empty.");
+ }
+ return toReturn;
}
public static String getPartitionPathFromRow(Row row, List<String> partitionPathFields,
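
Note the changed contract above: the helper now throws only when every record-key field is null or empty, while individual missing fields still get the placeholder markers. A small behavior sketch (field names, positions, and the Row are illustrative):

// Illustrative: one populated field out of two still yields a key.
List<String> fields = Arrays.asList("id", "name");
List<Integer> positions = Arrays.asList(2, 5);  // positions of the fields in the Row schema
// row values (null, "abc")  -> "2:__null__,5:abc"
// row values (null, "")     -> HoodieKeyException
String key = RowKeyGeneratorHelper.getRecordKeyFromRow(row, fields, positions);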
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
index 5d9d568a..14661582 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
@@ -18,7 +18,6 @@
package org.apache.hudi.keygen;
-import java.util.Arrays;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.config.TypedProperties;
@@ -28,6 +27,8 @@ import org.apache.hudi.exception.HoodieKeyException;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.sql.Row;
+import java.util.Arrays;
+
/**
* Simple key generator, which takes names of fields to be used for recordKey and partitionPath as configs.
*/
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/keygen/TimestampBasedKeyGenerator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/keygen/TimestampBasedKeyGenerator.java
index 213a6bf1..0b90250a 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/keygen/TimestampBasedKeyGenerator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/keygen/TimestampBasedKeyGenerator.java
@@ -18,18 +18,20 @@
package org.apache.hudi.utilities.keygen;
-import java.sql.Timestamp;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.exception.HoodieKeyException;
import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hudi.keygen.RowKeyGeneratorHelper;
import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException;
import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.Row;
import java.io.Serializable;
+import java.sql.Timestamp;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
@@ -37,7 +39,6 @@ import java.util.Collections;
import java.util.Date;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
-import org.apache.spark.sql.Row;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
@@ -77,7 +78,7 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
private static final String TIMESTAMP_OUTPUT_DATE_FORMAT_PROP =
"hoodie.deltastreamer.keygen.timebased.output.dateformat";
private static final String TIMESTAMP_TIMEZONE_FORMAT_PROP =
- "hoodie.deltastreamer.keygen.timebased.timezone";
+ "hoodie.deltastreamer.keygen.timebased.timezone";
}
public TimestampBasedKeyGenerator(TypedProperties config) {
@@ -132,7 +133,7 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
timeMs = inputDateFormat.parse(partitionVal.toString()).getTime();
} else {
throw new HoodieNotSupportedException(
- "Unexpected type for partition field: " + partitionVal.getClass().getName());
+ "Unexpected type for partition field: " + partitionVal.getClass().getName());
}
Date timestamp = new Date(timeMs);
String recordKey = DataSourceUtils.getNestedFieldValAsString(record, recordKeyField, true);
@@ -141,7 +142,7 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
}
String partitionPath = hiveStylePartitioning ? partitionPathField + "=" + partitionPathFormat.format(timestamp)
- : partitionPathFormat.format(timestamp);
+ : partitionPathFormat.format(timestamp);
return new HoodieKey(recordKey, partitionPath);
} catch (ParseException pe) {
throw new HoodieDeltaStreamerException("Unable to parse input partition field :" + partitionVal, pe);
@@ -162,13 +163,21 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
return this.getClass().equals(TimestampBasedKeyGenerator.class);
}
+ public String getRecordKeyFromRow(Row row) {
+ return RowKeyGeneratorHelper.getRecordKeyFromRow(row, getRecordKeyFields(), getRowKeyFieldsPos());
+ }
+
public String getPartitionPathFromRow(Row row) {
- Timestamp fieldVal = row.getAs(partitionPathField);
- if (fieldVal == null) {
+ Timestamp fieldVal = null;
+ if (row.isNullAt(getRowPartitionPathFieldsPos().get(0))) {
fieldVal = new Timestamp(1L);
+ } else {
+ fieldVal = row.getAs(getPartitionPathFields().get(0));
}
+
SimpleDateFormat partitionPathFormat = new SimpleDateFormat(outputDateFormat);
partitionPathFormat.setTimeZone(timeZone);
- return partitionPathFormat.format(fieldVal);
+ return hiveStylePartitioning ? getPartitionPathFields().get(0) + "=" + partitionPathFormat.format(fieldVal)
+ : partitionPathFormat.format(fieldVal);
}
}
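
Finally, a sketch of the output shapes the reworked getPartitionPathFromRow produces, using only JDK classes; the field name and date format are illustrative, and new Timestamp(1L) is the fallback the patch substitutes when the partition field is null:

// Illustrative: formatting behavior of the new partition-path logic.
SimpleDateFormat fmt = new SimpleDateFormat("yyyy/MM/dd");
fmt.setTimeZone(TimeZone.getTimeZone("GMT"));
Timestamp ts = new Timestamp(1L);                  // null-field fallback: 1 ms past the epoch
String plain = fmt.format(ts);                     // "1970/01/01"
String hiveStyle = "createdAt=" + fmt.format(ts);  // hive-style partitioning prefixes "field="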