Skip to content

Instantly share code, notes, and snippets.

@timpamungkas
Last active April 25, 2022 07:51
Show Gist options
  • Save timpamungkas/984d42787bb23bd6306d03a2135aaea1 to your computer and use it in GitHub Desktop.
Save timpamungkas/984d42787bb23bd6306d03a2135aaea1 to your computer and use it in GitHub Desktop.
Kafka KsqlDB UDTF (tabular function)
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.function.udtf.Udtf;
import io.confluent.ksql.function.udtf.UdtfDescription;
/**
 * ksqlDB user-defined table function (UDTF) that expands a single loan submission into its
 * monthly installment schedule.
 *
 * <p>The input struct carries the principal, the annual interest rate, the loan period in
 * months and the approval date. The function returns one struct per month of the loan period,
 * each holding the installment amount and the due date of that installment.
 */
@UdtfDescription(name = "loan_installment_schedule", category = "LOAN", author = "Timotius Pamungkas", version = "1.0.0", description = "User defined function for sample tabular loan business logic.")
public class LoanUdtf {

    /** ksqlDB schema descriptor of the struct accepted by {@link #calculateLoanSchedule(Struct)}. */
    public static final String LOAN_SUBMISSION_SCHEMA_DESCRIPTOR = "STRUCT<" + "`principalLoanAmount` DOUBLE, "
            + "`annualInterestRate` DOUBLE, " + "`loanPeriodMonth` INT, " + "`loanApprovedDate` VARCHAR" + ">";

    /** ksqlDB schema descriptor of each struct returned by {@link #calculateLoanSchedule(Struct)}. */
    public static final String LOAN_MONTHLY_INSTALLMENT_SCHEMA_DESCRIPTOR = "STRUCT<" + "`installmentAmount` DOUBLE, "
            + "`installmentDueDate` VARCHAR" + ">";

    /**
     * Connect schema used to build the returned structs; its fields mirror
     * {@link #LOAN_MONTHLY_INSTALLMENT_SCHEMA_DESCRIPTOR}. The builder is finalised with
     * {@code build()} so the shared constant is a finished schema rather than a mutable builder.
     */
    private static final Schema LOAN_MONTHLY_INSTALLMENT_SCHEMA = SchemaBuilder.struct()
            .field("installmentAmount", Schema.FLOAT64_SCHEMA)
            .field("installmentDueDate", Schema.STRING_SCHEMA)
            .build();

    /**
     * Builds the installment schedule for one loan submission.
     *
     * <p>The monthly amount is computed once by {@code LoanCalculator.calculateMonthlyInstallment}
     * and repeated on every row. Row {@code n} (1-based) is due {@code n} months after the approval
     * date. A loan period of zero or less produces no rows.
     *
     * @param loanSubmission struct matching {@link #LOAN_SUBMISSION_SCHEMA_DESCRIPTOR};
     *                       {@code loanApprovedDate} must be an ISO-8601 local date such as
     *                       {@code 2022-04-25}
     * @return one struct per installment, ordered by due date; an empty list when
     *         {@code loanSubmission} or any of its four fields is {@code null}
     * @throws java.time.format.DateTimeParseException if {@code loanApprovedDate} is not a valid
     *         ISO-8601 local date
     */
    @Udtf(description = "Calculate loan installment schedule, comprise of installment amount and due date", schema = LOAN_MONTHLY_INSTALLMENT_SCHEMA_DESCRIPTOR)
    public List<Struct> calculateLoanSchedule(
            @UdfParameter(schema = LOAN_SUBMISSION_SCHEMA_DESCRIPTOR) Struct loanSubmission) {
        // SQL NULL reaches the function as a Java null: emit no rows instead of failing with an NPE.
        if (loanSubmission == null) {
            return new ArrayList<>();
        }

        // Struct getters return boxed values, so an absent field shows up as null here.
        Double principalLoanAmount = loanSubmission.getFloat64("principalLoanAmount");
        Double annualInterestRate = loanSubmission.getFloat64("annualInterestRate");
        Integer loanPeriodMonth = loanSubmission.getInt32("loanPeriodMonth");
        String loanApprovedDateText = loanSubmission.getString("loanApprovedDate");
        if (principalLoanAmount == null || annualInterestRate == null || loanPeriodMonth == null
                || loanApprovedDateText == null) {
            return new ArrayList<>();
        }

        var loanApprovedDate = LocalDate.parse(loanApprovedDateText);
        var monthlyInstallmentAmount = LoanCalculator.calculateMonthlyInstallment(principalLoanAmount,
                annualInterestRate, loanPeriodMonth);

        var result = new ArrayList<Struct>();
        for (int monthNumber = 1; monthNumber <= loanPeriodMonth; monthNumber++) {
            // Always offset from the approval date (not from the previous due date) so that
            // end-of-month clamping, e.g. Jan 31 -> Feb 28, does not carry over into later months.
            var installmentDueDate = loanApprovedDate.plusMonths(monthNumber);

            var monthlyInstallment = new Struct(LOAN_MONTHLY_INSTALLMENT_SCHEMA);
            monthlyInstallment.put("installmentAmount", monthlyInstallmentAmount);
            monthlyInstallment.put("installmentDueDate", installmentDueDate.toString());
            result.add(monthlyInstallment);
        }
        return result;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment