Modified optailer to support streaming oplog entries over HTTP to Splunk
#!/usr/bin/env python
# optailer.py - maintain local capped-collection copies of oplog data,
# modified to optionally stream oplog entries over HTTP to Splunk
import sys, time, os
import pymongo
from pymongo import MongoClient, CursorType
from threading import Thread
import logging
import yaml
from daemon import runner
import atexit
import requests
from bson.json_util import dumps
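
# Expected YAML config keys, as read by the code below: mongodb (connection
# URI), pidfile, logfile, loglevel, verbose, namespaces (comma-separated
# "db.collection" list), local_oplog_size_megabytes, tailSleepTimeSeconds,
# mode ("SPLUNK" streams entries over HTTP; anything else keeps local
# copies), and, for Splunk mode: splunkToken, X-Splunk-Request-Channel,
# httpsEndpoint.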
class App():
    def __init__(self, config, logger):
        self.stop_called = False
        self.stop_requested = False
        self.config = config
        self.logger = logger
        # paths and pidfile settings required by daemon.runner.DaemonRunner
        self.stdin_path = '/dev/null'
        self.pidfile_path = os.path.abspath(self.config['pidfile'])
        self.pidfile_timeout = 3
        self.stdout_path = os.path.abspath('./op.out')
        self.stderr_path = os.path.abspath('./op.err')
        self.vprint("__init__")
        self.logger.debug("optailer initialized for operation " + sys.argv[1])
    def run(self):
        self.logger.info("optailer run called")
        self.tail()

    def vprint(self, message):
        if self.config['verbose']:
            self.logger.debug(message)
    def tail(self):
        self.logger.info("optailer tail")
        namespaces = self.config['namespaces'].split(',')
        self.logger.info("namespaces to tail: " + ", ".join(namespaces))
        threads = []
        for namespace in namespaces:
            db_name, coll_name = namespace.split('.')
            t = Thread(target=self.tail_ns, args=(db_name, coll_name))
            t.setDaemon(True)  # do not block process exit on worker threads
            threads.append(t)
        for t in threads:
            t.start()
        # keep the main thread alive until every tailing thread has finished
        while any(t.is_alive() for t in threads):
            for t in threads:
                t.join(20)
        self.vprint("main thread ending")
    def tail_ns(self, db_name, coll_name):
        # connect to mongodb
        ns = db_name + '.' + coll_name
        local_oplog = 'oplog.' + coll_name
        connection = pymongo.MongoClient(self.config['mongodb'])
        db = connection[db_name]
        query = {"ns": ns}
        if local_oplog in db.collection_names():
            try:
                # resume after the newest entry we have already copied
                last_local_oplog_entry = db[local_oplog].find({}).sort("ts", -1).limit(1).next()
                query["ts"] = {"$gt": last_local_oplog_entry['ts']}
            except StopIteration:  # collection exists but is empty
                self.vprint(db_name + "." + local_oplog + ' exists, but no entries found')
        else:
            size_bytes = self.config['local_oplog_size_megabytes'] * 1024 * 1024
            self.logger.info(db_name + "." + local_oplog + ' not found, attempting to create size=' + str(size_bytes) + ' bytes')
            db.create_collection(local_oplog, capped=True, size=size_bytes)
        self.logger.info(query)
        # start a tailable cursor on the oplog; TAILABLE_AWAIT blocks
        # server-side waiting for new entries instead of returning immediately
        oplog = connection['local']['oplog.rs'].find(query, cursor_type=CursorType.TAILABLE_AWAIT)
        if 'ts' in query:
            oplog.add_option(8)  # oplogReplay flag, speeds up the initial ts scan
        while oplog.alive:
            try:
                if self.stop_requested:
                    self.logger.info("Tail for " + local_oplog + " stopping.")
                    oplog.close()
                    break
                doc = oplog.next()
                self.vprint(doc)
                if self.config['mode'] == 'SPLUNK':
                    # send the document to Splunk over HTTP
                    requests.packages.urllib3.disable_warnings()
                    headers = {"Authorization": "Splunk " + self.config['splunkToken'],
                               "X-Splunk-Request-Channel": self.config['X-Splunk-Request-Channel']}
                    self.vprint(headers)
                    url = self.config['httpsEndpoint']
                    self.vprint(dumps(doc))
                    # verify=False tolerates Splunk's default self-signed certificate
                    response = requests.post(url, data=dumps(doc), headers=headers, verify=False)
                    self.vprint(vars(response))
                else:
                    self.try_insert(db, local_oplog, doc)
            except StopIteration:  # no data available yet, so wait a little
                self.vprint("sleep")
                time.sleep(self.config['tailSleepTimeSeconds'])
    def try_insert(self, db, coll_name, doc):
        # retry with exponential backoff: 1, 2, 4, 8, 16 seconds
        for i in range(5):
            try:
                db[coll_name].insert_one(doc)  # TODO: check the write result
                self.vprint("Inserted into " + coll_name)
                return
            except pymongo.errors.AutoReconnect:
                self.logger.error("AutoReconnect error, try #" + str(i))
                time.sleep(pow(2, i))
        # if here, then we failed 5 times - log fatal error
        self.logger.critical("Unable to insert document into " + coll_name + " - MongoDB unavailable?")
        raise Exception("Unable to insert into MongoDB")
    def cleanup(self):
        if self.stop_called:
            return
        self.stop_called = True
        self.logger.info("cleanup starting")
        self.stop_requested = True
        time.sleep(5)  # give the tailing threads time to close their cursors
        self.logger.info("cleanup complete - optailer shutting down.")
config_file = sys.argv[2]
if not os.path.isfile(config_file):
    print("config file '" + config_file + "' not found")
    sys.exit(1)
config = yaml.safe_load(open(config_file))

logger = logging.getLogger("optailer")
logger.setLevel(getattr(logging, config.get('loglevel', 'INFO').upper()))
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
handler = logging.FileHandler(os.path.abspath(config['logfile']))
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.info("log level set to " + logging.getLevelName(logger.getEffectiveLevel()))

app = App(config, logger)
if sys.argv[1] != 'stop':
    atexit.register(app.cleanup)
daemon_runner = runner.DaemonRunner(app)
# ensure the logger's file handle is not closed during daemonization
daemon_runner.daemon_context.files_preserve = [handler.stream]
daemon_runner.do_action()
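
A minimal config sketch, reconstructed from the keys the script reads; the file name and every value below are hypothetical placeholders (in particular, the HEC endpoint path is an assumption, not something the gist specifies):

# optailer.yaml (hypothetical example)
mongodb: mongodb://localhost:27017
pidfile: ./optailer.pid
logfile: ./optailer.log
loglevel: INFO
verbose: false
namespaces: mydb.orders,mydb.users
local_oplog_size_megabytes: 100
tailSleepTimeSeconds: 2
mode: SPLUNK
httpsEndpoint: https://localhost:8088/services/collector/raw
splunkToken: 00000000-0000-0000-0000-000000000000
X-Splunk-Request-Channel: 11111111-1111-1111-1111-111111111111

Since argv[1] is the daemon action consumed by daemon.runner and argv[2] is the config path, the invocation would look something like:

python optailer.py start optailer.yaml
python optailer.py stop optailer.yaml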