Created
September 5, 2016 05:38
-
-
Save recall704/551926bd9c098a60ce225517baa78a5c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding=utf-8 | |
""" | |
说明: 采集日志文件 上送至数据库 | |
作者:pengxin.wu [email protected] | |
创建时间: | |
""" | |
import input | |
import output | |
import filter | |
from common.dcoslog import logging | |
import gevent | |
from gevent.lock import BoundedSemaphore | |
import uuid | |
import monitor | |
import threading | |
from globalvar import g_theads, g_file_set | |
import sys | |
import os | |
# Make this script's own directory importable (inserted right after sys.path[0]).
# NOTE(review): this executes after the project-local imports above, so it only
# affects imports performed later on — confirm whether it should move above them.
sys.path.insert(1, os.path.split(os.path.abspath(__file__))[0])

# Shared semaphore with capacity 2; the workers and the directory monitor take
# it while touching the greenlet objects stored in g_theads.
g_sem = BoundedSemaphore(2)

# Project logger (helpers come from common.dcoslog): a file handler registered
# at ERROR level writing "error.logcrawler.log", plus a stream handler; the
# logger itself is set to INFO.
log = logging.getLogger("loooooo")
log.add_file_handler("error.logcrawler.log", level=logging.ERROR)
log.add_stream_handler()
log.setLevel(logging.INFO)
def worker_read_send(file_rd, es_w, idle_delay=2, retry_delay=1):
    """Read new content from one log file, transform it and send it to ES.

    Runs forever (meant to be spawned as a greenlet). Each iteration pulls the
    next chunk from ``file_rd.gen``. A falsy chunk means nothing new was read,
    in which case the greenlet sleeps ``idle_delay`` seconds. Otherwise the
    chunk is transformed with ``filter.Filter(file_rd.config).trans_contant``
    and handed to ``es_w.send_to_es``. Sending is retried every ``retry_delay``
    seconds until it succeeds, so reading never gets ahead of a failed upload.

    :param file_rd: reader object exposing ``filename``, ``config`` and a
        ``gen`` iterator that yields chunks of file content.
    :param es_w: writer whose ``send_to_es(data)`` result is indexable; item 0
        is treated as the success flag and item 1 is logged on failure
        (inferred from usage here — confirm against output.EsWrite).
    :param idle_delay: seconds to sleep when no new content was read.
    :param retry_delay: seconds to sleep between failed send attempts.
    :return: does not return normally; the loop runs until the greenlet is
        killed or an exception escapes.
    """
    # Tag this worker's own greenlet (stored in g_theads under the file name
    # by the spawner) with metadata. The with-block guarantees the semaphore
    # is released even if the lookup fails.
    with g_sem:
        worker = g_theads[file_rd.filename]
        worker.name = "read_file_to_es"
        worker.id = uuid.uuid4().hex
        worker.tid = threading.current_thread().ident

    uid = uuid.uuid4().hex  # correlates this worker's error log lines
    transformer = filter.Filter(file_rd.config)
    while True:
        # next() works on both Python 2 and 3, unlike gen.next().
        chunk = next(file_rd.gen)
        if not chunk:
            # Nothing new in the file yet: yield to the other greenlets.
            gevent.sleep(idle_delay)
            continue
        # Do not read further until this chunk has been delivered.
        while True:
            ret = es_w.send_to_es(transformer.trans_contant(chunk))
            if ret[0]:
                break
            # Wrap in a 1-tuple so a tuple-valued error can't break formatting.
            log.error("%s" % (ret[1],), request_id=uid)
            # Back off before retrying. Without a sleep, a send that fails
            # quickly would spin forever and starve every other greenlet.
            gevent.sleep(retry_delay)
def monitor_dir_change(config, file_set, theads, interval=10):
    """Watch the configured input directory and keep one worker per matching file.

    Every ``interval`` seconds the directory is re-listed with
    ``input.list_all_files``.

    For each newly matched file, an ``input.FileRead`` reader and an
    ``output.EsWrite`` writer are built and a ``worker_read_send`` greenlet is
    spawned for them. For each file that is no longer listed, its worker
    greenlet is killed and ``input.FileRead.delete_fileinfo`` is called.

    :param config: parsed configuration; ``config["input"]["path"]`` and
        ``config["input"]["matchfile"]`` select the files to watch.
    :param file_set: set of file names currently being handled; updated in
        place.
    :param theads: dict mapping file name -> worker greenlet; updated in place.
    :param interval: seconds to sleep between two directory scans.
    :return: does not return normally; runs until its greenlet is killed.
    """
    while True:
        new_file_set = input.list_all_files(config["input"]["path"],
                                            config["input"]["matchfile"])
        added = new_file_set - file_set
        deleted = file_set - new_file_set

        if added:
            # New files appeared: start a reader/sender greenlet for each one.
            log.info("new add %s" % added)
            for filename in added:
                try:
                    infd = input.FileRead(filename, config)
                    outws = output.EsWrite(infd.outconfig)
                except Exception as err:
                    # A failure for one file must not kill the monitor
                    # greenlet. The file is not recorded in file_set, so it is
                    # retried on the next scan. (GreenletExit is a
                    # BaseException, so killing the monitor still works.)
                    log.error("failed to start worker for %s: %s"
                              % (filename, err))
                    continue
                theads[filename] = gevent.spawn(worker_read_send, infd, outws)
                file_set.add(filename)

        if deleted:
            # Files were removed: stop their workers and forget their state.
            log.info("new delete %s" % deleted)
            for filename in deleted:
                if filename in theads:
                    # kill() may block and switch greenlets; the with-block
                    # guarantees the semaphore is released afterwards.
                    with g_sem:
                        theads[filename].kill()
                    theads.pop(filename)
                input.FileRead.delete_fileinfo(filename)
                # Always drop it from file_set so it is not reported as
                # deleted again on every later scan.
                file_set.remove(filename)

        # Yield to the worker greenlets until the next scan.
        gevent.sleep(interval)
if __name__ == "__main__":
    conf = input.read_conf("./conf.yml")
    log.info(id(g_file_set))

    # Launch the directory monitor and tag its greenlet with metadata.
    th = gevent.spawn(monitor_dir_change, conf, g_file_set, g_theads)
    th.name = "monitor file change"
    th.id = uuid.uuid4().hex
    th.tid = threading.current_thread().ident

    # The monitoring web server is currently disabled. To enable it, create it
    # with monitor.start_monitor_task(), set its name to
    # "web server for monitor", and give it id/tid attributes as above. Then
    # register it below as g_theads["web_server_for_monitor"].

    with g_sem:
        g_theads["monitor_file_change"] = th

    # Run the gevent loop until it has nothing left to do.
    gevent.wait()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment