/*
 * Source: GitHub Gist by @garysassano, created August 24, 2025 18:07
 * (gist ID 23b696378310c50fa4ecb72b8386025f).
 *
 * GitHub page text captured along with the code: "Skip to content";
 * "Instantly share code, notes, and snippets."; "Show Gist options";
 * "Save garysassano/23b696378310c50fa4ecb72b8386025f to your computer and use
 * it in GitHub Desktop." (shown twice on the page).
 */
import { promisify } from "node:util";
import { gunzip } from "node:zlib";
import type { KinesisStreamEvent } from "aws-lambda";
import { ProtobufTraceSerializer, JsonTraceSerializer } from "@opentelemetry/otlp-transformer";
// Promise-returning form of zlib's callback-based `gunzip`, so decompression
// can be awaited (used for both the Kinesis record data and the OTLP payload).
const gunzipAsync = promisify(gunzip);
// ---------------------------------------------------------------------------
// Type definitions
// ---------------------------------------------------------------------------

/**
 * JSON document carried (gzipped and base64-encoded) in each Kinesis record by
 * a CloudWatch Logs subscription; `handler` decodes each record into this shape.
 */
interface CloudWatchLogsData {
  /** Kind of subscription message; logged by `handler`. */
  messageType: string;
  /** Owner of the log group (presumably the AWS account ID — confirm). */
  owner: string;
  /** Name of the source log group; logged by `handler`. */
  logGroup: string;
  /** Name of the source log stream; logged by `handler`. */
  logStream: string;
  /** Names of the subscription filters that matched (not read in this file). */
  subscriptionFilters: string[];
  /** Individual log lines; each is checked for an embedded OTLP payload. */
  logEvents: LogEvent[];
}
/** One log line inside `CloudWatchLogsData.logEvents`. */
interface LogEvent {
  /** Event identifier; logged by `handler`. */
  id: string;
  /** Passed to `new Date(...)`, i.e. treated as milliseconds since the Unix epoch. */
  timestamp: number;
  /** Raw log text; may be a JSON-encoded `OtlpMessage`. */
  message: string;
}
/**
 * JSON log line describing an OTLP request with its body embedded in
 * `payload` (presumably written by an OTLP-to-stdout exporter — confirm).
 */
interface OtlpMessage {
  /** Logged as the "OTLP Version"; presumably identifies the exporter format. */
  __otel_otlp_stdout: string;
  /** Logged as the message source. */
  source: string;
  /** Target endpoint of the request (logged only). */
  endpoint: string;
  /** HTTP method of the request (logged only). */
  method: string;
  /** Content type of `payload` (logged only; decoding does not read it). */
  "content-type": string;
  /** Content encoding of `payload` (logged only; decoding does not read it). */
  "content-encoding": string;
  /** Request body; base64 text when `base64` is truthy. */
  payload: string;
  /** Must be truthy for `decodeNestedPayload` to attempt decoding. */
  base64: boolean;
}
/** Result of `decodeOtlpPayload`: stage sizes plus a heuristic string analysis. */
interface OtlpAnalysis {
  /** Human-readable list of the decoding steps. */
  conversionChain: string;
  sizes: {
    /** Length of the base64 payload text, in characters. */
    base64: number;
    /** Size after base64 decoding, in bytes. */
    binary: number;
    /** Size after decompression, in bytes. */
    protobuf: number;
  };
  protobufAnalysis: {
    /** Hex dump of the first 200 protobuf bytes. */
    hexPreview: string;
    /** Up to 50 printable-ASCII runs found in the protobuf bytes. */
    readableStrings: string[];
    /** True if the decoded text contains "trace" or "span". */
    hasTraceData: boolean;
    /** True if any readable string contains "service". */
    hasServiceData: boolean;
  };
  /** Converted JSON spans; not populated by the current decoder. */
  jsonSpans?: unknown;
  /** Error text from a protobuf → JSON conversion attempt, if one reported an error. */
  conversionError?: string;
  /** Short summary of the outcome. */
  note: string;
}
/** Successful result of `decodeNestedPayload`. */
interface DecodedPayload {
  /** The log line parsed as an OTLP message. */
  originalMessage: OtlpMessage;
  /** Analysis of that message's decoded `payload`. */
  otlpDecoded: OtlpAnalysis;
}
// Utility functions
/**
 * Collects runs of printable ASCII text from a protobuf buffer.
 *
 * The bytes are decoded as UTF-8 and scanned for runs of three or more
 * characters in the range 0x20–0x7E. A run is kept only if it still has more
 * than two characters after trimming whitespace. Runs are returned untrimmed,
 * in the order they appear.
 *
 * @param protobufData - Decompressed protobuf bytes.
 * @returns The qualifying printable runs.
 */
function extractReadableStrings(protobufData: Buffer): string[] {
  const printableRun = /[\x20-\x7E]{3,}/g;
  const text = protobufData.toString("utf8");
  const runs: string[] = [];
  let hit: RegExpExecArray | null;
  // The pattern never matches the empty string, so lastIndex always advances.
  while ((hit = printableRun.exec(text)) !== null) {
    const run = hit[0];
    if (run.trim().length > 2) {
      runs.push(run);
    }
  }
  return runs;
}
/**
 * Heuristic flags describing what the decoded OTLP data appears to contain.
 *
 * @param readableStrings - Printable runs extracted from the protobuf bytes.
 * @param protobufString - The protobuf bytes decoded as UTF-8.
 * @returns `hasTraceData` when the text mentions "trace" or "span", and
 *   `hasServiceData` when any readable string mentions "service".
 */
function analyzeOtlpStructure(readableStrings: string[], protobufString: string) {
  const mentionsTraceOrSpan = ["trace", "span"].some((needle) => protobufString.includes(needle));
  const mentionsService = readableStrings.some((candidate) => candidate.includes("service"));
  return { hasTraceData: mentionsTraceOrSpan, hasServiceData: mentionsService };
}
/**
 * Sorts readable strings into rough attribute categories by substring.
 *
 * A string may land in several of the service/telemetry/http buckets.
 * `otherInfo` holds strings that match none of them and are longer than three
 * characters.
 *
 * @param strings - Readable strings extracted from OTLP data.
 * @returns The four buckets, each preserving input order.
 */
function categorizeStrings(strings: string[]) {
  const isService = (s: string) => s.includes("service.") || s.includes("faas.") || s.includes("cloud.");
  const isTelemetry = (s: string) => s.includes("telemetry.") || s.includes("opentelemetry");
  const isHttp = (s: string) => s.includes("http.") || s.includes("url.");
  // Membership in a bucket is exactly "its predicate matches", so the
  // predicates can be reused directly instead of searching each bucket.
  const isUncategorized = (s: string) => !isService(s) && !isTelemetry(s) && !isHttp(s) && s.length > 3;

  return {
    serviceInfo: strings.filter(isService),
    telemetryInfo: strings.filter(isTelemetry),
    httpInfo: strings.filter(isHttp),
    otherInfo: strings.filter(isUncategorized),
  };
}
/**
 * Prints every non-empty category from `categorizeStrings` under its heading,
 * one bullet line per string. At most ten "other" strings are printed.
 *
 * @param categories - Buckets returned by `categorizeStrings`.
 */
function logTelemetryCategories(categories: ReturnType<typeof categorizeStrings>) {
  const sections = [
    { heading: " 🏷️ Service & Resource Info:", entries: categories.serviceInfo },
    { heading: " 📡 Telemetry SDK Info:", entries: categories.telemetryInfo },
    { heading: " 🌐 HTTP Request Info:", entries: categories.httpInfo },
    // Uncategorized strings can be numerous; cap them at ten lines.
    { heading: " 📋 Other Trace Data:", entries: categories.otherInfo.slice(0, 10) },
  ];

  sections
    .filter(({ entries }) => entries.length > 0)
    .forEach(({ heading, entries }) => {
      console.log(heading);
      entries.forEach((entry) => console.log(` - ${entry}`));
    });
}
// Main decoding functions
/**
 * Returns true when `data` begins with the gzip magic number (0x1f 0x8b).
 *
 * A well-formed protobuf message cannot start with 0x1f (that tag byte would
 * encode field 3 with the undefined wire type 7), so this reliably separates
 * gzip streams from raw protobuf bytes.
 */
function looksGzipped(data: Buffer): boolean {
  return data.length >= 2 && data[0] === 0x1f && data[1] === 0x8b;
}

/**
 * Decodes a base64 OTLP payload and returns a heuristic analysis of it.
 *
 * The chain is: base64 → bytes → gunzip (only when the bytes carry the gzip
 * magic number) → protobuf bytes → printable-string analysis. Converting the
 * protobuf into JSON spans is not implemented, so the result never carries
 * `jsonSpans`, and `note` says so rather than reporting a failure.
 *
 * Progress is logged to the console at every step.
 *
 * @param base64Payload - Base64 text taken from an OTLP message's `payload`.
 * @returns Stage sizes, a hex preview, readable strings and structure flags.
 * @throws Error whose message starts with "OTLP decoding failed:" when a
 *   decoding step (e.g. gunzip on a corrupt stream) fails; the underlying
 *   error is logged first.
 */
async function decodeOtlpPayload(base64Payload: string): Promise<OtlpAnalysis> {
  try {
    console.log("=== Starting Full OTLP Decoding Chain ===");

    // Step 1: base64 → binary
    console.log("Step 1: base64 → binary");
    const binaryData = Buffer.from(base64Payload, "base64");
    console.log(` ✓ Decoded ${base64Payload.length} base64 chars → ${binaryData.length} bytes`);

    // Step 2: binary → (gunzip) → protobuf. The source message carries its own
    // "content-encoding" field, so compression is not guaranteed; sniff the
    // gzip header instead of letting gunzip throw on raw protobuf.
    const compressed = looksGzipped(binaryData);
    let protobufData: Buffer;
    if (compressed) {
      console.log("Step 2: binary → gunzip → protobuf");
      protobufData = await gunzipAsync(binaryData);
      console.log(` ✓ Decompressed ${binaryData.length} bytes → ${protobufData.length} bytes`);
    } else {
      console.log("Step 2: binary is not gzip-compressed → treating it as protobuf");
      protobufData = binaryData;
    }

    // Step 3: protobuf → JSON spans. Not implemented: decoding an OTLP export
    // request needs the OTLP protobuf schema, so only a string-level analysis
    // of the bytes follows.
    console.log("Step 3: protobuf → JSON spans");
    console.log(` ⚠️ Note: Direct protobuf parsing requires the exact OTLP schema`);
    console.log(` 📋 The protobuf data structure is complex and requires proper schema parsing`);
    console.log(` ✓ ProtobufTraceSerializer available: ${typeof ProtobufTraceSerializer}`);
    console.log(` ✓ JsonTraceSerializer available: ${typeof JsonTraceSerializer}`);

    const readableStrings = extractReadableStrings(protobufData);
    const protobufString = protobufData.toString("utf8");
    const structureAnalysis = analyzeOtlpStructure(readableStrings, protobufString);
    // subarray() returns a view without copying; Buffer#slice is deprecated.
    const hexPreview = protobufData.subarray(0, 200).toString("hex");

    // Show preview info
    console.log(` • Protobuf hex (first 200 bytes): ${hexPreview}`);
    console.log(" • Readable strings found in OTLP data:");
    for (const str of readableStrings.slice(0, 20)) {
      console.log(` - "${str.trim()}"`);
    }
    console.log(" • OTLP Structure Analysis:");
    console.log(` - Contains trace/span data: ${structureAnalysis.hasTraceData}`);
    console.log(` - Contains service info: ${structureAnalysis.hasServiceData}`);
    console.log(` - Total readable strings: ${readableStrings.length}`);

    return {
      // Describe only the steps that actually ran.
      conversionChain: compressed
        ? "base64 → binary → gunzip → OTLP protobuf"
        : "base64 → binary → OTLP protobuf",
      sizes: {
        base64: base64Payload.length,
        binary: binaryData.length,
        protobuf: protobufData.length,
      },
      protobufAnalysis: {
        hexPreview,
        readableStrings: readableStrings.slice(0, 50), // Limit output
        hasTraceData: structureAnalysis.hasTraceData,
        hasServiceData: structureAnalysis.hasServiceData,
      },
      note: "Protobuf → JSON span conversion is not implemented - showing string analysis",
    };
  } catch (decodeError) {
    console.error("❌ Error in OTLP decoding chain:", decodeError);
    throw new Error(`OTLP decoding failed: ${decodeError instanceof Error ? decodeError.message : "Unknown error"}`);
  }
}
/**
 * Interprets a CloudWatch log line as an OTLP message and, if it is one,
 * decodes its payload.
 *
 * @param message - Raw `message` field of a CloudWatch log event.
 * @returns The parsed message with its decoded analysis. Otherwise returns
 *   `message` unchanged when the line is not JSON, is not an object with a
 *   non-empty string `payload` and a truthy `base64` flag, or its payload
 *   fails to decode.
 */
async function decodeNestedPayload(message: string): Promise<DecodedPayload | string> {
  let candidate: Partial<OtlpMessage> | null;
  try {
    // JSON.parse yields `any`; treat the result as untrusted until checked.
    candidate = JSON.parse(message) as Partial<OtlpMessage> | null;
  } catch {
    // Not JSON: an ordinary log line.
    return message;
  }

  // Reject JSON primitives, null, and objects without a usable base64
  // payload. Only `payload` and `base64` are validated; the remaining fields
  // are used purely for logging.
  if (
    typeof candidate !== "object" ||
    candidate === null ||
    typeof candidate.payload !== "string" ||
    candidate.payload === "" ||
    !candidate.base64
  ) {
    return message;
  }
  const parsedMessage = candidate as OtlpMessage;

  console.log("\n🔍 Found OTLP payload in log message!");
  console.log(`📡 Source: ${parsedMessage.source}`);
  console.log(`🎯 Endpoint: ${parsedMessage.endpoint}`);
  console.log(`📋 Content-Type: ${parsedMessage["content-type"]}`);
  console.log(`🗜️ Content-Encoding: ${parsedMessage["content-encoding"]}`);

  try {
    // Execute the full conversion chain
    const otlpData = await decodeOtlpPayload(parsedMessage.payload);
    return {
      originalMessage: parsedMessage,
      otlpDecoded: otlpData,
    };
  } catch {
    // decodeOtlpPayload has already logged the failure. Fall back to the raw
    // line so the caller keeps processing the remaining events.
    return message;
  }
}
/**
 * Prints the request metadata of an OTLP message as a bulleted list:
 * source, endpoint, method, content type/encoding and the
 * `__otel_otlp_stdout` value (shown as "OTLP Version").
 *
 * @param originalMessage - The parsed OTLP message.
 */
function logOtlpMetadata(originalMessage: OtlpMessage) {
  const rows: Array<[string, string]> = [
    ["Source", originalMessage.source],
    ["Endpoint", originalMessage.endpoint],
    ["Method", originalMessage.method],
    ["Content-Type", originalMessage["content-type"]],
    ["Content-Encoding", originalMessage["content-encoding"]],
    ["OTLP Version", originalMessage.__otel_otlp_stdout],
  ];
  console.log("\n📡 OTLP Request Metadata:");
  for (const [label, value] of rows) {
    console.log(` • ${label}: ${value}`);
  }
}
/**
 * Prints the size of each decoding stage and the compression ratio.
 *
 * The ratio is the compressed (binary) size as a percentage of the
 * decompressed protobuf size, rounded to the nearest integer. It prints
 * "n/a" when the protobuf is empty.
 *
 * @param otlpData - Analysis returned by `decodeOtlpPayload`.
 */
function logConversionResults(otlpData: OtlpAnalysis) {
  const { base64, binary, protobuf } = otlpData.sizes;
  console.log("\n🔄 Conversion Chain Results:");
  console.log(` • Process: ${otlpData.conversionChain}`);
  console.log(` • Base64 size: ${base64} chars`);
  console.log(` • Binary size: ${binary} bytes`);
  console.log(` • Protobuf size: ${protobuf} bytes`);
  // Compare the compressed bytes, not the base64 text: base64 inflates the
  // size by ~4/3, which would overstate the ratio. Guard against dividing by
  // zero, which would otherwise print "NaN%" or "Infinity%".
  const ratio = protobuf > 0 ? `${Math.round((binary / protobuf) * 100)}%` : "n/a";
  console.log(` • Compression ratio: ${ratio}`);
}
/**
 * Prints the categorized readable strings followed by summary lines: the
 * string count and the service/trace detection flags.
 *
 * @param analysis - The `protobufAnalysis` part of a decoded OTLP payload.
 */
function logExtractedTelemetry(analysis: OtlpAnalysis["protobufAnalysis"]) {
  const { readableStrings, hasServiceData, hasTraceData } = analysis;
  console.log("\n📊 Extracted OTLP Telemetry Data:");
  logTelemetryCategories(categorizeStrings(readableStrings));
  const summary = [
    ` • Total data points extracted: ${readableStrings.length}`,
    ` • Has service data: ${hasServiceData}`,
    ` • Has trace data: ${hasTraceData}`,
  ];
  summary.forEach((line) => console.log(line));
}
/**
 * Pretty-prints converted JSON spans between two 50-character "=" rules.
 * Prints nothing when `jsonSpans` is falsy.
 *
 * @param jsonSpans - Any JSON-serializable value.
 */
function logJsonSpans(jsonSpans: unknown) {
  if (!jsonSpans) {
    return;
  }
  const rule = "=".repeat(50);
  console.log("\n🎯 CONVERTED JSON SPANS:");
  console.log(rule);
  console.log(JSON.stringify(jsonSpans, null, 2));
  console.log(rule);
}

// Main handler
/**
 * Lambda entry point for Kinesis records produced by a CloudWatch Logs
 * subscription filter.
 *
 * Each record is base64-decoded, gunzipped and parsed as CloudWatch Logs
 * data. Every log event in it is then checked for an embedded OTLP payload,
 * which is decoded and logged. A failure in one record is logged together
 * with the raw record and does not stop the rest of the batch.
 *
 * @param event - Batch of Kinesis records delivered by Lambda.
 * @returns A fixed HTTP-style success object. NOTE(review): a Kinesis event
 *   source mapping presumably ignores this value (it is not a
 *   `batchItemFailures` response), so failed records are not retried — confirm.
 */
export const handler = async (event: KinesisStreamEvent) => {
  console.log("🚀 Received Kinesis event with", event.Records.length, "records");
  for (const record of event.Records) {
    try {
      await processKinesisRecord(record.kinesis.data);
    } catch (error) {
      console.error("❌ Error processing Kinesis record:", error);
      console.error("Raw record data:", JSON.stringify(record, null, 2));
    }
  }
  return {
    statusCode: 200,
    body: JSON.stringify({ message: "Successfully processed Kinesis records with OTLP decoding" }),
  };
};

/**
 * Decodes one Kinesis record's data (base64 → gunzip → CloudWatch Logs JSON),
 * logs its summary and processes each of its log events in order.
 *
 * @param base64Data - The record's `kinesis.data` field.
 * @throws Whatever base64/gunzip/JSON parsing throws; the caller logs it.
 */
async function processKinesisRecord(base64Data: string): Promise<void> {
  // CloudWatch Logs → Kinesis decoding
  const payload = Buffer.from(base64Data, "base64");
  const decompressed = await gunzipAsync(payload);
  const logData: CloudWatchLogsData = JSON.parse(decompressed.toString("utf8"));

  console.log("\n📊 CloudWatch Logs Data:");
  console.log("- Message Type:", logData.messageType);
  console.log("- Owner:", logData.owner);
  console.log("- Log Group:", logData.logGroup);
  console.log("- Log Stream:", logData.logStream);
  console.log("- Number of log events:", logData.logEvents.length);

  // CloudWatch Logs sends CONTROL_MESSAGE records to check that the
  // destination is reachable; they carry no application log lines.
  if (logData.messageType === "CONTROL_MESSAGE") {
    console.log("ℹ️ Skipping CloudWatch Logs control message");
    return;
  }

  // Events are processed sequentially so their log output stays in order.
  const total = logData.logEvents.length;
  let position = 0;
  for (const logEvent of logData.logEvents) {
    position += 1;
    await processLogEvent(logEvent, position, total);
  }
}

/**
 * Logs one CloudWatch log event and, if it embeds an OTLP payload, that
 * payload's metadata, stage sizes and extracted telemetry.
 *
 * @param logEvent - The event to process.
 * @param position - 1-based index of the event within its record.
 * @param total - Number of events in the record.
 */
async function processLogEvent(logEvent: LogEvent, position: number, total: number): Promise<void> {
  console.log(`\n=== 📝 Log Event ${position}/${total} ===`);
  console.log("🆔 ID:", logEvent.id);
  console.log("⏰ Timestamp:", new Date(logEvent.timestamp).toISOString());

  // Try to decode a nested OTLP payload; a string result means "not OTLP".
  const decoded = await decodeNestedPayload(logEvent.message);
  if (typeof decoded === "string") {
    console.log("📄 Plain log message:", logEvent.message);
    return;
  }

  console.log("✅ Successfully decoded OTLP payload!");
  logOtlpMetadata(decoded.originalMessage);
  logConversionResults(decoded.otlpDecoded);
  logExtractedTelemetry(decoded.otlpDecoded.protobufAnalysis);

  // Show JSON spans if conversion was successful
  if (decoded.otlpDecoded.jsonSpans) {
    logJsonSpans(decoded.otlpDecoded.jsonSpans);
  } else if (decoded.otlpDecoded.conversionError) {
    console.log(`\n⚠️ JSON Conversion Status: ${decoded.otlpDecoded.conversionError}`);
    console.log("📋 Note: OTLP protobuf to JSON conversion requires exact schema matching");
    console.log("🔧 Available transformers: ProtobufTraceSerializer, JsonTraceSerializer");
  }
}
// (GitHub page footer captured with the code: "Sign up for free to join this
// conversation on GitHub. Already have an account? Sign in to comment")