Last active
September 8, 2021 06:48
-
-
Save enryold/4be63342c8dd3d2cdd772178dd5b2b45 to your computer and use it in GitHub Desktop.
Java classes for Kinesis Firehose record transformation lambda
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.amazonaws.protocol.json.JsonClientMetadata; | |
import com.amazonaws.protocol.json.SdkJsonProtocolFactory; | |
import com.amazonaws.protocol.json.StructuredJsonGenerator; | |
import com.amazonaws.services.kinesisfirehose.model.Record; | |
import com.amazonaws.services.kinesisfirehose.model.transform.RecordJsonMarshaller; | |
import com.google.gson.JsonObject; | |
import com.google.gson.JsonParser; | |
import java.nio.ByteBuffer; | |
import java.util.function.Function; | |
/** | |
* Helper function that encode datas as AWSKinesisFirehoseClient does. | |
*/ | |
public class FnEncodeBytesAsFirehose implements Function<ByteBuffer, String> { | |
@Override | |
public String apply(ByteBuffer byteBuffer) { | |
Record record = new Record(); | |
record.setData(byteBuffer); | |
final StructuredJsonGenerator jsonGenerator = new SdkJsonProtocolFactory(new JsonClientMetadata() | |
.withProtocolVersion("1.1") | |
.withSupportsCbor(false)) | |
.createGenerator(); | |
jsonGenerator.writeStartObject(); | |
jsonGenerator.writeFieldName("Record"); | |
RecordJsonMarshaller.getInstance().marshall( | |
record, jsonGenerator); | |
jsonGenerator.writeEndObject(); | |
JsonParser parser = new JsonParser(); | |
JsonObject o = parser.parse(new String(jsonGenerator.getBytes())).getAsJsonObject(); | |
return o.get("Record").getAsJsonObject().get("Data").getAsString(); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.List; | |
public class KinesisFirehoseEvent { | |
/** | |
* The Id of the invocation. | |
**/ | |
String invocationId = ""; | |
/** | |
* The ARN of the delivery stream sending the event. | |
**/ | |
String deliveryStreamArn = ""; | |
/** | |
* The AWS region for delivery stream. | |
**/ | |
String region = ""; | |
/** | |
* The Kinesis records to transform. | |
**/ | |
List<KinesisFirehoseEventRecord> records; | |
public String getInvocationId() { | |
return invocationId; | |
} | |
public void setInvocationId(String invocationId) { | |
this.invocationId = invocationId; | |
} | |
public String getDeliveryStreamArn() { | |
return deliveryStreamArn; | |
} | |
public void setDeliveryStreamArn(String deliveryStreamArn) { | |
this.deliveryStreamArn = deliveryStreamArn; | |
} | |
public String getRegion() { | |
return region; | |
} | |
public void setRegion(String region) { | |
this.region = region; | |
} | |
public List<KinesisFirehoseEventRecord> getRecords() { | |
return records; | |
} | |
public void setRecords(List<KinesisFirehoseEventRecord> records) { | |
this.records = records; | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.nio.ByteBuffer; | |
import java.nio.charset.StandardCharsets; | |
import java.util.Base64; | |
public class KinesisFirehoseEventRecord { | |
public KinesisFirehoseEventRecord() {} | |
/** | |
*The record ID is passed from Firehose to Lambda during the invocation. The transformed record must | |
*contain the same record ID. Any mismatch between the ID of the original record and the ID of the | |
*transformed record is treated as a data transformation failure. | |
**/ | |
String recordId = ""; | |
/** | |
* The approximate time the record was sent to Kinesis Firehose as a Unix epoch. | |
**/ | |
Long approximateArrivalEpoch; | |
/** | |
* The data sent through as a Kinesis Firehose record. The data is sent to the Lambda function base64 encoded. | |
**/ | |
String data; | |
public String getRecordId() { | |
return recordId; | |
} | |
public void setRecordId(String recordId) { | |
this.recordId = recordId; | |
} | |
public Long getApproximateArrivalEpoch() { | |
return approximateArrivalEpoch; | |
} | |
public void setApproximateArrivalEpoch(Long approximateArrivalEpoch) { | |
this.approximateArrivalEpoch = approximateArrivalEpoch; | |
} | |
public String getData() { | |
return data; | |
} | |
public void setData(String data) { | |
this.data = data; | |
} | |
public void setEncodedData(ByteBuffer byteBuffer) | |
{ | |
data = new FnEncodeBytesAsFirehose().apply(byteBuffer); | |
} | |
/** | |
* Base64 decodes the data. | |
**/ | |
public String decodedData() | |
{ | |
return new String(Base64.getDecoder().decode(data), StandardCharsets.UTF_8); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.List; | |
public class KinesisFirehoseResponse { | |
public static final String TRANSFORMED_STATE_OK = "Ok"; | |
public static final String TRANSFORMED_STATE_DROPPED = "Dropped"; | |
public static final String TRANSFORMED_STATE_PROCESSINGFAILED = "ProcessingFailed"; | |
public List<KinesisFirehoseResponseRecord> records; | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.UnsupportedEncodingException; | |
import java.util.Base64; | |
public class KinesisFirehoseResponseRecord { | |
public KinesisFirehoseResponseRecord() {} | |
/** | |
*The record ID is passed from Firehose to Lambda during the invocation. The transformed record must | |
*contain the same record ID. Any mismatch between the ID of the original record and the ID of the | |
*transformed record is treated as a data transformation failure. | |
**/ | |
String recordId = ""; | |
/** | |
* The status of the data transformation of the record. The possible values are: "Ok" | |
* (the record was transformed successfully), "Dropped" (the record was dropped intentionally | |
* by your processing logic), and "ProcessingFailed" (the record could not be transformed). | |
* If a record has a status of "Ok" or "Dropped", Firehose considers it successfully | |
* processed. Otherwise, Firehose considers it unsuccessfully processed. | |
* | |
* Possible values: | |
* * Ok - The record was transformed successfully | |
* * Dropped- The record was dropped intentionally by your processing logic | |
* * ProcessingFailed - The record could not be transformed | |
**/ | |
String result = KinesisFirehoseResponse.TRANSFORMED_STATE_OK; | |
/** | |
* The transformed data payload, after base64-encoding. | |
**/ | |
String data; | |
public String getRecordId() { | |
return recordId; | |
} | |
public void setRecordId(String recordId) { | |
this.recordId = recordId; | |
} | |
public String getResult() { | |
return result; | |
} | |
public void setResult(String result) { | |
this.result = result; | |
} | |
public String getData() { | |
return data; | |
} | |
public void setData(String data) { | |
this.data = data; | |
} | |
/** | |
* Base64 encodes the unencodedData and sets the data property. | |
**/ | |
public void encodeAndSetData(String unencodedData) | |
{ | |
try { | |
data = Base64.getEncoder().encodeToString(unencodedData.getBytes("utf-8")); | |
} catch (UnsupportedEncodingException e) { | |
e.printStackTrace(); | |
} | |
} | |
public void encodeAndSetData(byte[] unencodedData) | |
{ | |
data = Base64.getEncoder().encodeToString(unencodedData); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment