Created
August 24, 2025 18:07
-
-
Save garysassano/23b696378310c50fa4ecb72b8386025f to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import { promisify } from "node:util"; | |
| import { gunzip } from "node:zlib"; | |
| import type { KinesisStreamEvent } from "aws-lambda"; | |
| import { ProtobufTraceSerializer, JsonTraceSerializer } from "@opentelemetry/otlp-transformer"; | |
// Promise-returning form of zlib's callback-based gunzip. It is awaited both for
// the CloudWatch Logs envelope and for the OTLP payload embedded in log lines.
const gunzipAsync = promisify(gunzip);

// ---------------------------------------------------------------------------
// Data shapes handled by this Lambda
// ---------------------------------------------------------------------------

/** One log line inside a CloudWatch Logs subscription envelope. */
interface LogEvent {
  id: string;
  /** Passed to `new Date(...)` by the handler; presumably epoch milliseconds — confirm. */
  timestamp: number;
  message: string;
}

/** JSON envelope obtained after base64-decoding and gunzipping a Kinesis record. */
interface CloudWatchLogsData {
  messageType: string;
  owner: string;
  logGroup: string;
  logStream: string;
  subscriptionFilters: string[];
  logEvents: LogEvent[];
}

/**
 * JSON object written as a log line by an OTLP-to-stdout exporter.
 * NOTE(review): field semantics are inferred from how this file uses them — verify
 * against the producing exporter.
 */
interface OtlpMessage {
  __otel_otlp_stdout: string;
  source: string;
  endpoint: string;
  method: string;
  "content-type": string;
  "content-encoding": string;
  /** Encoded OTLP request body; decoded from base64 when `base64` is set. */
  payload: string;
  base64: boolean;
}

/** Size of the payload at each stage of the decoding chain. */
interface OtlpSizes {
  /** Length of the base64 text, in characters. */
  base64: number;
  /** Bytes after base64 decoding (still compressed). */
  binary: number;
  /** Bytes after gunzip. */
  protobuf: number;
}

/** Heuristic, string-based view of the decompressed protobuf bytes. */
interface ProtobufStringAnalysis {
  hexPreview: string;
  readableStrings: string[];
  hasTraceData: boolean;
  hasServiceData: boolean;
}

/** Everything `decodeOtlpPayload` learned about one OTLP payload. */
interface OtlpAnalysis {
  conversionChain: string;
  sizes: OtlpSizes;
  protobufAnalysis: ProtobufStringAnalysis;
  jsonSpans?: unknown;
  conversionError?: string;
  note: string;
}

/** A log line that carried an OTLP payload, together with its decoding result. */
interface DecodedPayload {
  originalMessage: OtlpMessage;
  otlpDecoded: OtlpAnalysis;
}
// Utility functions

/**
 * Pull human-readable text runs out of a binary protobuf buffer.
 *
 * The buffer is decoded as UTF-8 and scanned for runs of three or more printable
 * ASCII characters (0x20–0x7E). Runs whose trimmed length is two characters or
 * fewer are dropped. Surviving runs are returned untrimmed, in input order.
 */
function extractReadableStrings(protobufData: Buffer): string[] {
  const text = protobufData.toString("utf8");
  const runs = text.match(/[\x20-\x7E]{3,}/g);
  if (runs === null) {
    return [];
  }
  return runs.filter((run) => run.trim().length > 2);
}
/**
 * Cheap substring-based guess at what an OTLP payload contains.
 *
 * @param readableStrings printable runs extracted from the payload
 * @param protobufString the whole payload decoded as text
 * @returns `hasTraceData` when the text contains "trace" or "span";
 *   `hasServiceData` when any readable run contains "service"
 */
function analyzeOtlpStructure(readableStrings: string[], protobufString: string) {
  const mentionsTraceOrSpan = ["trace", "span"].some((needle) => protobufString.includes(needle));
  const mentionsService = readableStrings.some((candidate) => candidate.includes("service"));
  return { hasTraceData: mentionsTraceOrSpan, hasServiceData: mentionsService };
}
/**
 * Bucket readable strings by the attribute namespace they appear to belong to.
 *
 * A string goes into every bucket whose predicate it matches, so one string may
 * appear in several of `serviceInfo`, `telemetryInfo` and `httpInfo`.
 * `otherInfo` holds strings longer than three characters that matched none of
 * those buckets. Input order, including duplicates, is preserved in each bucket.
 */
function categorizeStrings(strings: string[]) {
  const isServiceInfo = (s: string) =>
    s.includes("service.") || s.includes("faas.") || s.includes("cloud.");
  const isTelemetryInfo = (s: string) => s.includes("telemetry.") || s.includes("opentelemetry");
  const isHttpInfo = (s: string) => s.includes("http.") || s.includes("url.");

  const serviceInfo = strings.filter(isServiceInfo);
  const telemetryInfo = strings.filter(isTelemetryInfo);
  const httpInfo = strings.filter(isHttpInfo);
  // Re-test the predicates rather than searching the bucket arrays with
  // `includes`: membership is identical, but the pass is O(n) instead of O(n²).
  const otherInfo = strings.filter(
    (s) => !isServiceInfo(s) && !isTelemetryInfo(s) && !isHttpInfo(s) && s.length > 3
  );

  return { serviceInfo, telemetryInfo, httpInfo, otherInfo };
}
/**
 * Print each non-empty bucket produced by `categorizeStrings` as a titled list.
 * The "other" bucket is capped at its first 10 entries; empty buckets print nothing.
 */
function logTelemetryCategories(categories: ReturnType<typeof categorizeStrings>) {
  const sections: [string, string[]][] = [
    [" 🏷️ Service & Resource Info:", categories.serviceInfo],
    [" 📡 Telemetry SDK Info:", categories.telemetryInfo],
    [" 🌐 HTTP Request Info:", categories.httpInfo],
    [" 📋 Other Trace Data:", categories.otherInfo.slice(0, 10)],
  ];

  for (const [title, items] of sections) {
    if (items.length === 0) {
      continue;
    }
    console.log(title);
    items.forEach((item) => console.log(` - ${item}`));
  }
}
// Main decoding functions

/**
 * Decode a base64 OTLP payload and summarise what can be read from it.
 *
 * Chain: base64 → binary → gunzip → protobuf bytes, followed by a heuristic
 * string analysis of the protobuf. Structured protobuf → JSON span decoding is
 * not implemented, so the result reports that in `conversionError` and
 * `note`, and `jsonSpans` is always `null`.
 *
 * @param base64Payload base64 text of the (assumed gzip-compressed) OTLP body
 * @returns sizes at each step, a 200-byte hex preview, up to 50 readable strings
 *   and the trace/service heuristics
 * @throws Error whose message starts with "OTLP decoding failed:" if any step
 *   (typically gunzip) fails; the underlying error is logged first
 */
async function decodeOtlpPayload(base64Payload: string): Promise<OtlpAnalysis> {
  try {
    console.log("=== Starting Full OTLP Decoding Chain ===");

    // Step 1: base64 → binary
    console.log("Step 1: base64 → binary");
    const binaryData = Buffer.from(base64Payload, "base64");
    console.log(` ✓ Decoded ${base64Payload.length} base64 chars → ${binaryData.length} bytes`);

    // Step 2: binary → gunzip → protobuf
    // NOTE(review): gzip is assumed regardless of the envelope's "content-encoding";
    // a non-gzip payload fails here and surfaces as an "OTLP decoding failed" error.
    console.log("Step 2: binary → gunzip → protobuf");
    const protobufData = await gunzipAsync(binaryData);
    console.log(` ✓ Decompressed ${binaryData.length} bytes → ${protobufData.length} bytes`);

    // Step 3: protobuf → JSON spans.
    // No decoder is wired up: the serializers are only probed for presence. The
    // status is reported explicitly so callers can tell why `jsonSpans` is empty.
    // NOTE(review): ProtobufTraceSerializer appears to serialize export requests and
    // deserialize export *responses* only, so it cannot decode this request body
    // as-is — confirm against the otlp-transformer version in use.
    console.log("Step 3: protobuf → JSON spans");
    console.log(` ⚠️ Note: Direct protobuf parsing requires the exact OTLP schema`);
    console.log(` 📋 The protobuf data structure is complex and requires proper schema parsing`);
    console.log(` ✓ ProtobufTraceSerializer available: ${typeof ProtobufTraceSerializer}`);
    console.log(` ✓ JsonTraceSerializer available: ${typeof JsonTraceSerializer}`);
    const conversionError = "Protobuf → JSON span conversion is not implemented";
    console.log(` ⚠️ ${conversionError}`);

    // Heuristic analysis of the raw protobuf bytes.
    const protobufString = protobufData.toString("utf8");
    const readableStrings = extractReadableStrings(protobufData);
    const structureAnalysis = analyzeOtlpStructure(readableStrings, protobufString);
    // subarray() yields the same view as Buffer#slice(), which Node has deprecated.
    const hexPreview = protobufData.subarray(0, 200).toString("hex");

    // Show preview info
    console.log(` • Protobuf hex (first 200 bytes): ${hexPreview}`);
    console.log(" • Readable strings found in OTLP data:");
    for (const str of readableStrings.slice(0, 20)) {
      console.log(` - "${str.trim()}"`);
    }
    console.log(" • OTLP Structure Analysis:");
    console.log(` - Contains trace/span data: ${structureAnalysis.hasTraceData}`);
    console.log(` - Contains service info: ${structureAnalysis.hasServiceData}`);
    console.log(` - Total readable strings: ${readableStrings.length}`);

    return {
      conversionChain: "base64 → binary → gunzip → OTLP protobuf → JSON",
      sizes: {
        base64: base64Payload.length,
        binary: binaryData.length,
        protobuf: protobufData.length,
      },
      protobufAnalysis: {
        hexPreview,
        readableStrings: readableStrings.slice(0, 50), // Limit output
        hasTraceData: structureAnalysis.hasTraceData,
        hasServiceData: structureAnalysis.hasServiceData,
      },
      jsonSpans: null,
      conversionError,
      note: "Protobuf → JSON conversion not implemented - showing string analysis",
    };
  } catch (decodeError) {
    console.error("❌ Error in OTLP decoding chain:", decodeError);
    throw new Error(`OTLP decoding failed: ${decodeError instanceof Error ? decodeError.message : "Unknown error"}`);
  }
}
/** True when a parsed JSON value is an object with a non-empty string `payload` and a truthy `base64` flag. */
function hasBase64Payload(value: unknown): value is { payload: string; base64: unknown } {
  if (typeof value !== "object" || value === null) {
    return false;
  }
  const { payload, base64 } = value as { payload?: unknown; base64?: unknown };
  return typeof payload === "string" && payload.length > 0 && Boolean(base64);
}

/**
 * Interpret a log line as an OTLP stdout envelope and decode its payload.
 *
 * @param message raw log event text
 * @returns the parsed envelope and its decoding analysis when `message` is JSON
 *   carrying a non-empty string `payload` with a truthy `base64` flag; otherwise
 *   `message` itself, unchanged. If decoding the payload fails, this also falls
 *   back to returning `message` (decodeOtlpPayload logs the failure first).
 */
async function decodeNestedPayload(message: string): Promise<DecodedPayload | string> {
  let parsed: unknown;
  try {
    parsed = JSON.parse(message);
  } catch {
    return message; // not JSON: an ordinary log line
  }

  if (!hasBase64Payload(parsed)) {
    return message;
  }
  // Only `payload` and `base64` are validated; the remaining OtlpMessage fields
  // are not checked.
  const envelope = parsed as OtlpMessage;

  console.log("\n🔍 Found OTLP payload in log message!");
  console.log(`📡 Source: ${envelope.source}`);
  console.log(`🎯 Endpoint: ${envelope.endpoint}`);
  console.log(`📋 Content-Type: ${envelope["content-type"]}`);
  console.log(`🗜️ Content-Encoding: ${envelope["content-encoding"]}`);

  try {
    // Execute the full conversion chain
    const otlpDecoded = await decodeOtlpPayload(envelope.payload);
    return { originalMessage: envelope, otlpDecoded };
  } catch {
    // Best effort: a payload that cannot be decoded is returned as plain text.
    return message;
  }
}
/** Print the transport metadata that accompanies an OTLP payload in the log line. */
function logOtlpMetadata(originalMessage: OtlpMessage) {
  const rows: [string, string][] = [
    ["Source", originalMessage.source],
    ["Endpoint", originalMessage.endpoint],
    ["Method", originalMessage.method],
    ["Content-Type", originalMessage["content-type"]],
    ["Content-Encoding", originalMessage["content-encoding"]],
    ["OTLP Version", originalMessage.__otel_otlp_stdout],
  ];

  console.log("\n📡 OTLP Request Metadata:");
  for (const [label, value] of rows) {
    console.log(` • ${label}: ${value}`);
  }
}
/**
 * Print the payload size at each stage of the decoding chain.
 *
 * The compression ratio is gzip-compressed bytes divided by decompressed
 * protobuf bytes, as a rounded percentage (lower means better compression).
 * It prints "n/a" when the decompressed size is 0, to avoid dividing by zero.
 */
function logConversionResults(otlpData: OtlpAnalysis) {
  const { conversionChain, sizes } = otlpData;
  console.log("\n🔄 Conversion Chain Results:");
  console.log(` • Process: ${conversionChain}`);
  console.log(` • Base64 size: ${sizes.base64} chars`);
  console.log(` • Binary size: ${sizes.binary} bytes`);
  console.log(` • Protobuf size: ${sizes.protobuf} bytes`);
  // Compare compressed bytes with decompressed bytes. The base64 length would also
  // count the text-encoding overhead, which has nothing to do with compression.
  const ratio =
    sizes.protobuf > 0 ? `${Math.round((sizes.binary / sizes.protobuf) * 100)}%` : "n/a";
  console.log(` • Compression ratio: ${ratio}`);
}
/**
 * Print the readable strings grouped by category, followed by summary flags
 * from the protobuf string analysis.
 */
function logExtractedTelemetry(analysis: OtlpAnalysis["protobufAnalysis"]) {
  const { readableStrings, hasServiceData, hasTraceData } = analysis;

  console.log("\n📊 Extracted OTLP Telemetry Data:");
  logTelemetryCategories(categorizeStrings(readableStrings));

  const summaryLines = [
    ` • Total data points extracted: ${readableStrings.length}`,
    ` • Has service data: ${hasServiceData}`,
    ` • Has trace data: ${hasTraceData}`,
  ];
  for (const line of summaryLines) {
    console.log(line);
  }
}
/** Pretty-print converted JSON spans between separator rules; falsy input prints nothing. */
function logJsonSpans(jsonSpans: unknown) {
  if (!jsonSpans) {
    return;
  }
  const separator = "=".repeat(50);
  console.log("\n🎯 CONVERTED JSON SPANS:");
  console.log(separator);
  console.log(JSON.stringify(jsonSpans, null, 2));
  console.log(separator);
}

// Main handler
/**
 * Lambda entry point for a Kinesis stream fed by a CloudWatch Logs subscription.
 *
 * For every Kinesis record:
 * 1. Base64-decode and gunzip `kinesis.data` into a CloudWatch Logs envelope.
 * 2. Log the envelope's metadata.
 * 3. For each log event, try to decode an embedded OTLP payload and print a summary.
 *
 * Errors are contained per record: the error and the raw record are logged and
 * the loop continues with the next record, so the invocation does not throw.
 * NOTE(review): since nothing is rethrown, the event source presumably treats the
 * batch as processed and does not retry failed records — confirm this is intended.
 *
 * @param event batch of Kinesis records delivered to the function
 * @returns a fixed status object; it does not reflect per-record failures
 */
export const handler = async (event: KinesisStreamEvent) => {
  console.log("🚀 Received Kinesis event with", event.Records.length, "records");

  for (const record of event.Records) {
    try {
      // CloudWatch Logs → Kinesis decoding: base64 → gzip → JSON envelope
      const payload = Buffer.from(record.kinesis.data, "base64");
      const decompressed = await gunzipAsync(payload);
      const logData: CloudWatchLogsData = JSON.parse(decompressed.toString("utf8"));
      // Tolerate an envelope without logEvents instead of failing the whole record.
      const logEvents = logData.logEvents ?? [];

      console.log("\n📊 CloudWatch Logs Data:");
      console.log("- Message Type:", logData.messageType);
      console.log("- Owner:", logData.owner);
      console.log("- Log Group:", logData.logGroup);
      console.log("- Log Stream:", logData.logStream);
      console.log("- Number of log events:", logEvents.length);

      // CloudWatch Logs sends CONTROL_MESSAGE records to check that the
      // destination is reachable; they carry no application log data.
      if (logData.messageType === "CONTROL_MESSAGE") {
        console.log("ℹ️ Skipping CloudWatch Logs control message");
        continue;
      }

      // Process each log event for OTLP data
      for (const [index, logEvent] of logEvents.entries()) {
        console.log(`\n=== 📝 Log Event ${index + 1}/${logEvents.length} ===`);
        console.log("🆔 ID:", logEvent.id);
        console.log("⏰ Timestamp:", new Date(logEvent.timestamp).toISOString());

        // Try to decode a nested OTLP payload; a string result means "not OTLP".
        const decodedMessage = await decodeNestedPayload(logEvent.message);
        if (typeof decodedMessage === "string") {
          console.log("📄 Plain log message:", logEvent.message);
          continue;
        }

        console.log("✅ Successfully decoded OTLP payload!");
        const { originalMessage, otlpDecoded } = decodedMessage;

        // Show organized output
        logOtlpMetadata(originalMessage);
        logConversionResults(otlpDecoded);
        logExtractedTelemetry(otlpDecoded.protobufAnalysis);

        // Show JSON spans if conversion succeeded; otherwise explain why not.
        if (otlpDecoded.jsonSpans) {
          logJsonSpans(otlpDecoded.jsonSpans);
        } else if (otlpDecoded.conversionError) {
          console.log(`\n⚠️ JSON Conversion Status: ${otlpDecoded.conversionError}`);
          console.log("📋 Note: OTLP protobuf to JSON conversion requires exact schema matching");
          console.log("🔧 Available transformers: ProtobufTraceSerializer, JsonTraceSerializer");
        }
      }
    } catch (error) {
      console.error("❌ Error processing Kinesis record:", error);
      console.error("Raw record data:", JSON.stringify(record, null, 2));
    }
  }

  return {
    statusCode: 200,
    body: JSON.stringify({ message: "Successfully processed Kinesis records with OTLP decoding" }),
  };
};
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment