Skip to content

Instantly share code, notes, and snippets.

@garysassano
Last active July 2, 2025 02:45
Show Gist options
  • Save garysassano/da706930d619c089ecffacbf15a36407 to your computer and use it in GitHub Desktop.
Save garysassano/da706930d619c089ecffacbf15a36407 to your computer and use it in GitHub Desktop.
use anyhow::{Context, Result};
use aws_lambda_events::cloudwatch_logs::LogData;
use aws_lambda_events::event::kinesis::KinesisEvent;
use otlp_stdout_span_exporter::ExporterOutput;
use serverless_otlp_forwarder_core::core_parser::EventParser;
use serverless_otlp_forwarder_core::telemetry::TelemetryData;
use std::io::Read;
/// Stateless [`EventParser`] that turns Kinesis records carrying gzip-compressed
/// CloudWatch Logs subscription payloads into [`TelemetryData`] items, one per
/// OTLP stdout exporter line found in the log events.
#[derive(Debug, Clone, Copy, Default)]
pub struct KinesisOtlpStdoutParser;
impl EventParser for KinesisOtlpStdoutParser {
    type EventInput = KinesisEvent;

    /// Parse every record of a Kinesis event into [`TelemetryData`] items.
    ///
    /// Records are processed independently: a record that cannot be converted is
    /// logged at `warn` level and skipped, so one bad record does not drop the rest
    /// of the batch. As written, this method always returns `Ok`.
    ///
    /// `_stream_name` is not used by this parser.
    fn parse(
        &self,
        event_payload: Self::EventInput,
        _stream_name: &str,
    ) -> Result<Vec<TelemetryData>> {
        let mut telemetry_items = Vec::with_capacity(event_payload.records.len());
        for kinesis_event_record in &event_payload.records {
            match self.convert_kinesis_record(kinesis_event_record) {
                Ok(telemetry_records) => telemetry_items.extend(telemetry_records),
                Err(e) => {
                    // `{:#}` renders the whole anyhow context chain (e.g. the underlying
                    // gzip or serde error), not just the outermost context message.
                    tracing::warn!(
                        sequence_number = %kinesis_event_record.kinesis.sequence_number,
                        "Failed to convert Kinesis record: {:#}",
                        e
                    );
                }
            }
        }
        Ok(telemetry_items)
    }
}
impl KinesisOtlpStdoutParser {
    /// Convert one Kinesis record into zero or more [`TelemetryData`] items.
    ///
    /// The record payload is treated as a gzip-compressed CloudWatch Logs
    /// subscription document ([`LogData`]). Each log event's message is expected
    /// to be a single JSON-encoded [`ExporterOutput`].
    ///
    /// # Errors
    ///
    /// Returns an error when the payload cannot be decompressed, is not valid
    /// UTF-8, or is not valid `LogData` JSON. Individual log events that fail to
    /// parse or convert are logged at `warn` level and skipped instead.
    fn convert_kinesis_record(
        &self,
        record: &aws_lambda_events::event::kinesis::KinesisEventRecord,
    ) -> Result<Vec<TelemetryData>> {
        let log_data = Self::decode_log_data(record.kinesis.data.as_slice())?;

        tracing::debug!(
            "Processing {} log events from CloudWatch Logs group: {}",
            log_data.log_events.len(),
            log_data.log_group
        );

        // CloudWatch Logs tags subscription health checks with
        // messageType "CONTROL_MESSAGE" on the envelope; per the AWS subscription
        // docs these carry no application log lines, so skip the whole document.
        if log_data.message_type == "CONTROL_MESSAGE" {
            return Ok(Vec::new());
        }

        // Each log event is expected to hold exactly one OTLP exporter record.
        let mut telemetry_records = Vec::with_capacity(log_data.log_events.len());
        for log_entry in log_data.log_events {
            // Defensive: also skip individual control-message lines in case they
            // ever arrive inside a DATA_MESSAGE envelope.
            if log_entry.message.contains("CWL CONTROL MESSAGE") {
                continue;
            }

            match serde_json::from_str::<ExporterOutput>(&log_entry.message) {
                Ok(record) => match TelemetryData::from_log_record(record) {
                    Ok(telemetry) => telemetry_records.push(telemetry),
                    Err(e) => {
                        tracing::warn!("Failed to convert log record to telemetry: {}", e);
                    }
                },
                Err(err) => {
                    tracing::warn!(
                        "Failed to parse log message as ExporterOutput: {} - Error details: {}",
                        log_entry.message,
                        err
                    );
                }
            }
        }
        Ok(telemetry_records)
    }

    /// Gunzip a CloudWatch Logs subscription payload and deserialize it as [`LogData`].
    ///
    /// The raw bytes are binary gzip, so they are decompressed first and only then
    /// decoded as UTF-8 JSON.
    fn decode_log_data(compressed: &[u8]) -> Result<LogData> {
        let mut decompressed = Vec::new();
        flate2::read::GzDecoder::new(compressed)
            .read_to_end(&mut decompressed)
            .context("Failed to decompress CloudWatch Logs data")?;
        let json = String::from_utf8(decompressed)
            .context("Failed to convert decompressed CloudWatch Logs data to UTF-8")?;
        serde_json::from_str::<LogData>(&json).context("Failed to parse CloudWatch Logs data")
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use aws_lambda_events::encodings::{Base64Data, SecondTimestamp};
    use aws_lambda_events::event::kinesis::{
        KinesisEncryptionType, KinesisEventRecord, KinesisRecord,
    };
    use chrono::Utc;
    use flate2::{Compression, write::GzEncoder};
    use serde_json::json;
    use std::io::Write;

    /// Base64 encoding of an empty gzip stream; a syntactically valid OTLP payload.
    const VALID_TEST_PAYLOAD_STRING: &str = "H4sIAAAAAAAAAAMAAAAAAAAAAAA=";

    /// Build the JSON line the OTLP stdout exporter would print for `source`.
    fn create_test_exporter_output_json_string(source: &str) -> String {
        let output = json!({
            "__otel_otlp_stdout": "otlp-stdout-span-exporter@0.1.0",
            "source": source,
            "endpoint": "http://original.collector/v1/traces",
            "method": "POST",
            "payload": VALID_TEST_PAYLOAD_STRING,
            "headers": {
                "content-type": "application/x-protobuf"
            },
            "content-type": "application/x-protobuf",
            "content-encoding": "gzip",
            "base64": true
        });
        serde_json::to_string(&output).unwrap()
    }

    /// Build a CloudWatch Logs subscription document with the given envelope
    /// `message_type` and one log event per entry of `log_messages`.
    fn create_cloudwatch_logs_data_with_type(
        message_type: &str,
        log_messages: Vec<String>,
    ) -> String {
        let log_events: Vec<_> = log_messages
            .into_iter()
            .enumerate()
            .map(|(i, message)| {
                json!({
                    "id": format!("event{}", i + 1),
                    "timestamp": 1234567890 + i as i64,
                    "message": message
                })
            })
            .collect();
        let cw_logs_data = json!({
            "messageType": message_type,
            "owner": "123456789012",
            "logGroup": "/test/log-group",
            "logStream": "test-stream",
            "subscriptionFilters": ["test-filter"],
            "logEvents": log_events
        });
        serde_json::to_string(&cw_logs_data).unwrap()
    }

    /// Build a regular `DATA_MESSAGE` CloudWatch Logs subscription document.
    fn create_cloudwatch_logs_data(log_messages: Vec<String>) -> String {
        create_cloudwatch_logs_data_with_type("DATA_MESSAGE", log_messages)
    }

    /// Gzip-compress `bytes` the way CloudWatch Logs does before writing to Kinesis.
    fn gzip(bytes: &[u8]) -> Vec<u8> {
        let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
        encoder.write_all(bytes).unwrap();
        encoder.finish().unwrap()
    }

    /// Wrap raw (already encoded) `data` in a Kinesis event record with fixed metadata.
    fn create_kinesis_event_record(data: Vec<u8>) -> KinesisEventRecord {
        KinesisEventRecord {
            event_id: Some("shardId-000000000000:12345".to_string()),
            event_version: Some("1.0".to_string()),
            kinesis: KinesisRecord {
                kinesis_schema_version: Some("1.0".to_string()),
                partition_key: "test_partition_key".to_string(),
                sequence_number: "1234567890".to_string(),
                data: Base64Data(data),
                approximate_arrival_timestamp: SecondTimestamp(Utc::now()),
                encryption_type: KinesisEncryptionType::None,
            },
            invoke_identity_arn: Some("arn:aws:iam::123456789012:role/lambda-role".to_string()),
            event_name: Some("aws:kinesis:record".to_string()),
            event_source: Some("aws:kinesis".to_string()),
            event_source_arn: Some(
                "arn:aws:kinesis:us-east-1:123456789012:stream/test-stream".to_string(),
            ),
            aws_region: Some("us-east-1".to_string()),
        }
    }

    /// Gzip a CloudWatch Logs JSON document and wrap it in a Kinesis event record.
    fn create_kinesis_event_record_with_cloudwatch_data(
        cw_logs_data: String,
    ) -> KinesisEventRecord {
        create_kinesis_event_record(gzip(cw_logs_data.as_bytes()))
    }

    #[test]
    fn test_kinesis_otlp_stdout_parser_success() {
        let parser = KinesisOtlpStdoutParser;
        let record_string1 = create_test_exporter_output_json_string("service-c");
        let record_string2 = create_test_exporter_output_json_string("service-d");
        let cw_logs_data = create_cloudwatch_logs_data(vec![record_string1, record_string2]);
        let kinesis_record = create_kinesis_event_record_with_cloudwatch_data(cw_logs_data);
        let event = KinesisEvent {
            records: vec![kinesis_record],
        };
        let result = parser.parse(event, "test-stream").unwrap();
        assert_eq!(result.len(), 2);
        assert_eq!(result[0].source, "service-c");
        assert_eq!(result[1].source, "service-d");
        assert_eq!(result[0].content_type, "application/x-protobuf");
        assert_eq!(result[0].content_encoding, None);
    }

    #[test]
    fn test_kinesis_otlp_stdout_parser_invalid_cloudwatch_data() {
        let parser = KinesisOtlpStdoutParser;
        // Bytes that are not a gzip stream at all.
        let kinesis_record = create_kinesis_event_record(vec![0x80, 0x81, 0x82]);
        let event = KinesisEvent {
            records: vec![kinesis_record],
        };
        let result = parser.parse(event, "test-stream").unwrap();
        assert_eq!(result.len(), 0); // Should handle error gracefully and return empty
    }

    #[test]
    fn test_kinesis_otlp_stdout_parser_non_utf8_payload() {
        let parser = KinesisOtlpStdoutParser;
        // Valid gzip whose decompressed content is not UTF-8.
        let kinesis_record = create_kinesis_event_record(gzip(&[0xff, 0xfe, 0xfd]));
        let event = KinesisEvent {
            records: vec![kinesis_record],
        };
        let result = parser.parse(event, "test-stream").unwrap();
        assert!(result.is_empty());
    }

    #[test]
    fn test_kinesis_otlp_stdout_parser_non_json_payload() {
        let parser = KinesisOtlpStdoutParser;
        // Valid gzip + UTF-8, but not a CloudWatch Logs JSON document.
        let kinesis_record = create_kinesis_event_record(gzip(b"not json"));
        let event = KinesisEvent {
            records: vec![kinesis_record],
        };
        let result = parser.parse(event, "test-stream").unwrap();
        assert!(result.is_empty());
    }

    #[test]
    fn test_kinesis_otlp_stdout_parser_bad_record_does_not_drop_batch() {
        let parser = KinesisOtlpStdoutParser;
        let bad_record = create_kinesis_event_record(vec![0x80, 0x81, 0x82]);
        let good_record = create_kinesis_event_record_with_cloudwatch_data(
            create_cloudwatch_logs_data(vec![create_test_exporter_output_json_string(
                "service-a",
            )]),
        );
        let event = KinesisEvent {
            records: vec![bad_record, good_record],
        };
        let result = parser.parse(event, "test-stream").unwrap();
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].source, "service-a");
    }

    #[test]
    fn test_kinesis_otlp_stdout_parser_malformed_json_in_logs() {
        let parser = KinesisOtlpStdoutParser;
        let malformed_json_string = "{\"invalid_json".to_string();
        let valid_record_string = create_test_exporter_output_json_string("service-fine");
        let cw_logs_data =
            create_cloudwatch_logs_data(vec![malformed_json_string, valid_record_string]);
        let kinesis_record = create_kinesis_event_record_with_cloudwatch_data(cw_logs_data);
        let event = KinesisEvent {
            records: vec![kinesis_record],
        };
        let result = parser.parse(event, "test-stream").unwrap();
        assert_eq!(result.len(), 1); // Only the valid record should be processed
        assert_eq!(result[0].source, "service-fine");
    }

    #[test]
    fn test_kinesis_otlp_stdout_parser_with_control_messages() {
        let parser = KinesisOtlpStdoutParser;
        let control_message = "CWL CONTROL MESSAGE: Subscription established".to_string();
        let valid_record_string = create_test_exporter_output_json_string("service-ok");
        let cw_logs_data = create_cloudwatch_logs_data(vec![control_message, valid_record_string]);
        let kinesis_record = create_kinesis_event_record_with_cloudwatch_data(cw_logs_data);
        let event = KinesisEvent {
            records: vec![kinesis_record],
        };
        let result = parser.parse(event, "test-stream").unwrap();
        assert_eq!(result.len(), 1); // Control message should be skipped
        assert_eq!(result[0].source, "service-ok");
    }

    #[test]
    fn test_kinesis_otlp_stdout_parser_control_message_envelope() {
        let parser = KinesisOtlpStdoutParser;
        let cw_logs_data = create_cloudwatch_logs_data_with_type(
            "CONTROL_MESSAGE",
            vec!["CWL CONTROL MESSAGE: Checking health of destination Kinesis stream.".to_string()],
        );
        let kinesis_record = create_kinesis_event_record_with_cloudwatch_data(cw_logs_data);
        let event = KinesisEvent {
            records: vec![kinesis_record],
        };
        let result = parser.parse(event, "test-stream").unwrap();
        assert!(result.is_empty());
    }

    #[test]
    fn test_kinesis_otlp_stdout_parser_empty_records() {
        let parser = KinesisOtlpStdoutParser;
        let event = KinesisEvent { records: vec![] };
        let result = parser.parse(event, "test-stream").unwrap();
        assert!(result.is_empty());
    }

    #[test]
    fn test_kinesis_otlp_stdout_parser_empty_log_events() {
        let parser = KinesisOtlpStdoutParser;
        let cw_logs_data = create_cloudwatch_logs_data(vec![]);
        let kinesis_record = create_kinesis_event_record_with_cloudwatch_data(cw_logs_data);
        let event = KinesisEvent {
            records: vec![kinesis_record],
        };
        let result = parser.parse(event, "test-stream").unwrap();
        assert!(result.is_empty());
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment