Created
June 6, 2014 15:48
-
-
Save kelvinn/1a7b843f3febcc099bf5 to your computer and use it in GitHub Desktop.
This is a sample snippet for testing the loading of events from CouchDB (via the _changes feed) into Splunk.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import sys | |
import json | |
import time | |
import pytz | |
import couchdb | |
import dateutil.parser | |
from email import utils | |
from datetime import datetime | |
import splunklib.client as client | |
# --- Source: CouchDB ---------------------------------------------------------
# Base URL of the CouchDB server. This default is a local CouchDB database.
# For a hosted service such as Cloudant, put the credentials in the URL
# (http://user:password@host:5984) -- careful, this script hits the API a lot.
ACCOUNT_URL = 'http://localhost:5984'
DATABASE = "quantify-this"

# Document fields used for the event timestamp and the event name.
TIMESTAMP_KEY = 'start'
NAME_KEY = 'category'

# --- Destination: Splunk -----------------------------------------------------
HOST = "splunk.example.com"
PORT = 443
USERNAME = "admin"
PASSWORD = "password"

# --- Checkpoint file ---------------------------------------------------------
# The last processed event id is kept in a "last_eventid" file that lives in
# the same directory as this script.
_script_path = os.path.join(os.getcwd(), __file__)
script_dirpath = os.path.dirname(_script_path)
last_eventid_filepath = "%s/last_eventid" % script_dirpath
# Load the checkpoint: the event id recorded by the previous run, stored on the
# first line of the last_eventid file. When the file does not exist the script
# starts from zero.
if os.path.isfile(last_eventid_filepath):
    try:
        # "with" closes the file even if the read fails part-way.
        with open(last_eventid_filepath, 'r') as last_eventid_file:
            # Strip the line ending / stray whitespace so it is not passed on
            # as part of the id; an empty file yields '' (treated as "no id").
            last_eventid = last_eventid_file.readline().strip()
    # Catch the exception. Real exception handler would be more robust
    except IOError:
        sys.stderr.write('Error: failed to read last_eventid file, ' + last_eventid_filepath + '\n')
        sys.exit(2)
else:
    sys.stderr.write('Error: ' + last_eventid_filepath + ' file not found! Starting from zero. \n')
    last_eventid = 0
# Open the CouchDB database the changes are read from.
s = couchdb.Server(ACCOUNT_URL)
db = s[DATABASE]

# Options for the _changes request: full documents, at most 5000 rows.
# With a stored checkpoint the request also passes continuous=True and
# since=<checkpoint>; without one, neither option is sent.
change_opts = {'include_docs': True, 'limit': 5000}
if last_eventid:
    change_opts['continuous'] = True
    change_opts['since'] = last_eventid
ch = db.changes(**change_opts)

# Sequence id reported by this response; written back as the new checkpoint
# once the run is finished.
this_last_eventid = ch["last_seq"]
print(this_last_eventid)

# Log in to Splunk.
service = client.connect(
    host=HOST,
    port=PORT,
    username=USERNAME,
    password=PASSWORD)

# Retrieve the index for the data
myindex = service.indexes["some-index"]

time.sleep(1)
# Forward every changed document to Splunk as one "key=value" event line:
#   <RFC 2822 date> name='<NAME_KEY value>' <key>=<value> ...
#
# NOTE(review): the original indentation of this loop was lost. The nesting
# below is a reconstruction: submit() is assumed to run for every event that
# was built, and the >150-character print is assumed to be debug output only.
# Confirm against the author's intent.

# Integer types are written unquoted with %f; "long" only exists on Python 2.
try:
    integer_types = (long, int)
except NameError:
    integer_types = (int,)

for changeset in ch["results"]:
    doc_id = changeset["id"]
    try:
        doc = db[doc_id]
    except couchdb.http.ResourceNotFound:
        # The document cannot be fetched any more; nothing to forward.
        continue

    event_msg = None
    try:
        d2 = dateutil.parser.parse(doc[TIMESTAMP_KEY])
        if d2.utcoffset() is not None:
            # Timezone-aware timestamp: convert through UTC. time.mktime()
            # would read the wall-clock fields as machine-local time and
            # shift the event by the difference between the two zones.
            epoch = datetime(1970, 1, 1, tzinfo=pytz.utc)
            d2timestamp = (d2 - epoch).total_seconds()
        else:
            # Naive timestamp: interpreted as machine-local time, as before.
            d2timestamp = time.mktime(d2.timetuple())
        timestamp = utils.formatdate(d2timestamp)
        name = doc[NAME_KEY]

        if timestamp and name:
            parts = ["%s name='%s'" % (timestamp, name)]
            for key, value in doc.items():
                # Timestamp and name are already in the event header.
                if key in (TIMESTAMP_KEY, NAME_KEY):
                    continue
                if isinstance(value, integer_types):
                    parts.append("%s=%f" % (key, value))
                else:
                    parts.append("%s='%s'" % (key, value))
            event_msg = " ".join(parts)

            if len(event_msg) > 150:
                print(event_msg)

            # Submit an event over HTTP
            myindex.submit(event_msg, sourcetype="some-event", host="local")
    except Exception as exc:
        # Best effort: one bad document must not stop the run, but say which
        # document failed and why. KeyboardInterrupt/SystemExit are not
        # swallowed here, unlike with a bare "except:".
        sys.stderr.write('Error: failed to process document %s: %r\n' % (doc_id, exc))
# Persist the checkpoint so the next run resumes after the changes handled by
# this one. Nothing is written when the server reported no sequence id (0,
# None or an empty value).
# The check uses truthiness rather than "> 0" because a sequence id may be a
# string; comparing a string with 0 only works through Python 2's arbitrary
# cross-type ordering.
if this_last_eventid:
    try:
        # "with" closes the file even if the write fails.
        with open(last_eventid_filepath, 'w') as last_eventid_file:
            last_eventid_file.write(str(this_last_eventid))
    # Catch the exception. Real exception handler would be more robust
    except IOError:
        sys.stderr.write('Error writing last_eventid to file: ' + last_eventid_filepath + '\n')
        sys.exit(2)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment