Created
October 19, 2012 07:04
-
-
Save tamtam180/3916648 to your computer and use it in GitHub Desktop.
fluenpyをwindowsで動かすためのin_tail (後で消します)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding: utf-8 | |
""" | |
fluenpy.plugins.in_tail | |
~~~~~~~~~~~~~~~~~~~~~~~~ | |
tail from rotated log. | |
:copyright: (c) 2012 by INADA Naoki | |
:license: Apache v2 | |
""" | |
from __future__ import print_function, division, absolute_import, with_statement | |
import logging | |
import os | |
import errno | |
import time | |
import struct | |
import binascii | |
import pywintypes | |
import win32api | |
import win32file | |
import winioctlcon | |
import win32con | |
import gevent | |
from fluenpy.plugin import Plugin | |
from fluenpy.input import Input | |
from fluenpy.engine import Engine | |
from fluenpy.config import config_param | |
# Module-level logger for this plugin. The level is pinned to DEBUG here
# rather than inherited from the application's logging configuration.
log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG)
class TailInput(Input):
    """Input plugin that tails a single log file on Windows.

    The file is opened through the Win32 API with read/write/delete sharing
    so that another process can keep writing to it, rename it or delete it
    while it is being tailed. Every complete line read is emitted to the
    engine as ``{key: line}`` under ``tag``.

    Configuration parameters:
        path: path of the file to tail (surrounding whitespace is stripped).
        tag: tag passed to ``Engine.emit`` for every record.
        key: record key under which the line is stored.
        rotate_wait: seconds to wait before retrying a failed open, and
            before giving up on a file that no longer resolves to the path
            it was opened under.
    """

    path = tag = key = config_param('string')
    rotate_wait = config_param('time', default=5)

    # struct layout of the 64-byte buffer filled by the object-id ioctl:
    # object id, birth volume id, birth object id, domain id.
    object_id_format = '16s16s16s16s'

    def start(self):
        """Spawn the tailing loop as a gevent greenlet."""
        self._shutdown = False
        self._proc = gevent.spawn(self.run)

    @property
    def paths(self):
        """List holding the single configured path (set by ``configure``)."""
        return self._paths

    def configure(self, conf):
        """Apply ``conf`` and cache the stripped path."""
        super(TailInput, self).configure(conf)
        self._path = self.path.strip()
        # Keep the list form in sync so the ``paths`` property has
        # something to return; only one file is tailed by this plugin.
        self._paths = [self._path]

    def emit(self, line):
        """Emit one line to the engine, stamped with the current time."""
        Engine.emit(self.tag, int(time.time()), {self.key: line})

    def run(self):
        """Tail the configured file until ``self._shutdown`` becomes true.

        The loop (re)opens the file when no handle is held, reads whatever
        is available, and emits every newline-terminated line. Data after
        the last newline is kept and prepended to the next read. When a
        read returns nothing and the handle no longer resolves to the path
        it was opened under, the file is treated as rotated: after
        ``rotate_wait`` seconds one more read is attempted, and if that is
        empty too the handle is closed so the next iteration reopens
        ``path``.
        """
        objectid_buffer = win32file.AllocateReadBuffer(
            struct.calcsize(self.object_id_format))
        read_buffer = win32file.AllocateReadBuffer(1024 ** 2)
        last_path = None
        last_object_id = None
        path = self._path
        handle = None
        before = b''  # unterminated tail of the previous read

        try:
            while not self._shutdown:
                if handle is None:
                    try:
                        # An invalid handle raises, so no explicit check
                        # is needed here.
                        handle = self.open(path)
                        last_object_id = self.get_object_id(
                            handle, objectid_buffer)
                        last_path = self.get_final_path(handle, True)
                        log.info("opened log file: %s, object_id=%s",
                                 path, last_object_id)
                    except pywintypes.error as err:
                        # Even if the open itself succeeded, a failure in
                        # this sequence means the file was deleted right
                        # after it was opened, so treat it as not opened.
                        self.close(handle)
                        handle = None
                        self.record_win32error(err)
                        gevent.sleep(self.rotate_wait)
                        continue

                buf = self.readsome(handle, read_buffer)
                if not buf:
                    if self.is_current(handle, last_path):
                        gevent.sleep(0.3)
                        continue
                    # The handle no longer maps to the original path: give
                    # the writer time to finish, then drain once more.
                    gevent.sleep(self.rotate_wait)
                    buf = self.readsome(handle, read_buffer)
                    if not buf:
                        self.close(handle)
                        handle = None
                        log.info("closed log file: %s", path)
                        # Flush the unterminated tail of the old file so
                        # it is not glued onto the first line of the file
                        # opened next.
                        if before:
                            self.emit(before.rstrip())
                            before = b''
                        continue

                if b'\n' not in buf:
                    before += buf
                    continue

                buf = before + buf
                before = b''
                lines = buf.splitlines(True)
                if not lines[-1].endswith(b'\n'):
                    before = lines.pop()
                for line in lines:
                    self.emit(line.rstrip())
        finally:
            self.close(handle)

    def is_current(self, handle, last_path):
        """Return True if ``handle`` still resolves to ``last_path``.

        Returns False when the final path cannot be determined.
        """
        filepath = self.get_final_path(handle)
        if filepath is None:
            return False
        return filepath == last_path

    def get_final_path(self, handle, raise_error=False):
        """Return the normalized final path name for ``handle``.

        On a Win32 error the error is logged; it is then re-raised when
        ``raise_error`` is true, otherwise ``None`` is returned.
        """
        try:
            return win32file.GetFinalPathNameByHandle(
                handle, win32con.FILE_NAME_NORMALIZED)
        except pywintypes.error as err:
            self.record_win32error(err)
            if raise_error:
                raise  # bare raise keeps the original traceback
            return None

    def open(self, path):
        """Open ``path`` for reading, sharing read, write and delete access.

        Raises ``pywintypes.error`` if an invalid handle is returned.
        """
        share_mode = (win32file.FILE_SHARE_READ
                      | win32file.FILE_SHARE_WRITE
                      | win32file.FILE_SHARE_DELETE)
        handle = win32file.CreateFileW(
            path, win32file.GENERIC_READ, share_mode,
            None, win32file.OPEN_EXISTING, 0, 0)
        if handle == win32file.INVALID_HANDLE_VALUE:
            raise pywintypes.error(
                win32api.GetLastError(), 'CreateFileW', 'Invalid Handle')
        return handle

    def close(self, handle):
        """Close ``handle``; a ``None`` handle is ignored."""
        if handle is not None:
            win32file.CloseHandle(handle)

    def get_object_id(self, handle, buffer=None):
        """Return the hex-encoded object id of the file behind ``handle``.

        ``buffer`` is an optional output buffer for the ioctl. When it is
        omitted, the buffer size is passed instead so that the call
        allocates the buffer internally.
        """
        if buffer is None:
            # 64 is the fixed size of the object-id output structure.
            buffer = 64
        buffer = win32file.DeviceIoControl(
            handle,
            # Plain GET_OBJECT_ID can fail because the object id may not
            # exist yet; this control code creates it when missing.
            winioctlcon.FSCTL_CREATE_OR_GET_OBJECT_ID,
            None,
            buffer,
            None
        )
        # Only the first field (the object id) is needed; this value
        # matches what fsutil reports.
        object_id = struct.unpack(self.object_id_format, buffer)[0]
        return binascii.hexlify(object_id)

    def record_win32error(self, err):
        """Log a ``pywintypes.error`` as a warning.

        The exception's ``args`` are read as (error code, API name,
        message); missing entries are logged as ``None``.
        """
        # Exceptions are not subscriptable on Python 3, so go through
        # ``args`` instead of indexing the exception itself.
        fields = tuple(err.args[:3])
        fields += (None,) * (3 - len(fields))
        code, api, message = fields
        log.warning("win32api='%s', error_code=%s : %s", api, code, message)

    def readsome(self, handle, buffer):
        """Read whatever is available from ``handle`` and return it as bytes.

        ``buffer`` is the reusable read buffer handed to ``ReadFile``.
        """
        _ret, data = win32file.ReadFile(handle, buffer, None)
        # NOTE(review): ReadFile may hand back a view onto ``buffer``;
        # copying to bytes keeps the result independent of later reads
        # into the same buffer. Confirm against the pywin32 docs.
        return bytes(data)
# Register this input with the plugin registry under the type name 'tail'.
Plugin.register_input('tail', TailInput)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
posはまだ記録してない。
outputのsecondaryが未実装なのでどーしようかなと・・。
あとreadしたposを保持しても、予期せぬタイミングで落ちたらbufferの中身が消えるから、どーしたもんかな。