Created
December 8, 2016 14:02
-
-
Save Doooooo0o/e876759409d8358f195ace40dedf2899 to your computer and use it in GitHub Desktop.
flume log finder and config generator
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import os, time, re, socket, getopt, sys | |
from jinja2 import Template | |
# Pattern for paths ending in ".log" (case-insensitive), used by finder().
# A plain raw string is used because the ``ur''`` prefix is rejected by
# Python 3, while ``r''`` is accepted by both Python 2 and Python 3.
logfile = re.compile(r'.*\.log$', re.IGNORECASE)
def finder(path, brokers):
    """Collect recently modified log files found under *path*.

    Walks *path* recursively and keeps every file whose path matches the
    module-level ``logfile`` pattern and whose modification time is at most
    24 hours old.

    Args:
        path: Directory to search.
        brokers: Kafka broker list; returned untouched so that the result can
            be handed straight to ``agent``.

    Returns:
        A ``(paths, brokers)`` tuple. ``paths`` maps a sanitized name, built
        as ``<short hostname>-<path as walked>`` with every ``/`` and ``.``
        turned into ``-``, to the absolute path of the log file.
    """
    max_age = 86400  # seconds: only keep files touched in the last 24 hours
    now = int(time.time())
    # Short host name (everything before the first dot), computed once.
    host = socket.gethostname().split('.')[0]

    paths = {}
    for dpath, _dnames, fnames in os.walk(path):
        for name in fnames:
            fname = os.path.join(dpath, name)
            if not logfile.search(fname):
                continue
            try:
                mtime = int(os.path.getmtime(fname))
            except OSError:
                # Broken symlink, or the file disappeared (e.g. log rotation)
                # between the directory listing and the stat call: skip it
                # instead of aborting the whole scan.
                continue
            if now - mtime > max_age:
                continue
            # Leading/trailing '.' and '/' are dropped before sanitizing so
            # the name does not start with separators.
            topic = host + '/' + fname.strip('./')
            topic = topic.replace('/', '-').replace('.', '-')
            # Resolve against the current directory so the stored path is
            # valid whatever directory the script was started from.
            paths[topic] = os.path.abspath(fname)
    return paths, brokers
def agent(paths, brokers):
    """Print a Flume configuration for agent ``a1`` to standard output.

    For every entry of *paths* one numbered channel/source/sink triple is
    rendered: a memory channel ``cN``, an exec source ``rN`` running
    ``tail -F`` on the log file, and a Kafka sink ``kN`` publishing to the
    topic ``logs-<name>``. A header naming all sources, sinks and channels
    is printed first, followed by the per-file sections.

    Args:
        paths: Mapping of sanitized topic name to log file path, as returned
            by ``finder``.
        brokers: Value rendered as the ``brokerList`` of every Kafka sink.

    Returns:
        None. The configuration is written with ``print``.
    """
    section = Template('''## channel configuration
a1.channels.c{{ reader }}.type = memory
a1.channels.c{{ reader }}.capacity = 100
a1.channels.c{{ reader }}.transactionCapacity = 100
# source
a1.sources.r{{ reader }}.type = exec
a1.sources.r{{ reader }}.command = tail -F {{ log }}
a1.sources.r{{ reader }}.channels = c{{ reader }}
# sink
a1.sinks.k{{ reader }}.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k{{ reader }}.topic = logs-{{ log_sanitized }}
a1.sinks.k{{ reader }}.brokerList = {{ flume_brokers }}
a1.sinks.k{{ reader }}.requiredAcks = 1
a1.sinks.k{{ reader }}.batchSize = 20
a1.sources.r{{ reader }}.channels = c{{ reader }}
a1.sinks.k{{ reader }}.channel = c{{ reader }}
''')
    header = Template('''# Name the components on this agent
a1.sources = {{ sources }}
a1.sinks = {{ sinks }}
a1.channels = {{ channels }}
''')

    sections = []
    sources = []
    sinks = []
    channels = []
    # Sorting makes the reader numbering reproducible between runs; plain
    # dict iteration order is not guaranteed on every interpreter.
    # ``items()`` (rather than ``iteritems()``) exists on Python 2 and 3.
    for reader, (topic, path) in enumerate(sorted(paths.items()), 1):
        sections.append(section.render(reader=reader, log=path,
                                       log_sanitized=topic,
                                       flume_brokers=brokers))
        sources.append('r%i' % reader)
        sinks.append('k%i' % reader)
        channels.append('c%i' % reader)

    # Single-argument print(...) behaves the same on Python 2 and 3.
    print(header.render(sources=' '.join(sources),
                        sinks=' '.join(sinks),
                        channels=' '.join(channels)))
    for text in sections:
        print(text)
def usage():
    """Print the command-line help text to standard output."""
    # Single-argument print(...) behaves the same on Python 2 and 3.
    print("""
PURPOSE :
Finds logs and configures flume to tail them and ship them to kafka brokers.
USAGE :
logfinder -b brokers -p logpath
OPTIONS :
-short, --long <value> description
-b, --brokers= <brokers> comma separated list of kafka brokers addresses and ports, e.g. : 127.0.0.1:9092,127.0.0.2:9092
-p, --path= <logs directory> where to find the logs that you wish to tail
-h, --help prints help
""")
# Script entry point: parse the command line, find the logs, print the config.
if __name__ == '__main__':
    try:
        opts, args = getopt.getopt(sys.argv[1:], 'b:p:h',
                                   ['brokers=', 'path=', 'help'])
    except getopt.GetoptError as err:
        # Unknown option or missing option argument: say why, then show help.
        sys.stderr.write('%s\n' % err)
        usage()
        sys.exit(2)

    brokers = None
    path = None
    for opt, arg in opts:
        if opt in ('-h', '--help'):
            # Help was requested explicitly, so this is not an error.
            usage()
            sys.exit(0)
        elif opt in ('-b', '--brokers'):
            brokers = arg
        elif opt in ('-p', '--path'):
            path = arg

    # Both options are mandatory. Checking the values (instead of counting
    # the parsed options) also catches a repeated option such as
    # "-b a -b b", which would otherwise leave ``path`` unbound.
    if not brokers or not path:
        usage()
        sys.exit(2)

    paths, brokers = finder(path, brokers)
    agent(paths, brokers)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment