Created
August 1, 2024 13:18
-
-
Save nsivabalan/d02565d95d64f745844d020fce1a2434 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.apache.hudi.common.model; | |
import org.apache.hudi.avro.HoodieAvroUtils; | |
import org.apache.hudi.common.util.Option; | |
import org.apache.hudi.exception.HoodieException; | |
import org.apache.avro.Schema; | |
import org.apache.avro.generic.GenericRecord; | |
import org.apache.avro.generic.IndexedRecord; | |
import java.io.IOException; | |
import java.util.Properties; | |
public class AvroAveragePayload extends BaseAvroPayload | |
implements HoodieRecordPayload<AvroAveragePayload> { | |
public static final String AVG_INPUT_FIELDS = "hoodie.avg.payload.input.fields"; | |
public static final String AVG_OUTPUT_FIELDS_VALUE = "hoodie.avg.payload.output.fields.value"; | |
public static final String AVG_OUTPUT_FIELDS_COUNT = "hoodie.avg.payload.output.fields.count"; | |
public AvroAveragePayload(GenericRecord record, Comparable orderingVal) { | |
super(record, orderingVal); | |
} | |
public AvroAveragePayload(Option<GenericRecord> record) { | |
this(record.isPresent() ? record.get() : null, 0); // natural order | |
} | |
@Override | |
public AvroAveragePayload preCombine(AvroAveragePayload oldValue) { | |
if (oldValue.recordBytes.length == 0) { | |
// use natural order for delete record | |
return this; | |
} | |
if (oldValue.orderingVal.compareTo(orderingVal) > 0) { | |
// pick the payload with greatest ordering value | |
return oldValue; | |
} else { | |
return this; | |
} | |
} | |
@Override | |
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException { | |
return getInsertValue(schema); | |
} | |
@Override | |
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties properties) throws IOException { | |
if (recordBytes.length == 0) { | |
return Option.empty(); | |
} | |
GenericRecord incomingRecord = HoodieAvroUtils.bytesToAvro(recordBytes, schema); | |
final String avgInputField = properties.getProperty(AVG_INPUT_FIELDS); | |
final String avgOutputFieldValue = properties.getProperty(AVG_OUTPUT_FIELDS_VALUE); | |
final String avgOutputFieldCount = properties.getProperty(AVG_OUTPUT_FIELDS_COUNT); | |
if (incomingRecord.getSchema().getField(avgInputField) == null || incomingRecord.getSchema().getField(avgOutputFieldValue) == null) { | |
throw new HoodieException("Sum input nor sum output field missing from table schema"); | |
} | |
Long newInput = (Long) incomingRecord.get(avgInputField); | |
Double prevTotalAverage = (Double) ((GenericRecord) currentValue).get(avgOutputFieldValue); | |
Long prevTotalValues = (Long) ((GenericRecord) currentValue).get(avgOutputFieldCount); | |
double updatedSum = prevTotalAverage * prevTotalValues; | |
updatedSum += newInput; | |
incomingRecord.put(avgOutputFieldValue, updatedSum/(prevTotalValues + 1)); | |
incomingRecord.put(avgOutputFieldCount, prevTotalValues + 1); | |
return Option.of(incomingRecord); | |
} | |
@Override | |
public Option<IndexedRecord> getInsertValue(Schema schema) throws IOException { | |
if (recordBytes.length == 0) { | |
return Option.empty(); | |
} | |
return Option.of(HoodieAvroUtils.bytesToAvro(recordBytes, schema)); | |
} | |
@Override | |
public Comparable<?> getOrderingValue() { | |
return this.orderingVal; | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.apache.hudi.common.model; | |
import org.apache.hudi.avro.HoodieAvroUtils; | |
import org.apache.hudi.common.util.Option; | |
import org.apache.hudi.exception.HoodieException; | |
import org.apache.avro.Schema; | |
import org.apache.avro.generic.GenericRecord; | |
import org.apache.avro.generic.IndexedRecord; | |
import java.io.IOException; | |
import java.util.Properties; | |
public class AvroSummationPayload extends BaseAvroPayload | |
implements HoodieRecordPayload<AvroSummationPayload> { | |
public static final String SUM_INPUT_FIELDS = "hoodie.summation.payload.input.fields"; | |
public static final String SUM_OUTPUT_FIELDS = "hoodie.summation.payload.output.fields"; | |
public AvroSummationPayload(GenericRecord record, Comparable orderingVal) { | |
super(record, orderingVal); | |
} | |
public AvroSummationPayload(Option<GenericRecord> record) { | |
this(record.isPresent() ? record.get() : null, 0); // natural order | |
} | |
@Override | |
public AvroSummationPayload preCombine(AvroSummationPayload oldValue) { | |
if (oldValue.recordBytes.length == 0) { | |
// use natural order for delete record | |
return this; | |
} | |
if (oldValue.orderingVal.compareTo(orderingVal) > 0) { | |
// pick the payload with greatest ordering value | |
return oldValue; | |
} else { | |
return this; | |
} | |
} | |
@Override | |
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException { | |
return getInsertValue(schema); | |
} | |
@Override | |
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties properties) throws IOException { | |
if (recordBytes.length == 0) { | |
return Option.empty(); | |
} | |
GenericRecord incomingRecord = HoodieAvroUtils.bytesToAvro(recordBytes, schema); | |
final String sumInputField = properties.getProperty(SUM_INPUT_FIELDS); | |
final String sumOutputField = properties.getProperty(SUM_OUTPUT_FIELDS); | |
if (incomingRecord.getSchema().getField(sumOutputField) == null || incomingRecord.getSchema().getField(sumInputField) == null) { | |
throw new HoodieException("Sum input nor sum output field missing from table schema"); | |
} | |
Long sumInput = (Long) incomingRecord.get(sumInputField); | |
Long prevTotalSum = (Long) ((GenericRecord) currentValue).get(sumOutputField); | |
long updatedSum = prevTotalSum + sumInput; | |
incomingRecord.put(sumOutputField, updatedSum); | |
return Option.of(incomingRecord); | |
} | |
@Override | |
public Option<IndexedRecord> getInsertValue(Schema schema) throws IOException { | |
if (recordBytes.length == 0) { | |
return Option.empty(); | |
} | |
return Option.of(HoodieAvroUtils.bytesToAvro(recordBytes, schema)); | |
} | |
@Override | |
public Comparable<?> getOrderingValue() { | |
return this.orderingVal; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment