Skip to content

Instantly share code, notes, and snippets.

@pbrisbin
Created April 7, 2020 19:23
Show Gist options
  • Save pbrisbin/9d1345afcd28278527171940ed904ea6 to your computer and use it in GitHub Desktop.
logdna-cloudwatch repro
// External Libraries
const agent = require("agentkeepalive");
const asyncRetry = require("async").retry;
const request = require("request");
const zlib = require("zlib");
// Constants
// Each limit can be overridden through a LOGDNA_* environment variable and
// falls back to a sane default. Number.parseInt is given an explicit radix
// so env overrides are always read as base-10 (best practice; the global
// parseInt without a radix is a classic footgun).
const MAX_LINE_LENGTH =
  Number.parseInt(process.env.LOGDNA_MAX_LINE_LENGTH, 10) || 32000;
const MAX_REQUEST_TIMEOUT_MS =
  Number.parseInt(process.env.LOGDNA_MAX_REQUEST_TIMEOUT, 10) || 30000;
const FREE_SOCKET_TIMEOUT_MS =
  Number.parseInt(process.env.LOGDNA_FREE_SOCKET_TIMEOUT, 10) || 300000;
const LOGDNA_URL =
  process.env.LOGDNA_URL || "https://logs.logdna.com/logs/ingest";
const MAX_REQUEST_RETRIES =
  Number.parseInt(process.env.LOGDNA_MAX_REQUEST_RETRIES, 10) || 5;
const REQUEST_RETRY_INTERVAL_MS =
  Number.parseInt(process.env.LOGDNA_REQUEST_RETRY_INTERVAL, 10) || 100;
// Transient network error codes that are safe to retry.
const DEFAULT_HTTP_ERRORS = [
  "ECONNRESET",
  "EHOSTUNREACH",
  "ETIMEDOUT",
  "ESOCKETTIMEDOUT",
  "ECONNREFUSED",
  "ENOTFOUND",
];
// Status-code threshold above which the server response is treated as retryable.
const INTERNAL_SERVER_ERROR = 500;
// Get Configuration from Environment Variables
// Builds the shipper configuration. Reads LOGDNA_KEY (ingestion key),
// LOGDNA_HOSTNAME (override hostname), and LOGDNA_TAGS (comma-separated
// list, whitespace around each tag is stripped). UserAgent always comes
// from the package name/version.
const getConfig = () => {
  const pkg = require("./package.json");
  const { LOGDNA_KEY, LOGDNA_HOSTNAME, LOGDNA_TAGS } = process.env;
  const config = {
    UserAgent: `${pkg.name}/${pkg.version}`,
  };
  if (LOGDNA_KEY) config.key = LOGDNA_KEY;
  if (LOGDNA_HOSTNAME) config.hostname = LOGDNA_HOSTNAME;
  if (LOGDNA_TAGS && LOGDNA_TAGS.length > 0) {
    const tags = LOGDNA_TAGS.split(",").map((tag) => tag.trim());
    config.tags = tags.join(",");
  }
  return config;
};
// Addition
// Extracts a leading log level from a message. Recognizes an "error ",
// "warn ", "info " or "debug " prefix (any case, trailing space required).
// Returns { level, message } with the level upper-cased and the prefix
// stripped, or null when the message carries no recognized prefix.
const parseLevel = (message) => {
  const result = message.match(/^(error|warn|info|debug) /i);
  // String.prototype.match returns null (never undefined) on no match,
  // so the original `!== undefined` check was dead code; likewise the
  // captured group is a bare word, so trimming it was a no-op.
  if (result === null) return null;
  return {
    level: result[1].toUpperCase(),
    message: message.slice(result[0].length),
  };
};
// Parse the GZipped Log Data
// CloudWatch delivers the subscription payload base64-encoded and
// gzip-compressed under event.awslogs.data; decode, decompress, and
// parse it back into a JSON object.
const parseEvent = (event) => {
  const compressed = Buffer.from(event.awslogs.data, "base64");
  const json = zlib.unzipSync(compressed);
  return JSON.parse(json);
};
// Prepare the Messages and Options
// Maps each CloudWatch log event onto a LogDNA line object. The `line`
// field is a JSON string embedding the (level-stripped) message plus the
// originating group/stream; `level` is pulled off the message prefix when
// present (Addition), and `meta` carries account/filter provenance.
const prepareLogs = (eventData) => {
  const { messageType, logGroup, logStream, owner, subscriptionFilters } =
    eventData;
  return eventData.logEvents.map((event) => {
    const parsed = parseLevel(event.message); // Addition
    const line = JSON.stringify({
      message: parsed ? parsed.message : event.message, // Addition
      source: "cloudwatch",
      event: {
        type: messageType,
        id: event.id,
      },
      log: {
        group: logGroup,
        stream: logStream,
      },
    });
    return {
      line,
      level: parsed ? parsed.level : null, // Addition
      timestamp: event.timestamp,
      file: logStream,
      meta: {
        owner,
        filters: subscriptionFilters,
      },
    };
  });
};
// Ship the Logs
// Sends the prepared `payload` (array built by prepareLogs) to the LogDNA
// ingestion endpoint with retry/backoff. `config` comes from getConfig and
// `callback(error, result)` is a Lambda-style completion callback.
// NOTE(review): errors are reported as bare strings/codes rather than Error
// objects, and a fresh keep-alive agent is created per invocation (which
// limits socket reuse across Lambda calls) — confirm whether intended.
const sendLine = (payload, config, callback) => {
// Check for Ingestion Key
if (!config.key) return callback("Missing LogDNA Ingestion Key");
// Set Hostname
// Fall back to the log group name, recovered from the first prepared line.
const hostname = config.hostname || JSON.parse(payload[0].line).log.group;
// Prepare HTTP Request Options
const options = {
url: LOGDNA_URL,
// Tags are optional; hostname always goes in the query string.
qs: config.tags
? {
tags: config.tags,
hostname: hostname,
}
: {
hostname: hostname,
},
method: "POST",
// LogDNA ingestion body: e = "ls" (lines event), ls = the line array.
body: JSON.stringify({
e: "ls",
ls: payload,
}),
// The ingestion key is sent as the HTTP basic-auth username.
auth: {
username: config.key,
},
headers: {
"Content-Type": "application/json; charset=UTF-8",
"user-agent": config.UserAgent,
},
timeout: MAX_REQUEST_TIMEOUT_MS,
withCredentials: false,
// Keep-alive agent so retries within this call can reuse the TLS socket.
agent: new agent.HttpsAgent({
freeSocketTimeout: FREE_SOCKET_TIMEOUT_MS,
}),
};
// Flush the Log
asyncRetry(
{
times: MAX_REQUEST_RETRIES,
// Exponential backoff: base interval doubled on each successive retry.
interval: (retryCount) => {
return REQUEST_RETRY_INTERVAL_MS * Math.pow(2, retryCount);
},
// Retry only transient network errors or 5xx server responses.
errorFilter: (errCode) => {
return (
DEFAULT_HTTP_ERRORS.includes(errCode) ||
errCode === "INTERNAL_SERVER_ERROR"
);
},
},
// Task: one HTTP attempt; report only the error *code* so errorFilter
// above can match it against the retryable lists.
(reqCallback) => {
return request(options, (error, response, body) => {
if (error) {
return reqCallback(error.code);
}
if (response.statusCode >= INTERNAL_SERVER_ERROR) {
return reqCallback("INTERNAL_SERVER_ERROR");
}
return reqCallback(null, body);
});
},
// Final outcome once retries are exhausted or a request succeeds.
(error, result) => {
if (error) return callback(error);
return callback(null, result);
}
);
};
// Main Handler
exports.handler = (event, context, callback) => {
return sendLine(prepareLogs(parseEvent(event)), getConfig(), callback);
};
// Requires .env file with LOGDNA_KEY
require("dotenv").config();
const zlib = require("zlib");
const index = require("./index");
// Builds a synthetic CloudWatch Logs event: the payload is JSON-encoded,
// gzip-compressed, and base64-wrapped under awslogs.data, mirroring what
// AWS actually delivers to the Lambda.
const makeEvent = (payload) => {
  const data = zlib.gzipSync(JSON.stringify(payload)).toString("base64");
  return { awslogs: { data } };
};
// Integration test: ships one synthetic INFO line through the real
// callback-style handler and logs whatever LogDNA returns.
test("integration", async () => {
  const event = makeEvent({
    messageType: "DATA_MESSAGE",
    owner: "123456789012",
    logGroup: "testGroup",
    logStream: "testStream",
    subscriptionFilters: ["LambdaStream_cloudwatchlogs-node"],
    logEvents: [
      {
        id: "34622316099697884706540976068822859012661220141643892546",
        timestamp: 1557946425136,
        message: "INFO This is Log Line created from a test suite",
      },
    ],
  });
  // Adapt the (error, result) callback interface to a Promise so jest
  // can await completion.
  const result = await new Promise((resolve, reject) => {
    index.handler(event, undefined, (error, value) =>
      error ? reject(error) : resolve(value)
    );
  });
  console.log(result);
});
const request = require("request");
const zlib = require("zlib");
// Decodes the gzipped/base64 CloudWatch payload and shapes its log events
// into the LogDNA ingestion body: { lines: [...] }, one entry per event,
// with the level split off the message prefix when present.
const prepareLogs = (event) => {
  const raw = zlib.unzipSync(Buffer.from(event.awslogs.data, "base64"));
  const eventData = JSON.parse(raw);
  const { logGroup, logStream } = eventData;
  const lines = eventData.logEvents.map((logEvent) => {
    const parsed = parseLevel(logEvent.message);
    return {
      timestamp: logEvent.timestamp,
      env: "production",
      app: logGroup,
      file: logStream,
      level: parsed ? parsed.level : null,
      line: parsed ? parsed.message : logEvent.message,
    };
  });
  return { lines };
};
// Extracts a leading log level from a message. Recognizes an "error ",
// "warn ", "info " or "debug " prefix (any case, trailing space required).
// Returns { level, message } with the level upper-cased and the prefix
// stripped, or null when the message carries no recognized prefix.
const parseLevel = (message) => {
  const result = message.match(/^(error|warn|info|debug) /i);
  // String.prototype.match returns null (never undefined) on no match,
  // so the original `!== undefined` check was dead code; likewise the
  // captured group is a bare word, so trimming it was a no-op.
  if (result === null) return null;
  return {
    level: result[1].toUpperCase(),
    message: message.slice(result[0].length),
  };
};
// Promise adapter around the callback-based `request` library. Resolves
// with { error, response, body, bodyJSON }, where bodyJSON is the parsed
// response body or null when the body is not valid JSON.
// NOTE(review): on transport failure this rejects with a plain object, not
// an Error instance — callers using instanceof/`.stack` won't see what
// they expect; consider an Error carrying these fields instead.
const request_ = async (options) => {
return new Promise((resolve, reject) => {
request(options, (error, response, body) => {
if (error) {
// Transport-level failure (DNS, socket, timeout): no usable body.
reject({ error, response, body, bodyJSON: null });
} else {
try {
const bodyJSON = JSON.parse(body);
resolve({ error: null, response, body, bodyJSON });
} catch {
// Non-JSON body (e.g. an HTML error page): resolve with the raw body.
resolve({ error: null, response, body, bodyJSON: null });
}
}
});
});
};
exports.handler = async (event) => {
const lines = prepareLogs(event);
return await request_({
url: "https://logs.logdna.com/logs/ingest",
qs: { hostname: "cloudwatch" },
method: "POST",
body: JSON.stringify(lines),
auth: { username: process.env.LOGDNA_KEY },
headers: {
"content-type": "application/json; charset=UTF-8",
},
});
};
// Requires .env file with LOGDNA_KEY
require("dotenv").config();
const zlib = require("zlib");
const index = require("./index");
// Builds a synthetic CloudWatch Logs event: JSON-encode the payload, gzip
// it, and base64-wrap it under awslogs.data, as AWS delivers to the Lambda.
const makeEvent = (payload) => {
  const zipped = zlib.gzipSync(JSON.stringify(payload));
  return {
    awslogs: {
      data: zipped.toString("base64"),
    },
  };
};
// Integration test for the simplified handler: the async handler resolves
// with the parsed ingestion response, whose status should be "ok".
test("integration", async () => {
  const payload = {
    messageType: "DATA_MESSAGE",
    owner: "123456789012",
    logGroup: "testGroup",
    logStream: "testStream",
    subscriptionFilters: ["LambdaStream_cloudwatchlogs-node"],
    logEvents: [
      {
        id: "34622316099697884706540976068822859012661220141643892546",
        timestamp: 1557946425136,
        message: "INFO This is Log Line created from a test suite",
      },
    ],
  };
  const { bodyJSON } = await index.handler(makeEvent(payload));
  expect(bodyJSON.status).toBe("ok");
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment