Last active
July 2, 2025 02:45
-
-
Save garysassano/da706930d619c089ecffacbf15a36407 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use anyhow::{Context, Result}; | |
use aws_lambda_events::cloudwatch_logs::LogData; | |
use aws_lambda_events::event::kinesis::KinesisEvent; | |
use otlp_stdout_span_exporter::ExporterOutput; | |
use serverless_otlp_forwarder_core::core_parser::EventParser; | |
use serverless_otlp_forwarder_core::telemetry::TelemetryData; | |
use std::io::Read; | |
/// Stateless parser that turns a Kinesis event into `TelemetryData` items.
///
/// The type carries no fields; all behavior lives in its `EventParser`
/// implementation and its private helper methods. Because it is a unit
/// struct it is trivially copyable and default-constructible.
#[derive(Debug, Clone, Copy, Default)]
pub struct KinesisOtlpStdoutParser;
impl EventParser for KinesisOtlpStdoutParser { | |
type EventInput = KinesisEvent; | |
fn parse( | |
&self, | |
event_payload: Self::EventInput, | |
_stream_name: &str, | |
) -> Result<Vec<TelemetryData>> { | |
let records = event_payload.records; | |
let mut telemetry_items = Vec::with_capacity(records.len()); | |
for kinesis_event_record in records { | |
match self.convert_kinesis_record(&kinesis_event_record) { | |
Ok(mut telemetry_records) => { | |
if !telemetry_records.is_empty() { | |
telemetry_items.append(&mut telemetry_records); | |
} | |
} | |
Err(e) => { | |
tracing::warn!("Failed to convert Kinesis record: {}", e); | |
} | |
} | |
} | |
Ok(telemetry_items) | |
} | |
} | |
impl KinesisOtlpStdoutParser { | |
/// Convert a Kinesis record into TelemetryData | |
fn convert_kinesis_record( | |
&self, | |
record: &aws_lambda_events::event::kinesis::KinesisEventRecord, | |
) -> Result<Vec<TelemetryData>> { | |
// CloudWatch Logs events via Kinesis are gzipped binary data | |
// Use the raw binary data directly without trying to convert to UTF-8 first | |
let mut decoder = flate2::read::GzDecoder::new(record.kinesis.data.as_slice()); | |
let mut decoded_cloudwatch_event_bytes = Vec::new(); | |
decoder | |
.read_to_end(&mut decoded_cloudwatch_event_bytes) | |
.context("Failed to decompress CloudWatch Logs data")?; | |
// Convert the decompressed bytes to a UTF-8 string | |
let decoded_cloudwatch_event_str = String::from_utf8(decoded_cloudwatch_event_bytes) | |
.context("Failed to convert decompressed CloudWatch Logs data to UTF-8")?; | |
// Parse the string as CloudWatch Logs format | |
let log_data = serde_json::from_str::<LogData>(&decoded_cloudwatch_event_str) | |
.context("Failed to parse CloudWatch Logs data")?; | |
tracing::debug!( | |
"Processing {} log events from CloudWatch Logs group: {}", | |
log_data.log_events.len(), | |
log_data.log_group | |
); | |
// Process each log event (each contains one OTLP record) | |
let mut telemetry_records = Vec::new(); | |
for log_entry in log_data.log_events { | |
// Skip CloudWatch control messages | |
if log_entry.message.contains("CWL CONTROL MESSAGE") { | |
continue; | |
} | |
// Parse the JSON in the log message as an ExporterOutput | |
match serde_json::from_str::<ExporterOutput>(&log_entry.message) { | |
Ok(record) => { | |
// Convert to TelemetryData | |
match TelemetryData::from_log_record(record) { | |
Ok(telemetry) => { | |
telemetry_records.push(telemetry); | |
} | |
Err(e) => { | |
tracing::warn!("Failed to convert log record to telemetry: {}", e); | |
} | |
} | |
} | |
Err(err) => { | |
tracing::warn!( | |
"Failed to parse log message as ExporterOutput: {} - Error details: {}", | |
log_entry.message, | |
err | |
); | |
} | |
} | |
} | |
Ok(telemetry_records) | |
} | |
} | |
#[cfg(test)]
mod tests {
    use super::*;
    use aws_lambda_events::encodings::{Base64Data, SecondTimestamp};
    use aws_lambda_events::event::kinesis::{
        KinesisEncryptionType, KinesisEventRecord, KinesisRecord,
    };
    use chrono::Utc;
    use flate2::{Compression, write::GzEncoder};
    use serde_json::json;
    use std::io::Write;

    /// Base64 text beginning with the gzip magic prefix (`H4sI`); used as the
    /// `payload` field of every synthetic exporter record below.
    const VALID_TEST_PAYLOAD_STRING: &str = "H4sIAAAAAAAAAAMAAAAAAAAAAAA=";

    /// Build the JSON text of one exporter output record for the given `source`.
    fn create_test_exporter_output_json_string(source: &str) -> String {
        // NOTE(review): the `__otel_otlp_stdout` value below looks like an
        // e-mail-obfuscation artifact from wherever this file was copied;
        // presumably the real value is a `<package>@<version>` string. The
        // assertions in this module do not inspect it. Confirm against upstream.
        let output = json!({
            "__otel_otlp_stdout": "[email protected]",
            "source": source,
            "endpoint": "http://original.collector/v1/traces",
            "method": "POST",
            "payload": VALID_TEST_PAYLOAD_STRING,
            "headers": {
                "content-type": "application/x-protobuf"
            },
            "content-type": "application/x-protobuf",
            "content-encoding": "gzip",
            "base64": true
        });
        serde_json::to_string(&output).unwrap()
    }

    /// Wrap the given messages in a CloudWatch Logs `DATA_MESSAGE` JSON document,
    /// one log event per message.
    fn create_cloudwatch_logs_data(log_messages: Vec<String>) -> String {
        let log_events: Vec<_> = log_messages
            .into_iter()
            .enumerate()
            .map(|(i, message)| {
                json!({
                    "id": format!("event{}", i + 1),
                    "timestamp": 1234567890 + i as i64,
                    "message": message
                })
            })
            .collect();
        let cw_logs_data = json!({
            "messageType": "DATA_MESSAGE",
            "owner": "123456789012",
            "logGroup": "/test/log-group",
            "logStream": "test-stream",
            "subscriptionFilters": ["test-filter"],
            "logEvents": log_events
        });
        serde_json::to_string(&cw_logs_data).unwrap()
    }

    /// Build a Kinesis event record whose data field holds exactly `data`.
    ///
    /// Shared by the valid-data and invalid-data tests so the record metadata
    /// is spelled out only once.
    fn create_kinesis_event_record_with_data(data: Vec<u8>) -> KinesisEventRecord {
        KinesisEventRecord {
            event_id: Some("shardId-000000000000:12345".to_string()),
            event_version: Some("1.0".to_string()),
            kinesis: KinesisRecord {
                kinesis_schema_version: Some("1.0".to_string()),
                partition_key: "test_partition_key".to_string(),
                sequence_number: "1234567890".to_string(),
                data: Base64Data(data),
                approximate_arrival_timestamp: SecondTimestamp(Utc::now()),
                encryption_type: KinesisEncryptionType::None,
            },
            invoke_identity_arn: Some("arn:aws:iam::123456789012:role/lambda-role".to_string()),
            event_name: Some("aws:kinesis:record".to_string()),
            event_source: Some("aws:kinesis".to_string()),
            event_source_arn: Some(
                "arn:aws:kinesis:us-east-1:123456789012:stream/test-stream".to_string(),
            ),
            aws_region: Some("us-east-1".to_string()),
        }
    }

    /// Gzip `cw_logs_data` and wrap the compressed bytes in a Kinesis event record.
    fn create_kinesis_event_record_with_cloudwatch_data(
        cw_logs_data: String,
    ) -> KinesisEventRecord {
        // Compress the CloudWatch Logs data
        let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
        encoder.write_all(cw_logs_data.as_bytes()).unwrap();
        let compressed_bytes = encoder.finish().unwrap();
        create_kinesis_event_record_with_data(compressed_bytes)
    }

    #[test]
    fn test_kinesis_otlp_stdout_parser_success() {
        let parser = KinesisOtlpStdoutParser;
        let record_string1 = create_test_exporter_output_json_string("service-c");
        let record_string2 = create_test_exporter_output_json_string("service-d");
        let cw_logs_data = create_cloudwatch_logs_data(vec![record_string1, record_string2]);
        let kinesis_record = create_kinesis_event_record_with_cloudwatch_data(cw_logs_data);
        let event = KinesisEvent {
            records: vec![kinesis_record],
        };
        let result = parser.parse(event, "test-stream").unwrap();
        assert_eq!(result.len(), 2);
        assert_eq!(result[0].source, "service-c");
        assert_eq!(result[1].source, "service-d");
        assert_eq!(result[0].content_type, "application/x-protobuf");
        assert_eq!(result[0].content_encoding, None);
    }

    #[test]
    fn test_kinesis_otlp_stdout_parser_invalid_cloudwatch_data() {
        let parser = KinesisOtlpStdoutParser;
        // Create invalid gzip data
        let invalid_gzip_data = vec![0x80, 0x81, 0x82];
        let kinesis_record = create_kinesis_event_record_with_data(invalid_gzip_data);
        let event = KinesisEvent {
            records: vec![kinesis_record],
        };
        let result = parser.parse(event, "test-stream").unwrap();
        assert!(result.is_empty()); // Should handle error gracefully and return empty
    }

    #[test]
    fn test_kinesis_otlp_stdout_parser_invalid_record_does_not_block_valid_one() {
        let parser = KinesisOtlpStdoutParser;
        // A corrupt record followed by a good one: only the good one should survive.
        let broken_record = create_kinesis_event_record_with_data(vec![0x80, 0x81, 0x82]);
        let valid_record_string = create_test_exporter_output_json_string("service-after");
        let cw_logs_data = create_cloudwatch_logs_data(vec![valid_record_string]);
        let valid_record = create_kinesis_event_record_with_cloudwatch_data(cw_logs_data);
        let event = KinesisEvent {
            records: vec![broken_record, valid_record],
        };
        let result = parser.parse(event, "test-stream").unwrap();
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].source, "service-after");
    }

    #[test]
    fn test_kinesis_otlp_stdout_parser_malformed_json_in_logs() {
        let parser = KinesisOtlpStdoutParser;
        let malformed_json_string = "{\"invalid_json".to_string();
        let valid_record_string = create_test_exporter_output_json_string("service-fine");
        let cw_logs_data =
            create_cloudwatch_logs_data(vec![malformed_json_string, valid_record_string]);
        let kinesis_record = create_kinesis_event_record_with_cloudwatch_data(cw_logs_data);
        let event = KinesisEvent {
            records: vec![kinesis_record],
        };
        let result = parser.parse(event, "test-stream").unwrap();
        assert_eq!(result.len(), 1); // Only the valid record should be processed
        assert_eq!(result[0].source, "service-fine");
    }

    #[test]
    fn test_kinesis_otlp_stdout_parser_with_control_messages() {
        let parser = KinesisOtlpStdoutParser;
        let control_message = "CWL CONTROL MESSAGE: Subscription established".to_string();
        let valid_record_string = create_test_exporter_output_json_string("service-ok");
        let cw_logs_data = create_cloudwatch_logs_data(vec![control_message, valid_record_string]);
        let kinesis_record = create_kinesis_event_record_with_cloudwatch_data(cw_logs_data);
        let event = KinesisEvent {
            records: vec![kinesis_record],
        };
        let result = parser.parse(event, "test-stream").unwrap();
        assert_eq!(result.len(), 1); // Control message should be skipped
        assert_eq!(result[0].source, "service-ok");
    }

    #[test]
    fn test_kinesis_otlp_stdout_parser_empty_records() {
        let parser = KinesisOtlpStdoutParser;
        let event = KinesisEvent { records: vec![] };
        let result = parser.parse(event, "test-stream").unwrap();
        assert!(result.is_empty());
    }

    #[test]
    fn test_kinesis_otlp_stdout_parser_empty_log_events() {
        let parser = KinesisOtlpStdoutParser;
        let cw_logs_data = create_cloudwatch_logs_data(vec![]);
        let kinesis_record = create_kinesis_event_record_with_cloudwatch_data(cw_logs_data);
        let event = KinesisEvent {
            records: vec![kinesis_record],
        };
        let result = parser.parse(event, "test-stream").unwrap();
        assert!(result.is_empty());
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment