Skip to content

Instantly share code, notes, and snippets.

@nsivabalan
Created August 1, 2024 13:18
Show Gist options
  • Save nsivabalan/d02565d95d64f745844d020fce1a2434 to your computer and use it in GitHub Desktop.
Save nsivabalan/d02565d95d64f745844d020fce1a2434 to your computer and use it in GitHub Desktop.
package org.apache.hudi.common.model;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import java.io.IOException;
import java.util.Properties;
/**
 * Record payload that maintains a running average of a numeric input field across updates.
 *
 * <p>The participating field names are read from the writer properties:
 * <ul>
 *   <li>{@link #AVG_INPUT_FIELDS}: the field holding the new observation;</li>
 *   <li>{@link #AVG_OUTPUT_FIELDS_VALUE}: the field receiving the running average (written as a double);</li>
 *   <li>{@link #AVG_OUTPUT_FIELDS_COUNT}: the field receiving the number of observations (written as a long).</li>
 * </ul>
 *
 * <p>Inserts ({@link #getInsertValue(Schema)}) and updates without properties
 * ({@link #combineAndGetUpdateValue(IndexedRecord, Schema)}) store the incoming record unchanged.
 */
public class AvroAveragePayload extends BaseAvroPayload
    implements HoodieRecordPayload<AvroAveragePayload> {

  public static final String AVG_INPUT_FIELDS = "hoodie.avg.payload.input.fields";
  public static final String AVG_OUTPUT_FIELDS_VALUE = "hoodie.avg.payload.output.fields.value";
  public static final String AVG_OUTPUT_FIELDS_COUNT = "hoodie.avg.payload.output.fields.count";

  public AvroAveragePayload(GenericRecord record, Comparable orderingVal) {
    super(record, orderingVal);
  }

  /** Creates a payload with ordering value {@code 0}; an absent record becomes a delete payload. */
  public AvroAveragePayload(Option<GenericRecord> record) {
    this(record.isPresent() ? record.get() : null, 0); // natural order
  }

  /**
   * Picks which of two payloads for the same key survives de-duplication.
   *
   * @param oldValue the previously seen payload
   * @return {@code this} if {@code oldValue} is a delete (empty bytes); otherwise the payload with
   *     the greater ordering value, preferring {@code this} on ties
   */
  @Override
  public AvroAveragePayload preCombine(AvroAveragePayload oldValue) {
    if (oldValue.recordBytes.length == 0) {
      // use natural order for delete record
      return this;
    }
    if (oldValue.orderingVal.compareTo(orderingVal) > 0) {
      // pick the payload with greatest ordering value
      return oldValue;
    } else {
      return this;
    }
  }

  /**
   * Without properties the output field names are unknown, so no averaging is performed: the
   * incoming record simply replaces {@code currentValue}.
   */
  @Override
  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException {
    return getInsertValue(schema);
  }

  /**
   * Merges the incoming record into the running average stored in {@code currentValue}.
   *
   * @param currentValue the stored record; its average/count fields hold the prior aggregate
   * @param schema schema used to deserialize the incoming record
   * @param properties writer properties naming the input and output fields
   * @return empty for a delete payload; otherwise the incoming record with updated average/count
   * @throws HoodieException if a required property is missing, a configured field is absent from
   *     the schema, the incoming input is null, or a value is not numeric
   */
  @Override
  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties properties) throws IOException {
    if (recordBytes.length == 0) {
      // Incoming payload is a delete.
      return Option.empty();
    }
    GenericRecord incomingRecord = HoodieAvroUtils.bytesToAvro(recordBytes, schema);
    Schema recordSchema = incomingRecord.getSchema();
    final String avgInputField = requireField(recordSchema, properties, AVG_INPUT_FIELDS);
    final String avgOutputFieldValue = requireField(recordSchema, properties, AVG_OUTPUT_FIELDS_VALUE);
    final String avgOutputFieldCount = requireField(recordSchema, properties, AVG_OUTPUT_FIELDS_COUNT);

    Object newInput = incomingRecord.get(avgInputField);
    if (newInput == null) {
      throw new HoodieException("Average input field '" + avgInputField + "' is null in incoming record");
    }
    double newValue = toNumber(newInput, avgInputField).doubleValue();

    GenericRecord current = (GenericRecord) currentValue;
    Object prevAverage = current.get(avgOutputFieldValue);
    Object prevCount = current.get(avgOutputFieldCount);
    long prevTotalValues;
    double prevSum;
    if (prevAverage == null || prevCount == null) {
      // getInsertValue stores records verbatim, so the stored version may carry no aggregate yet.
      // Treat it as a single observation of its own input value (or none, if that is null too).
      Object seed = current.get(avgInputField);
      prevTotalValues = seed == null ? 0L : 1L;
      prevSum = seed == null ? 0.0d : toNumber(seed, avgInputField).doubleValue();
    } else {
      prevTotalValues = toNumber(prevCount, avgOutputFieldCount).longValue();
      prevSum = toNumber(prevAverage, avgOutputFieldValue).doubleValue() * prevTotalValues;
    }

    long updatedCount = prevTotalValues + 1;
    double updatedSum = prevSum + newValue;
    incomingRecord.put(avgOutputFieldValue, updatedSum / updatedCount);
    incomingRecord.put(avgOutputFieldCount, updatedCount);
    return Option.of(incomingRecord);
  }

  /** Returns the stored record deserialized with {@code schema}, or empty for a delete payload. */
  @Override
  public Option<IndexedRecord> getInsertValue(Schema schema) throws IOException {
    if (recordBytes.length == 0) {
      return Option.empty();
    }
    return Option.of(HoodieAvroUtils.bytesToAvro(recordBytes, schema));
  }

  @Override
  public Comparable<?> getOrderingValue() {
    return this.orderingVal;
  }

  /** Reads the field name configured under {@code propertyKey} and checks it exists in {@code schema}. */
  private static String requireField(Schema schema, Properties properties, String propertyKey) {
    String fieldName = properties == null ? null : properties.getProperty(propertyKey);
    if (fieldName == null || fieldName.isEmpty()) {
      throw new HoodieException("Missing required property '" + propertyKey + "'");
    }
    if (schema.getField(fieldName) == null) {
      throw new HoodieException("Field '" + fieldName + "' configured by '" + propertyKey
          + "' is missing from table schema");
    }
    return fieldName;
  }

  /** Casts an Avro field value to {@link Number}, failing with a descriptive error otherwise. */
  private static Number toNumber(Object value, String fieldName) {
    if (value instanceof Number) {
      return (Number) value;
    }
    throw new HoodieException("Field '" + fieldName + "' must be numeric, got "
        + value.getClass().getName());
  }
}
package org.apache.hudi.common.model;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import java.io.IOException;
import java.util.Properties;
/**
 * Record payload that maintains a running integral sum of an input field across updates.
 *
 * <p>The participating field names are read from the writer properties:
 * <ul>
 *   <li>{@link #SUM_INPUT_FIELDS}: the field holding the value to add (int or long);</li>
 *   <li>{@link #SUM_OUTPUT_FIELDS}: the field receiving the running sum (written as a long).</li>
 * </ul>
 *
 * <p>Inserts ({@link #getInsertValue(Schema)}) and updates without properties
 * ({@link #combineAndGetUpdateValue(IndexedRecord, Schema)}) store the incoming record unchanged.
 */
public class AvroSummationPayload extends BaseAvroPayload
    implements HoodieRecordPayload<AvroSummationPayload> {

  public static final String SUM_INPUT_FIELDS = "hoodie.summation.payload.input.fields";
  public static final String SUM_OUTPUT_FIELDS = "hoodie.summation.payload.output.fields";

  public AvroSummationPayload(GenericRecord record, Comparable orderingVal) {
    super(record, orderingVal);
  }

  /** Creates a payload with ordering value {@code 0}; an absent record becomes a delete payload. */
  public AvroSummationPayload(Option<GenericRecord> record) {
    this(record.isPresent() ? record.get() : null, 0); // natural order
  }

  /**
   * Picks which of two payloads for the same key survives de-duplication.
   *
   * @param oldValue the previously seen payload
   * @return {@code this} if {@code oldValue} is a delete (empty bytes); otherwise the payload with
   *     the greater ordering value, preferring {@code this} on ties
   */
  @Override
  public AvroSummationPayload preCombine(AvroSummationPayload oldValue) {
    if (oldValue.recordBytes.length == 0) {
      // use natural order for delete record
      return this;
    }
    if (oldValue.orderingVal.compareTo(orderingVal) > 0) {
      // pick the payload with greatest ordering value
      return oldValue;
    } else {
      return this;
    }
  }

  /**
   * Without properties the output field name is unknown, so no summation is performed: the
   * incoming record simply replaces {@code currentValue}.
   */
  @Override
  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException {
    return getInsertValue(schema);
  }

  /**
   * Adds the incoming record's input value to the running sum stored in {@code currentValue}.
   *
   * @param currentValue the stored record; its output field holds the prior sum
   * @param schema schema used to deserialize the incoming record
   * @param properties writer properties naming the input and output fields
   * @return empty for a delete payload; otherwise the incoming record with the updated sum
   * @throws HoodieException if a required property is missing, a configured field is absent from
   *     the schema, the incoming input is null, a value is not an int/long, or the sum overflows
   */
  @Override
  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties properties) throws IOException {
    if (recordBytes.length == 0) {
      // Incoming payload is a delete.
      return Option.empty();
    }
    GenericRecord incomingRecord = HoodieAvroUtils.bytesToAvro(recordBytes, schema);
    Schema recordSchema = incomingRecord.getSchema();
    final String sumInputField = requireField(recordSchema, properties, SUM_INPUT_FIELDS);
    final String sumOutputField = requireField(recordSchema, properties, SUM_OUTPUT_FIELDS);

    Object sumInput = incomingRecord.get(sumInputField);
    if (sumInput == null) {
      throw new HoodieException("Sum input field '" + sumInputField + "' is null in incoming record");
    }
    long input = toLong(sumInput, sumInputField);

    GenericRecord current = (GenericRecord) currentValue;
    Object prevTotalSum = current.get(sumOutputField);
    long prevSum;
    if (prevTotalSum == null) {
      // getInsertValue stores records verbatim, so the stored version may carry no sum yet.
      // Its own input value is then its only contribution (zero if that is null too).
      Object seed = current.get(sumInputField);
      prevSum = seed == null ? 0L : toLong(seed, sumInputField);
    } else {
      prevSum = toLong(prevTotalSum, sumOutputField);
    }

    long updatedSum;
    try {
      updatedSum = Math.addExact(prevSum, input);
    } catch (ArithmeticException e) {
      throw new HoodieException("Running sum in field '" + sumOutputField + "' overflowed a long", e);
    }
    incomingRecord.put(sumOutputField, updatedSum);
    return Option.of(incomingRecord);
  }

  /** Returns the stored record deserialized with {@code schema}, or empty for a delete payload. */
  @Override
  public Option<IndexedRecord> getInsertValue(Schema schema) throws IOException {
    if (recordBytes.length == 0) {
      return Option.empty();
    }
    return Option.of(HoodieAvroUtils.bytesToAvro(recordBytes, schema));
  }

  @Override
  public Comparable<?> getOrderingValue() {
    return this.orderingVal;
  }

  /** Reads the field name configured under {@code propertyKey} and checks it exists in {@code schema}. */
  private static String requireField(Schema schema, Properties properties, String propertyKey) {
    String fieldName = properties == null ? null : properties.getProperty(propertyKey);
    if (fieldName == null || fieldName.isEmpty()) {
      throw new HoodieException("Missing required property '" + propertyKey + "'");
    }
    if (schema.getField(fieldName) == null) {
      throw new HoodieException("Field '" + fieldName + "' configured by '" + propertyKey
          + "' is missing from table schema");
    }
    return fieldName;
  }

  /** Widens an Avro int/long value to {@code long}; other types are rejected rather than truncated. */
  private static long toLong(Object value, String fieldName) {
    if (value instanceof Long || value instanceof Integer) {
      return ((Number) value).longValue();
    }
    throw new HoodieException("Field '" + fieldName + "' must be an int or long, got "
        + value.getClass().getName());
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment