Last active
April 20, 2020 18:48
-
-
Save cfuerst/75382a7487e312c33c0f32a7283b77e7 to your computer and use it in GitHub Desktop.
lambda-node-srcds-to-es
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* receives logs from srcds engine via config | |
* logaddress_delall_http (remove all listeners) | |
* logaddress_add_http "https://foo.bar" (add listener) | |
* tranform them to meaningful events and send them to elasticsearch | |
*/ | |
//libs | |
const https = require('https'); | |
const crypto = require('crypto'); | |
//elasticsearch connection | |
const elasticsearchRqOptions = { | |
hostname: process.env.ES_URL, | |
path: '/_bulk', | |
port: 9243, | |
method: 'POST', | |
headers: { | |
'Authorization': 'ApiKey ' + process.env.ES_API_KEY, | |
'Content-Type': 'application/x-ndjson' | |
} | |
}; | |
//easticsearch index name | |
const currentDate = new Date().toJSON().slice(0,10); | |
const elasticIndexName = 'srcds_logs_'+currentDate; | |
//identify log events by regex adapted by https://github.com/Nols1000/srcds-logs | |
const parser = { | |
"convar": / - "(.*)" = "(.*)"/, | |
"matchvar": / - server_cvar: "(.*)" "(.*)"/, | |
"cvarsStart": / - server cvars start/, | |
"cvarsEnd": / - server cvars end/, | |
"freeze": / - Starting Freeze period/, | |
"say": / - "(.*)<(\d*)><(.*)><(.*)>" say "(.*)"/, | |
"kill": / - "(.*)<(\d*)><(.*)><(.*)>" \[(.*)\] killed "(.*)<(\d*)><(.*)><(.*)>" \[(.*)\] with "(.*)"/, | |
"blinded": / - "(.*)<(\d*)><(.*)><(.*)>" blinded for ([0-9]*\.[0-9]+|[0-9]+) by "(.*)<(\d*)><(.*)><(.*)>" from flashbang(.*)/, | |
"attack": / - "(.*)<(\d*)><(.*)><(.*)>" \[(.*)\] attacked "(.*)<(\d*)><(.*)><(.*)>" \[(.*)\] with "(.*)" \(damage "(\d*)"\) (.*) \(hitgroup "(.*)"\)/, | |
"killH": / - "(.*)<(\d*)><(.*)><(.*)>" \[(.*)\] killed "(.*)<(\d*)><(.*)><(.*)>" \[(.*)\] with "(.*)" \((.*)\)/, | |
"killOther": / - "(.*)<(\d*)><(.*)><(.*)>" \[(.*)\] killed other "(.*)<(\d*)>" \[(.*)\] with "(.*)"/, | |
"killSuicide": / - "(.*)<(\d*)><(.*)><(.*)>" \[(.*)\] committed suicide with "(.*)"/, | |
"assist": / - "(.*)<(\d*)><(.*)><(.*)>" assisted killing "(.*)<(\d*)><(.*)><(.*)>"/, | |
"throwNade": / - "(.*)<(\d*)><(.*)><(.*)>" threw (.*) \[(.*) (.*) (.*)\]/, | |
"projectile":/ - (.*) projectile spawned at (.*) (.*) (.*), velocity (.*) (.*) (.*)/, | |
"purchase": / - "(.*)<(\d*)><(.*)><(.*)>" purchased "(.*)"/, | |
"leaveBuyZone": / - "(.*)<(\d*)><(.*)><(.*)>" left buyzone with \[(.*)\]/, | |
"player": / - "(.*)<(\d*)><(.*)><(.*)>" triggered "(.*)"/, | |
"playerTeamchange": / - "(.*)<(\d*)><(.*)>" switched from team <(.*)> to <(.*)>/, | |
"gameOver": / - Game Over: (.*) (\d+) (.*) score (\d+:\d+) after (\d+) min/, | |
"team": / - Team "(.*)" triggered "(.*)" \(CT "(\d*)"\) \(T "(\d*)"\)"/, | |
"teamName": / - Team playing "(.*)": (.*)/, | |
"accoladeFinal": / - ACCOLADE, FINAL: \{(.*)\},(.*)<(\d*)>,(.*),(.*),(.*)/, | |
"score": / - Team "(.*)" scored "(\d*)" with "(\d*)" players/, | |
"world": / - World triggered "(.*)"/, | |
"connect": / - "(.*)<(\d*)><(.*)><(.*)>" connected, address "(.*)"/, | |
"entered": / - "(.*)<(\d*)><(.*)><(.*)>" entered the game/, | |
"validated": / - "(.*)<(\d*)><(.*)><(.*)>" STEAM USERID validated/, | |
"disconnect": / - "(.*)<(\d*)><(.*)><(.*)>" disconnected \((.*)\)/, | |
"mapLoading": / - Loading map "(.*)"/, | |
"mapStarted": / - Started map "(.*)" \(CRC "(.*)"\)/, | |
"playerInfo": / - "(.*)<(\d*)><(.*)><(.*)>" (\[.*\]) (.*)/, | |
"teamTrigger": / - Team "(.*)" triggered "(.*)" \(([C|T]+) "(\d+)"\) \(([CT]+) "(\d+)"\)/ | |
}; | |
// log timestamp parser | |
const parserTs = /(\d\d\/\d\d\/\d\d\d\d - \d\d:\d\d:\d\d\.\d\d\d)/; | |
//do not send those event types to the es cluster | |
const blacklistedEvents = ['convar', 'matchvar', 'validated', 'entered', 'connect', 'disconnect', 'projectile', 'cvarsStart', 'cvarsEnd', 'playerTeamchange', 'freeze']; | |
//add a function per event type/key from the regex patterns here to transform a line | |
const lineTransformersByKey = { | |
defaultTransformer: function(item, attributes, initiator, opponent) { | |
var event = { | |
'eventName': 'event_' + item.key, | |
'rawEvent': item.matches[0], | |
'timestamp': getEsTimestampFormat(item.matches[1]) | |
}; | |
if (attributes === Object(attributes)) { | |
event['eventProperties'+ucfirst(item.key)] = attributes; | |
} | |
if (initiator === Object(initiator)) { | |
event['initiator'] = initiator; | |
event.initiator.name_if_not_bot = initiator.id === 'BOT' ? 'BOT' : initiator.name; | |
} | |
if (opponent === Object(opponent)) { | |
event['opponent'] = opponent; | |
event.opponent.name_if_not_bot = opponent.id === 'BOT' ? 'BOT' : opponent.name; | |
} | |
if (initiator === Object(initiator) && opponent === Object(opponent)) { | |
event['self_inflicted'] = initiator.id == opponent.id && initiator.name == opponent.name; | |
event['opponent_eq_initiator_team'] = initiator.team == opponent.team; | |
} | |
return event; | |
}, | |
kill: function(item) { | |
return this.defaultTransformer(item, | |
{ | |
'weapon': item.matches[12], | |
'distance_meters': calcDistinace(item.matches[6], item.matches[11]) | |
}, | |
{ | |
'id': item.matches[4], | |
'name': item.matches[2], | |
'team': item.matches[5] | |
}, | |
{ | |
'id': item.matches[9], | |
'name': item.matches[7], | |
'team': item.matches[10], | |
} | |
); | |
}, | |
killOther: function(item) { | |
return this.defaultTransformer(item, | |
{ | |
'weapon': item.matches[10], | |
'distance_meters': calcDistinace(item.matches[6], item.matches[9]), | |
'description': item.matches[7] | |
}, | |
{ | |
'id': item.matches[4], | |
'name': item.matches[2], | |
'team': item.matches[5] | |
} | |
); | |
}, | |
killSuicide: function(item) { | |
return this.defaultTransformer(item, | |
{ | |
'weapon': item.matches[7], | |
}, | |
{ | |
'id': item.matches[4], | |
'name': item.matches[2], | |
'team': item.matches[5] | |
} | |
); | |
}, | |
attack: function(item) { | |
return this.defaultTransformer(item, | |
{ | |
'weapon': item.matches[12], | |
'distance_meters': calcDistinace(item.matches[6], item.matches[11]), | |
'damage': parseInt(item.matches[13]), | |
'hitgroup': item.matches[15], | |
}, | |
{ | |
'id': item.matches[4], | |
'name': item.matches[2], | |
'team': item.matches[5] | |
}, | |
{ | |
'id': item.matches[9], | |
'name': item.matches[7], | |
'team': item.matches[10], | |
} | |
); | |
}, | |
blinded: function(item) { | |
return this.defaultTransformer(item, | |
{ | |
'weapon': 'flashbang', | |
'blinded_for_seconds': parseFloat(item.matches[6]) | |
}, | |
{ | |
'id': item.matches[4], | |
'name': item.matches[2], | |
'team': item.matches[5] | |
}, | |
{ | |
'id': item.matches[9], | |
'name': item.matches[7], | |
'team': item.matches[10], | |
} | |
); | |
}, | |
assist: function(item) { | |
return this.defaultTransformer(item, null, | |
{ | |
'id': item.matches[4], | |
'name': item.matches[2], | |
'team': item.matches[5] | |
}, | |
{ | |
'id': item.matches[8], | |
'name': item.matches[6], | |
'team': item.matches[9], | |
} | |
); | |
}, | |
throwNade: function(item) { | |
return this.defaultTransformer(item, | |
{ | |
'weapon': item.matches[6] | |
}, | |
{ | |
'id': item.matches[4], | |
'name': item.matches[2], | |
'team': item.matches[5] | |
} | |
); | |
}, | |
purchase: function(item) { | |
return this.defaultTransformer(item, | |
{ | |
'weapon': item.matches[6] | |
}, | |
{ | |
'id': item.matches[4], | |
'name': item.matches[2], | |
'team': item.matches[5] | |
} | |
); | |
}, | |
leaveBuyZone: function(item) { | |
let data = item.matches[6].trim(' ').split(' '); | |
return this.defaultTransformer(item, | |
{ | |
'gear': data.length > 1 ? data : [], | |
'no_gear': data.length > 1 ? false : true | |
}, | |
{ | |
'id': item.matches[4], | |
'name': item.matches[2], | |
'team': item.matches[5] | |
} | |
); | |
}, | |
player: function(item) { | |
return this.defaultTransformer(item, | |
{ | |
'command': item.matches[6] | |
}, | |
{ | |
'id': item.matches[4], | |
'name': item.matches[2], | |
'team': item.matches[5] | |
} | |
); | |
}, | |
playerInfo: function(item) { | |
return this.defaultTransformer(item, | |
{ | |
'info': item.matches[7].replace('.','') | |
}, | |
{ | |
'id': item.matches[4], | |
'name': item.matches[2], | |
'team': item.matches[5] | |
} | |
); | |
}, | |
world: function(item) { | |
let str = item.matches[2].split('" on "'); | |
let data = {'command': str[0]}; | |
if (str.length >= 2) { | |
data['detail'] = str[1]; | |
} | |
return this.defaultTransformer(item, data); | |
}, | |
teamTrigger: function(item) { | |
let data = { 'command':item.matches[3], 'triggered_by_team': item.matches[2] }; | |
data['team_'+item.matches[4]+'_score'] = item.matches[5]; | |
data['team_'+item.matches[6]+'_score'] = item.matches[7]; | |
return this.defaultTransformer(item, data); | |
}, | |
score: function(item) { | |
return this.defaultTransformer(item, | |
{ | |
'team': item.matches[2], | |
'score': parseInt(item.matches[3]), | |
'num_players': parseInt(item.matches[4]) | |
} | |
); | |
}, | |
gameOver: function(item) { | |
return this.defaultTransformer(item, | |
{ | |
'game_mode': item.matches[2], | |
'map_id': item.matches[3], | |
'map_name': item.matches[4], | |
'score': item.matches[5], | |
'score_left': item.matches[5].split(':')[0], | |
'score_right': item.matches[5].split(':')[1], | |
'minutes': item.matches[6].split(':')[1], | |
} | |
); | |
}, | |
accoladeFinal: function(item) { | |
return this.defaultTransformer(item, | |
{ | |
'badge': item.matches[2], | |
'player_name': item.matches[3], | |
'player_rank': parseInt(item.matches[4]), | |
'badge_value': parseFloat(item.matches[5].split(': ')[1]), | |
'badge_rank': parseFloat(item.matches[6].split(': ')[1]), | |
'score': parseFloat(item.matches[7].split(': ')[1]), | |
} | |
); | |
} | |
}; | |
//main handler | |
exports.handler = async function(event, context, callback) { | |
//events which were transformed | |
let transformedEvents = []; | |
//bulk request body for es | |
let bulkRequestBody; | |
// if data was sent | |
if ('body' in event) { | |
//assume multilines per request (multi line logs will be sent in certain conditions) | |
const lines = await cleanRequestBodyAndGetLinesArray(event.body); | |
//array of transformed events for debug purpose | |
transformedEvents = await getTransformedEvents(lines); | |
//debug dont send to es | |
//return callback(null, {"StatusCode": 200, "events": transformedEvents}); | |
//bulk body for elasticsearch | |
if (transformedEvents.length > 0) { | |
//get the full bulk body | |
bulkRequestBody = await getElasticBulkBody(transformedEvents); | |
//call elasticsearch to index the data | |
const response = sendEventsToElasticInBulk(bulkRequestBody); | |
// promise resolved or rejected asynchronously | |
response.then((response) => { | |
// log some summary | |
var data = JSON.parse(response); | |
if (data === Object(data) && 'items' in data && 'errors' in data) { | |
console.log('sucess: ', getBulkRequestSummary(data)); | |
} else { | |
console.log('fail: unexpected response'); | |
} | |
}).catch((error) => { | |
console.log(error); | |
}); | |
} | |
} | |
return callback(null, {"StatusCode": 200}); | |
} | |
//parse all lines and return the transformed events | |
function getTransformedEvents(lines) { | |
let events = []; | |
//go trough all log entries | |
lines.forEach(function(line) { | |
if (line.trim()) { | |
//try to get a regex match | |
let result = parseLineAndGetFirstMatch(line); | |
//if found a match and not blacklisted proceed | |
if (!blacklistedEvents.includes(result.key)) { | |
//get the line transformer and call it or default | |
let transformer = result.key in lineTransformersByKey && result.found ? result.key : 'defaultTransformer'; | |
//get the event in the transformed format | |
let transformedEvent = lineTransformersByKey[transformer](result); | |
//do not send to ES if debug | |
events.push({'event':transformedEvent, 'raw':result}); | |
} | |
} | |
}); | |
return events; | |
} | |
//do some cleanup and split lines | |
function cleanRequestBodyAndGetLinesArray(data) { | |
return unescape(data.toString().replace(/\t/g, '')).split('\n'); | |
} | |
//find matches against parser regex | |
function parseLineAndGetFirstMatch(line) { | |
for (const key in parser) { | |
const found = line.match(new RegExp(parserTs.source + parser[key].source)); | |
if (found) { | |
return {'found': true, 'key': key, 'matches': found}; | |
} | |
} | |
return {'found': false, 'key': 'unmapped', 'matches': [line, line.substring(0,25)]}; | |
} | |
//calculate distance between 2 points on the map | |
function calcDistinace(kPos, vPos) { | |
const [x1, y1, z1] = kPos.split(' '); | |
const [x2, y2, z2] = vPos.split(' '); | |
const distance_units = Math.sqrt( | |
((x1-x2)**2) + ((y1-y2)**2) + ((z1-z2)**2) | |
); | |
const distance_meters = (distance_units * 2.540) / 100; | |
return Math.round(distance_meters * 100) / 100; | |
} | |
//upercase first letter | |
function ucfirst(str) { | |
return str.charAt(0).toUpperCase() + str.slice(1); | |
} | |
//convert srcds log timestmap to elastic format | |
function getEsTimestampFormat(str) { | |
const s1 = str.split(' - '); | |
const s2 = s1[0].split('/'); | |
return s2[2]+'-'+s2[0]+'-'+s2[1]+"T"+s1[1]+"Z"; | |
} | |
//get the request body for a bulk index request to elasticsearch | |
function getElasticBulkBody(items) { | |
let body = ''; | |
items.forEach(function(item) { | |
let id = crypto.createHash('md5').update(item.event.rawEvent).digest('hex'); | |
body += '{ "index" : { "_index" : "' + elasticIndexName + '", "_id" : "'+id+'" } }'+"\n"; | |
body += JSON.stringify(item.event)+"\n"; | |
}); | |
return body; | |
} | |
//send the event to elasticsearch return promise | |
function sendEventsToElasticInBulk(bulkBody) { | |
// define the promise | |
let request = new Promise((resolve, reject) => { | |
const req = https.request(elasticsearchRqOptions, (response) => { | |
let chunks_of_data = []; | |
response.on('data', (fragments) => { | |
chunks_of_data.push(fragments); | |
}); | |
response.on('end', () => { | |
let response_body = Buffer.concat(chunks_of_data); | |
resolve(response_body.toString()); | |
}); | |
response.on('error', (error) => { | |
reject(error); | |
}); | |
}); | |
req.write(bulkBody); | |
req.end(); | |
}); | |
return request; | |
} | |
//get a summary of what was bulked | |
function getBulkRequestSummary(data) { | |
let summary = {}; | |
data.items.forEach(function(item) { | |
let key = Object.keys(item)[0]; | |
let operation = item[key].result; | |
let success = item[key]._shards.total === item[key]._shards.successful ? 'success' : 'failed'; | |
let summaryKey = key+'_'+operation+'_'+success; | |
if (!(summaryKey in summary)) { | |
summary[summaryKey] = 0; | |
} | |
summary[summaryKey]++; | |
}); | |
return summary; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment