Skip to content

Instantly share code, notes, and snippets.

@recall704
Created September 5, 2016 05:38
Show Gist options
  • Save recall704/551926bd9c098a60ce225517baa78a5c to your computer and use it in GitHub Desktop.
Save recall704/551926bd9c098a60ce225517baa78a5c to your computer and use it in GitHub Desktop.
# coding=utf-8
"""
Description: collect log files and ship their contents to the database
(Elasticsearch, via output.EsWrite).
Author: pengxin.wu [email protected]
Created:
"""
import sys
import os

# Put this script's directory on sys.path so its sibling modules (input,
# output, filter, monitor, globalvar and presumably the `common` package) can
# be imported even when the script is not started from its own directory.
# This has to run BEFORE those imports; when it ran after them it could not
# help them at all.
sys.path.insert(1, os.path.dirname(os.path.abspath(__file__)))

# NOTE(review): the project modules `input` and `filter` shadow the builtins
# of the same name inside this module.
import input
import output
import filter
from common.dcoslog import logging
import gevent
from gevent.lock import BoundedSemaphore
import uuid
import monitor
import threading
from globalvar import g_theads, g_file_set

# Semaphore taken around updates to the greenlet registry (g_theads).
# NOTE(review): with an initial value of 2, two holders can be inside the
# guarded section at the same time, so this is not a mutual-exclusion lock --
# confirm that 2 is intended.
g_sem = BoundedSemaphore(2)

# Module logger (project wrapper from common.dcoslog): ERROR-level records are
# written to "error.logcrawler.log", a stream handler is attached as well, and
# the logger level is INFO, so DEBUG records are dropped.
log = logging.getLogger("loooooo")
log.add_file_handler("error.logcrawler.log", level=logging.ERROR)
log.add_stream_handler()
log.setLevel(logging.INFO)
def worker_read_send(file_rd, es_w, retry_interval=2):
    """
    Read new content from one log file, transform it and send it to ES.

    Intended to run as a gevent greenlet (monitor_dir_change spawns it and
    registers it in g_theads under file_rd.filename). It loops forever and
    only exits through an exception, e.g. the reader iterator being exhausted
    or the greenlet being killed.

    A chunk whose send fails is retried until it succeeds. Only then is more
    content read, so failed chunks are not skipped.

    :param file_rd: file reader. Must expose ``filename``, ``config`` and
        ``gen``, an iterator returning the next chunk of content. A falsy
        chunk means nothing new is available yet.
    :param es_w: writer whose ``send_to_es(data)`` returns a sequence:
        ``[0]`` is truthy on success, and ``[1]`` describes the error on
        failure.
    :param retry_interval: seconds to wait between failed send attempts.
    :return: does not return normally.
    """
    # One id both tags this greenlet in the registry and is used as the
    # request_id in log lines, so the two can be correlated.
    uid = uuid.uuid4().hex
    # `with` guarantees the semaphore is released even if the registry
    # lookup raises.
    with g_sem:
        me = g_theads[file_rd.filename]
        me.name = "read_file_to_es"
        me.id = uid
        me.tid = threading.current_thread().ident
    tans = filter.Filter(file_rd.config)
    while True:
        # next() works on Python 2.6+ and 3, unlike the Py2-only .next().
        ret_buf = next(file_rd.gen)
        if not ret_buf:
            # No new content: yield the CPU to other greenlets.
            gevent.sleep(2)
            continue
        # Keep retrying this chunk; only after success do we read more.
        while True:
            ret = es_w.send_to_es(tans.trans_contant(ret_buf))
            if ret[0]:
                break
            log.error("%s" % ret[1], request_id=uid)
            # Back off so a persistent failure does not spin in a tight loop,
            # flooding the error log and hammering the ES endpoint.
            gevent.sleep(retry_interval)
def monitor_dir_change(config, file_set, theads, interval=10):
    """
    Watch the configured directory and keep one worker greenlet per file.

    On every scan, input.list_all_files is compared with file_set:
      * For each newly appeared file, a FileRead/EsWrite pair is built and a
        worker_read_send greenlet is spawned for it.
      * For each file that disappeared, its greenlet is killed, removed from
        ``theads``, and its reader state is dropped via
        input.FileRead.delete_fileinfo.
    The function loops forever, sleeping ``interval`` seconds between scans.

    :param config: parsed configuration. ``config["input"]["path"]`` and
        ``config["input"]["matchfile"]`` select the files to watch.
    :param file_set: set of files currently being handled; updated in place.
    :param theads: dict filename -> worker greenlet; updated in place.
    :param interval: seconds to sleep between scans (default 10).
    :return: does not return normally.
    """
    while True:
        new_file_set = input.list_all_files(config["input"]["path"], config["input"]["matchfile"])
        add = new_file_set - file_set
        delete = file_set - new_file_set
        if add:
            # New files appeared: start a worker for each of them.
            log.info("new add %s" % add)
            for filename in add:
                try:
                    infd = input.FileRead(filename, config)
                    outws = output.EsWrite(infd.outconfig)
                except Exception as e:
                    # One file that cannot be opened or set up must not kill
                    # the monitor and with it all future discovery. The file
                    # is not added to file_set, so it is retried next scan.
                    log.error("failed to start worker for %s: %s" % (filename, e))
                    continue
                theads[filename] = gevent.spawn(worker_read_send, infd, outws)
                file_set.add(filename)
        if delete:
            # Files were removed: stop their workers and forget them.
            log.info("new delete %s" % delete)
            for filename in delete:
                if filename in theads:
                    # `with` releases the semaphore even if kill() raises.
                    with g_sem:
                        theads[filename].kill()
                    theads.pop(filename)
                    input.FileRead.delete_fileinfo(filename)
                file_set.remove(filename)
        # Yield the CPU until the next scan.
        gevent.sleep(interval)
if __name__ == "__main__":
    # The config path is resolved relative to the current working directory.
    conf = input.read_conf("./conf.yml")
    log.info(id(g_file_set))

    # Start the directory watcher and tag it with the attributes the
    # greenlet registry uses.
    watcher = gevent.spawn(monitor_dir_change, conf, g_file_set, g_theads)
    watcher.name = "monitor file change"
    watcher.id = uuid.uuid4().hex
    watcher.tid = threading.current_thread().ident

    # Disabled: optional monitoring web server. To enable it, restore these
    # lines together with the matching registry entry below.
    # web_server = monitor.start_monitor_task()
    # setattr(web_server, "name", "web server for monitor")
    # setattr(web_server, "id", uuid.uuid4().hex)
    # setattr(web_server, "tid", threading.current_thread().ident)

    with g_sem:
        g_theads["monitor_file_change"] = watcher
        # g_theads["web_server_for_monitor"] = web_server

    # Wait for all greenlets to finish; the watcher's scan loop never ends.
    gevent.wait()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment