Skip to content

Instantly share code, notes, and snippets.

@mpurzynski
Last active October 17, 2017 06:58
Show Gist options
  • Select an option

  • Save mpurzynski/2161ce89a256e57195a084bb85185bdc to your computer and use it in GitHub Desktop.

Select an option

Save mpurzynski/2161ce89a256e57195a084bb85185bdc to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
# Copyright (c) 2017 Mozilla Corporation
#
# Contributors:
# Brandon Myers [email protected] - original code
# Michal Purzynski [email protected] - ugly hacks that made it fast (and ugly)
import json
import os
import sys
import socket
import time
from configlib import getConfig, OptionParser
from datetime import datetime, timedelta
import pytz
import boto.sqs
import kombu
import sys
import os
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), '../lib'))
from utilities.toUTC import toUTC
from elasticsearch_client import ElasticsearchClient, ElasticsearchBadServer, ElasticsearchInvalidIndex, ElasticsearchException
from lib.plugins import sendEventToPlugins, checkPlugins
from datetime import datetime
import cProfile
# Detect whether we are running as a uwsgi mule: the `uwsgi` module only
# exists when uwsgi itself is the host process, so a failed import means
# we were started from a plain interpreter.
try:
    import uwsgi
    hasUWSGI = True
except ImportError:
    # not hosted under uwsgi (the bound exception was unused; dropped)
    hasUWSGI = False
def esConnect():
    """Open (or re-open) a connection to the configured Elasticsearch servers."""
    servers = ['{0}'.format(server) for server in options.esservers]
    return ElasticsearchClient(servers, options.esbulksize)
class taskConsumer(object):
    """Consume MozDef event messages from an AWS SQS queue and index them
    into Elasticsearch.

    Messages are fetched in batches of ``options.prefetch``, parsed as
    JSON, stamped with worker metadata, run through two pre-resolved
    enrichment plugins, and posted to Elasticsearch (optionally via the
    bulk API).
    """

    def __init__(self, mqConnection, taskQueue, esConnection, options, eventTaskQueue):
        """Wire up queue/ES connections and pre-resolve the two plugins.

        mqConnection   -- boto SQS connection object
        taskQueue      -- boto SQS Queue polled for messages
        esConnection   -- ElasticsearchClient used for indexing
        options        -- parsed worker configuration
        eventTaskQueue -- NOTE(review): main() passes the same queue object
                          here as for taskQueue; kept for interface
                          compatibility
        """
        self.connection = mqConnection
        self.esConnection = esConnection
        self.taskQueue = taskQueue
        self.eventTaskQueue = eventTaskQueue
        # Load the plugin registry once; a fake "last check" 60 minutes in
        # the past forces checkPlugins to perform an initial scan.
        self.pluginList = list()
        lastPluginCheck = datetime.now() - timedelta(minutes=60)
        self.pluginList, lastPluginCheck = checkPlugins(self.pluginList, lastPluginCheck, options.plugincheckfrequency)
        # Cache handles to the two plugins applied to every message so we
        # do not walk the registration list per event. plugin[1] is the
        # plugin's registration list (see on_message usage of plugin[0]).
        for plugin in self.pluginList:
            if 'nsm' in plugin[1]:
                self.nsm_plugin_handle = plugin
            if 'sourceipaddress' in plugin[1] and 'http_x_forwarded_for' in plugin[1]:
                self.ip_plugin_handle = plugin
        self.options = options
        if self.options.esbulksize != 0:
            # bulk posting enabled: flush the bulker periodically even when
            # it is not full so events do not linger in an idle worker
            self.esConnection.start_bulk_timer()

    def run(self):
        """Poll SQS forever, handing each JSON message body to on_message().

        Non-JSON messages are logged to stdout and skipped.
        TODO(review): messages are never deleted from the SQS queue in this
        loop; confirm deletion happens elsewhere, otherwise every message
        will be redelivered after its visibility timeout.
        """
        while True:
            try:
                records = self.taskQueue.get_messages(self.options.prefetch)
                for msg in records:
                    msg_body = msg.get_body()
                    try:
                        # get_body() should be json
                        message_json = json.loads(msg_body)
                        self.on_message(message_json)
                    except ValueError:
                        sys.stdout.write('Invalid message, not JSON <dropping message and continuing>: %r\n' % msg_body)
                        continue
            except ValueError as e:
                # NOTE(review): only ValueError is caught here, yet the
                # message claims any handling exception; other errors (e.g.
                # transient SQS failures) will propagate and kill the worker.
                sys.stdout.write('Exception while handling message: %r' % e)
                sys.exit(1)

    def on_message(self, message):
        """Stamp a raw event dict with worker metadata, run it through the
        two cached plugins, and hand the result to save_event()."""
        # default elastic search metadata for an event
        metadata = {
            'index': 'events',
            'doc_type': 'event',
            'id': None
        }
        event = message
        event['receivedtimestamp'] = toUTC(datetime.now()).isoformat()
        event['mozdefhostname'] = self.options.mozdefhostname
        event['severity'] = 'INFO'
        event['processid'] = ''
        event['processname'] = ''
        # plugin[0] is the plugin instance; chain the two enrichments
        (anevent1, metadata) = self.nsm_plugin_handle[0].onMessage(event, metadata)
        (anevent, metadata) = self.ip_plugin_handle[0].onMessage(anevent1, metadata)
        self.save_event(anevent, metadata)

    def save_event(self, event, metadata):
        """Index a single event into Elasticsearch.

        A None event (a plugin signalled a discard) is dropped silently.
        On server loss or an index rotation/creation race the ES connection
        is re-opened and the current event is lost; other ES errors are
        logged to stderr.
        """
        try:
            # drop the message if a plugin set it to None, signaling a discard
            if event is None:
                return
            # make a json version for posting to elastic search
            jbody = json.JSONEncoder().encode(event)
            try:
                # non-zero esbulksize means bulk posting is enabled
                bulk = self.options.esbulksize != 0
                self.esConnection.save_object(
                    index=metadata['index'],
                    doc_id=metadata['id'],
                    doc_type=metadata['doc_type'],
                    body=jbody,
                    bulk=bulk
                )
            except (ElasticsearchBadServer, ElasticsearchInvalidIndex):
                # handle loss of server or race condition with index
                # rotation/creation/aliasing by reconnecting
                try:
                    self.esConnection = esConnect()
                    return
                except kombu.exceptions.MessageStateError:
                    return
            except ElasticsearchException as e:
                sys.stderr.write('ElasticSearchException: {0} reported while indexing event'.format(e))
                return
        except ValueError as e:
            sys.stderr.write("esworker.sqs exception in events queue %r\n" % e)
def main():
    """Entry point: connect to SQS and run the consumer loop forever."""
    if hasUWSGI:
        sys.stdout.write("started as uwsgi mule {0}\n".format(uwsgi.mule_id()))
    else:
        sys.stdout.write('started without uwsgi\n')
    # BUG FIX: the original guard was `not in ('sqs')` -- ('sqs') is just
    # the string 'sqs', so substrings like 's' or 'sq' also passed the
    # check. Require an exact protocol match instead.
    if options.mqprotocol != 'sqs':
        sys.stdout.write('Can only process SQS queues, terminating\n')
        sys.exit(1)
    mqConn = boto.sqs.connect_to_region(options.region, aws_access_key_id=options.accesskey, aws_secret_access_key=options.secretkey)
    # attach to the queue
    eventTaskQueue = mqConn.get_queue(options.taskexchange)
    # consume our queue (the same queue object is passed for both the
    # taskQueue and eventTaskQueue parameters of taskConsumer)
    taskConsumer(mqConn, eventTaskQueue, es, options, eventTaskQueue).run()
def initConfig():
    """Populate the module-level `options` namespace from the config file."""
    cfg = options.configfile
    # hostname stamped onto every event this worker processes
    options.mozdefhostname = getConfig('mozdefhostname', socket.gethostname(), cfg)
    # elasticsearch: a non-zero esbulksize enables bulk posting, and
    # esbulktimeout flushes the bulker after that many seconds even if
    # it is not yet full
    options.esservers = list(getConfig('esservers', 'http://localhost:9200', cfg).split(','))
    options.esbulksize = getConfig('esbulksize', 0, cfg)
    options.esbulktimeout = getConfig('esbulktimeout', 30, cfg)
    # message-queue protocol; this worker only supports 'sqs'
    options.mqprotocol = getConfig('mqprotocol', 'sqs', cfg)
    # queue name to attach to
    options.taskexchange = getConfig('taskexchange', 'eventtask', cfg)
    # how many messages to request per poll
    options.prefetch = getConfig('prefetch', 10, cfg)
    # AWS credentials and region
    options.accesskey = getConfig('accesskey', '', cfg)
    options.secretkey = getConfig('secretkey', '', cfg)
    options.region = getConfig('region', 'us-west-1', cfg)
    # seconds between plugin re-scans; periodic re-checking is currently
    # disabled (it appeared to leak memory) but the value is still read
    options.plugincheckfrequency = getConfig('plugincheckfrequency', 120, cfg)
if __name__ == '__main__':
    # configure ourselves: default config lives next to the script,
    # with a .conf extension in place of .py
    default_config = sys.argv[0].replace('.py', '.conf')
    parser = OptionParser()
    parser.add_option("-c", dest='configfile', default=default_config, help="configuration file to use")
    (options, args) = parser.parse_args()
    initConfig()
    # open ES connection globally so we don't waste time opening it per message
    es = esConnect()
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment