Skip to content

Instantly share code, notes, and snippets.

@jceloria
Last active August 30, 2017 22:20
Show Gist options
  • Select an option

  • Save jceloria/007febb7688c1cb1a8527bbd63628ede to your computer and use it in GitHub Desktop.

Select an option

Save jceloria/007febb7688c1cb1a8527bbd63628ede to your computer and use it in GitHub Desktop.
logz_io.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
################################################################################
"""
Query a ES index to find patterns
Copyright © 2016
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2, or (at your option)
any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software Foundation,
Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
Sample data:
{
"client": 123,
"message": "Hey, this is a message"
}
{
"client": 123,
"message": "Message is great"
}
{
"client": 789,
"message": "Just a string"
}
"""
################################################################################
## Set some defaults
__author__ = 'John Celoria'
__version__ = '0.1'
# Interval for each search (seconds)
INTERVAL = 10
################################################################################
## Library imports
import argparse, logging.handlers, signal, socket, sys, time
from threading import Thread
from collections import Counter
try:
from elasticsearch import Elasticsearch
except:
print("Oops!, looks like you need to install the elasticsearch module.")
sys.exit(-1)
################################################################################
# Setup logging
log = logging.getLogger(__name__)
# Suppress less than WARNING level messages for the request module
logging.getLogger("requests").setLevel(logging.WARNING)
# Default logging format
log_format = '[%(filename)s:%(funcName)s]: %(levelname)s - %(message)s'
# Log to the console (stderr by default)
ch = logging.StreamHandler(sys.stdout)
ch.setFormatter(logging.Formatter('%(asctime)s ' + log_format))
log.addHandler(ch)
################################################################################
es = None
def init_es(args):
global es
es = Elasticsearch(hosts = [args.address])
def add_document(args, client, message):
if not isinstance(client, int):
client = int(client)
try:
res = es.index(index=args.index, doc_type='test', body = {
'client': client,
'message': message
})
except:
log.warning("Unable to add document to index: "+index)
sys.exit(-1)
def search_index(args):
totals = []
while True:
for query in args.file:
log.debug("Searching index for: '"+query.rstrip('\n')+"'")
res = es.search(index=args.index,
body={"query": {
"query_string": {
"fields": ["message"],
"query": query
}
}}
)
totals.extend([h['_source']['client'] for h in res['hits']['hits']])
c = Counter(totals)
for client in c:
print('client: %s, occurrences: %d' % (client, c[client]))
time.sleep(args.seconds)
def delete_index(args):
while True:
question = "Don't be stupid now, are you 100% on this?"
res = str(input(question+' (y/N): ')).lower().strip()
if res and res[0] == 'y':
try:
es.indices.delete(index=args.index, ignore=[400, 404])
except:
log.warning("No such index: "+args.index)
sys.exit(-1)
return True
else:
return False
def can_connect(args):
try:
host = socket.gethostbyname(args.address)
s = socket.create_connection((host, args.port), 2)
return True
except Exception as e:
log.critical(e)
sys.exit(-99)
def signal_handler(signal, frame):
print("\nWell that's rude! You could've asked nicely...")
sys.exit(0)
def set_log_level(args):
if not args.verbose:
log.setLevel(logging.ERROR)
elif args.verbose == 1:
log.setLevel(logging.WARNING)
elif args.verbose == 2:
log.setLevel(logging.INFO)
elif args.verbose == 3:
log.setLevel(logging.DEBUG)
else:
log.critical("Unknown log level")
def main(arguments):
parser = argparse.ArgumentParser(description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument('-f', '--file', type = argparse.FileType('r'),
required = True, help = "Newline separated PATTERNS input file")
parser.add_argument('-a', '--address', default = 'localhost',
help = "ElasticSearch address")
parser.add_argument('-p', '--port', default = '9200',
help = "ElasticSearch port")
parser.add_argument('-i', '--index', required = True,
help = "ElasticSearch index to query")
parser.add_argument('-s', '--seconds', type = int, default = INTERVAL,
help = "Interval for each search (seconds)")
parser.add_argument('-t', '--testdata', action = "store_true",
help = "Add sample test data")
parser.add_argument('-d', '--delete', action = "store_true",
help = "Delete index")
parser.add_argument('-v', '--verbose', action = "count",
help = "Verbose level")
args = parser.parse_args(arguments)
set_log_level(args)
signal.signal(signal.SIGINT, signal_handler)
can_connect(args)
init_es(args)
if args.delete:
delete_index(args)
sys.exit(0)
if args.testdata:
add_document(args, 123, "Hey, this is a message")
add_document(args, 123, "Message is great")
add_document(args, 789, "Just a string")
sys.exit(0)
try:
target = Thread(target = search_index(args))
target.setDaemon(True)
target.start()
except:
log.critical("Unable to continue search!")
sys.exit(-1)
if __name__ == '__main__':
sys.exit(main(sys.argv[1:]))
################################################################################
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment