Skip to content

Instantly share code, notes, and snippets.

@kelvinn
Created June 6, 2014 15:48
Show Gist options
  • Save kelvinn/1a7b843f3febcc099bf5 to your computer and use it in GitHub Desktop.
Save kelvinn/1a7b843f3febcc099bf5 to your computer and use it in GitHub Desktop.
This is a sample snippet test for loading events from CouchDB (using _changes) into Splunk.
import os
import sys
import json
import time
import pytz
import couchdb
import dateutil.parser
from email import utils
from datetime import datetime
import splunklib.client as client
DATABASE = "quantify-this"
#ACCOUNT_URL = 'http://splunk:[email protected]:5984' # or for something like Cloudant - careful, this hits the API a lot.
ACCOUNT_URL = 'http://localhost:5984' # This is your local CouchDB database
TIMESTAMP_KEY = 'start'
NAME_KEY = 'category'
HOST = "splunk.example.com"
PORT = 443
USERNAME = "admin"
PASSWORD = "password"
script_dirpath = os.path.dirname(os.path.join(os.getcwd(), __file__))
last_eventid_filepath = script_dirpath + "/last_eventid"
# Open file containing the last event ID and get the last record read
if os.path.isfile(last_eventid_filepath):
try:
last_eventid_file = open(last_eventid_filepath,'r')
last_eventid = last_eventid_file.readline()
last_eventid_file.close()
# Catch the exception. Real exception handler would be more robust
except IOError:
sys.stderr.write('Error: failed to read last_eventid file, ' + last_eventid_filepath + '\n')
sys.exit(2)
else:
sys.stderr.write('Error: ' + last_eventid_filepath + ' file not found! Starting from zero. \n')
last_eventid = 0
s = couchdb.Server(ACCOUNT_URL)
db = s[DATABASE]
# the since parameter defaults to 'last_seq' when using continuous feed
if last_eventid:
ch = db.changes(continuous=True, include_docs=True, since=last_eventid, limit=5000)
else:
ch = db.changes(include_docs=True, limit=5000)
this_last_eventid = ch["last_seq"]
print this_last_eventid
service = client.connect(
host=HOST,
port=PORT,
username=USERNAME,
password=PASSWORD)
# Retrieve the index for the data
myindex = service.indexes["some-index"]
time.sleep(1)
for changeset in ch["results"]:
# Create a Service instance and log in
event_msg = None
try:
doc = db[changeset["id"]]
except couchdb.http.ResourceNotFound:
continue
else:
try:
d = doc[TIMESTAMP_KEY]
d2 = dateutil.parser.parse(d)
d2tuple = d2.timetuple()
d2timestamp = time.mktime(d2tuple)
timestamp = utils.formatdate(d2timestamp)
name = doc[NAME_KEY]
if timestamp and name:
event_msg = "%s name='%s'" % (timestamp, name)
del doc[TIMESTAMP_KEY]
del doc[NAME_KEY]
for key, value in doc.iteritems():
if isinstance(value, (long, int)):
event_msg += " %s=%f" % (key, value)
else:
event_msg += " %s='%s'" % (key, value)
if len(event_msg) > 150:
print event_msg
# Submit an event over HTTP
myindex.submit(event_msg, sourcetype="some-event", host="local")
except:
print "error"
pass
if this_last_eventid > 0:
try:
last_eventid_file = open(last_eventid_filepath,'w')
last_eventid_file.write(str(this_last_eventid))
last_eventid_file.close()
# Catch the exception. Real exception handler would be more robust
except IOError:
sys.stderr.write('Error writing last_eventid to file: ' + last_eventid_filepath + '\n')
sys.exit(2)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment