Skip to content

Instantly share code, notes, and snippets.

@MichaelDrogalis
Created September 6, 2019 23:59
Show Gist options
  • Save MichaelDrogalis/c7c830e3e8169211edbe80943412d0db to your computer and use it in GitHub Desktop.
Save MichaelDrogalis/c7c830e3e8169211edbe80943412d0db to your computer and use it in GitHub Desktop.
package io.confluent.developer;
import java.util.List;
import java.util.Map;
import io.confluent.ksql.function.udaf.Udaf;
import io.confluent.ksql.function.udaf.UdafDescription;
import io.confluent.ksql.function.udaf.UdafFactory;
import io.confluent.ksql.function.udaf.TableUdaf;
@UdafDescription(name = "latest", description = "Returns the latest value per key")
public class LatestAggregation {
@UdafFactory(description = "Returns the latest value per key")
public static TableUdaf<Integer, Integer> latestInt() {
return new TableUdaf<Integer, Integer>() {
@Override
public Integer undo(final Integer valueToUndo, final Integer aggregateValue) {
return aggregateValue;
}
@Override
public Integer initialize() {
return null;
}
@Override
public Integer aggregate(final Integer value, final Integer aggregate) {
return value;
}
@Override
public Integer merge(final Integer aggOne, final Integer aggTwo) {
return aggTwo;
}
};
}
@UdafFactory(description = "Returns the latest value per key")
public static TableUdaf<Long, Long> latestBigInt() {
return new TableUdaf<Long, Long>() {
@Override
public Long undo(final Long valueToUndo, final Long aggregateValue) {
return aggregateValue;
}
@Override
public Long initialize() {
return null;
}
@Override
public Long aggregate(final Long value, final Long aggregate) {
return value;
}
@Override
public Long merge(final Long aggOne, final Long aggTwo) {
return aggTwo;
}
};
}
@UdafFactory(description = "Returns the latest value per key")
public static TableUdaf<String, String> latestVarchar() {
return new TableUdaf<String, String>() {
@Override
public String undo(final String valueToUndo, final String aggregateValue) {
return aggregateValue;
}
@Override
public String initialize() {
return null;
}
@Override
public String aggregate(final String value, final String aggregate) {
return value;
}
@Override
public String merge(final String aggOne, final String aggTwo) {
return aggTwo;
}
};
}
@UdafFactory(description = "Returns the latest value per key")
public static TableUdaf<Boolean, Boolean> latestBoolean() {
return new TableUdaf<Boolean, Boolean>() {
@Override
public Boolean undo(final Boolean valueToUndo, final Boolean aggregateValue) {
return aggregateValue;
}
@Override
public Boolean initialize() {
return null;
}
@Override
public Boolean aggregate(final Boolean value, final Boolean aggregate) {
return value;
}
@Override
public Boolean merge(final Boolean aggOne, final Boolean aggTwo) {
return aggTwo;
}
};
}
@UdafFactory(description = "Returns the latest value per key")
public static TableUdaf<Double, Double> latestDouble() {
return new TableUdaf<Double, Double>() {
@Override
public Double undo(final Double valueToUndo, final Double aggregateValue) {
return aggregateValue;
}
@Override
public Double initialize() {
return null;
}
@Override
public Double aggregate(final Double value, final Double aggregate) {
return value;
}
@Override
public Double merge(final Double aggOne, final Double aggTwo) {
return aggTwo;
}
};
}
@UdafFactory(description = "Returns the latest value per key")
public static TableUdaf<List, List> latestList() {
return new TableUdaf<List, List>() {
@Override
public List undo(final List valueToUndo, final List aggregateValue) {
return aggregateValue;
}
@Override
public List initialize() {
return null;
}
@Override
public List aggregate(final List value, final List aggregate) {
return value;
}
@Override
public List merge(final List aggOne, final List aggTwo) {
return aggTwo;
}
};
}
@UdafFactory(description = "Returns the latest value per key")
public static TableUdaf<Map, Map> latestMap() {
return new TableUdaf<Map, Map>() {
@Override
public Map undo(final Map valueToUndo, final Map aggregateValue) {
return aggregateValue;
}
@Override
public Map initialize() {
return null;
}
@Override
public Map aggregate(final Map value, final Map aggregate) {
return value;
}
@Override
public Map merge(final Map aggOne, final Map aggTwo) {
return aggTwo;
}
};
}
@UdafFactory(description = "Returns the latest value per key")
public static TableUdaf<Map, Map> latestStruct() {
return new TableUdaf<Map, Map>() {
@Override
public Map undo(final Map valueToUndo, final Map aggregateValue) {
return aggregateValue;
}
@Override
public Map initialize() {
return null;
}
@Override
public Map aggregate(final Map value, final Map aggregate) {
return value;
}
@Override
public Map merge(final Map aggOne, final Map aggTwo) {
return aggTwo;
}
};
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment