Skip to content

Instantly share code, notes, and snippets.

@tamtam180
Created October 19, 2012 07:04
Show Gist options
  • Save tamtam180/3916648 to your computer and use it in GitHub Desktop.
Save tamtam180/3916648 to your computer and use it in GitHub Desktop.
fluenpyをwindowsで動かすためのin_tail (後で消します)
# coding: utf-8
"""
fluenpy.plugins.in_tail
~~~~~~~~~~~~~~~~~~~~~~~~
tail from rotated log.
:copyright: (c) 2012 by INADA Naoki
:license: Apache v2
"""
from __future__ import print_function, division, absolute_import, with_statement
import logging
import os
import errno
import time
import struct
import binascii
import pywintypes
import win32api
import win32file
import winioctlcon
import win32con
import gevent
from fluenpy.plugin import Plugin
from fluenpy.input import Input
from fluenpy.engine import Engine
from fluenpy.config import config_param
# Module-level logger for this plugin, named after the module.
log = logging.getLogger(__name__)
# NOTE(review): this forces DEBUG on the plugin logger at import time,
# independent of the application's logging configuration -- confirm intended.
log.setLevel(logging.DEBUG)
class TailInput(Input):
    """Input plugin that tails a single log file on Windows via the Win32 API.

    The file is opened with full sharing so it can be renamed or deleted
    (rotated) while it is being read.  Rotation is detected by comparing the
    path currently reported for the open handle with the path recorded when
    the file was opened.

    Configuration parameters:
        path:        file to tail (surrounding whitespace is stripped).
        tag:         tag passed to ``Engine.emit`` for every record.
        key:         record key under which each line is stored.
        rotate_wait: seconds to wait before giving up on a rotated file and
                     before retrying a failed open (default 5).

    NOTE(review): nothing in this class ever sets ``_shutdown`` to True, so
    the loop in ``run`` only stops if code outside this class does so.
    """

    path = tag = key = config_param('string')
    rotate_wait = config_param('time', default=5)
    # struct layout used to unpack the 64-byte object-id buffer:
    # object id, birth volume id, birth object id, domain id (16 bytes each).
    object_id_format = '16s16s16s16s'

    def start(self):
        """Clear the shutdown flag and spawn ``run`` as a gevent greenlet."""
        self._shutdown = False
        self._proc = gevent.spawn(self.run)

    @property
    def paths(self):
        """List of tailed paths (a single entry), available after ``configure``."""
        return self._paths

    def configure(self, conf):
        """Apply *conf* through the base class and normalise the path."""
        super(TailInput, self).configure(conf)
        self._path = self.path.strip()
        # The ``paths`` property reads ``_paths``; without this assignment it
        # raised AttributeError because only ``_path`` was ever set here.
        self._paths = [self._path]

    def emit(self, line):
        """Emit *line* as ``{key: line}`` under ``tag`` with the current time."""
        Engine.emit(self.tag, int(time.time()), {self.key: line})

    def run(self):
        """Tail loop: open the file, read chunks, split into lines and emit.

        Runs until ``self._shutdown`` becomes true.  The open handle, if any,
        is always closed on exit.
        """
        objectid_buffer = win32file.AllocateReadBuffer(
            struct.calcsize(self.object_id_format))
        read_buffer = win32file.AllocateReadBuffer(1024 ** 2)
        last_path = None
        last_object_id = None
        path = self._path
        handle = None
        before = b''  # trailing data that has not yet seen its newline
        try:
            while not self._shutdown:
                if handle is None:
                    try:
                        # open() raises if the handle is invalid, so no
                        # explicit validity check is needed here.
                        handle = self.open(path)
                        last_object_id = self.get_object_id(handle, objectid_buffer)
                        last_path = self.get_final_path(handle, True)
                        log.info("opened log file: %s, object_id=%s", path, last_object_id)
                    except pywintypes.error as err:
                        # Even if the open itself succeeded, a failure anywhere
                        # in this sequence means the file was deleted right
                        # after being opened, so treat it as a failed open.
                        self.close(handle)
                        handle = None
                        self.record_win32error(err)
                        gevent.sleep(self.rotate_wait)
                        continue

                buf = self.readsome(handle, read_buffer)
                if not buf:
                    if self.is_current(handle, last_path):
                        # Same file, just no new data yet: poll again shortly.
                        gevent.sleep(0.3)
                        continue
                    # The file was rotated: give the writer time to finish,
                    # then drain whatever is left in the old file.
                    gevent.sleep(self.rotate_wait)
                    buf = self.readsome(handle, read_buffer)
                    if not buf:
                        self.close(handle)
                        handle = None
                        log.info("closed log file: %s", path)
                        if before:
                            # The old file ended without a final newline.
                            # Emit the remainder as its own record so it is
                            # not glued onto the first line of the next file.
                            self.emit(before.rstrip())
                            before = b''
                        continue

                if b'\n' not in buf:
                    # Still no complete line; keep accumulating.
                    before += buf
                    continue

                buf = before + buf
                before = b''
                lines = buf.splitlines(True)
                if not lines[-1].endswith(b'\n'):
                    # The last piece is an incomplete line; hold it back.
                    before = lines.pop()
                for line in lines:
                    self.emit(line.rstrip())
        finally:
            self.close(handle)

    def is_current(self, handle, last_path):
        """Return True if *handle* still resolves to *last_path*.

        Returns False when the path can no longer be resolved.
        """
        filepath = self.get_final_path(handle)
        if filepath is None:
            return False
        return filepath == last_path

    def get_final_path(self, handle, raise_error=False):
        """Return the final path name for *handle*.

        On ``pywintypes.error`` the error is logged; it is then re-raised if
        *raise_error* is true, otherwise ``None`` is returned.
        """
        try:
            return win32file.GetFinalPathNameByHandle(
                handle, win32con.FILE_NAME_NORMALIZED)
        except pywintypes.error as err:
            self.record_win32error(err)
            if raise_error:
                # Bare raise keeps the original traceback.
                raise
            return None

    def open(self, path):
        """Open *path* for reading and return the Win32 handle.

        The file is shared for read, write and delete so that other processes
        can keep writing to it and rotate it while it is open here.

        Raises:
            pywintypes.error: if the returned handle is invalid.
        """
        handle = win32file.CreateFileW(
            path,
            win32file.GENERIC_READ,
            (win32file.FILE_SHARE_READ
             | win32file.FILE_SHARE_WRITE
             | win32file.FILE_SHARE_DELETE),
            None,
            win32file.OPEN_EXISTING,
            0,
            0)
        if handle == win32file.INVALID_HANDLE_VALUE:
            raise pywintypes.error(win32api.GetLastError(), 'CreateFileW', 'Invalid Handle')
        return handle

    def close(self, handle):
        """Close *handle*; a ``None`` handle is ignored."""
        if handle is not None:
            win32file.CloseHandle(handle)

    def get_object_id(self, handle, buffer=None):
        """Return the hex-encoded object id of the file behind *handle*.

        *buffer* is an optional preallocated output buffer for the
        ``DeviceIoControl`` call.
        """
        if buffer is None:
            # When no buffer can be supplied, passing a size makes the call
            # allocate one internally.  64 is the fixed size of the object-id
            # buffer structure.
            buffer = 64
        buffer = win32file.DeviceIoControl(
            handle,
            # Plain GET_OBJECT_ID can fail because the object id may not
            # exist yet, so use the create-or-get variant.
            winioctlcon.FSCTL_CREATE_OR_GET_OBJECT_ID,
            None,
            buffer,
            None
        )
        object_id, birth_volume_id, birth_object_id, domain_id = struct.unpack(
            self.object_id_format, buffer)
        # This value matches what fsutil reports for the file.
        return binascii.hexlify(object_id)

    def record_win32error(self, err):
        """Log a ``pywintypes.error`` as a warning.

        Reads ``err.args`` rather than indexing the exception itself, because
        exceptions are not subscriptable on Python 3.
        """
        log.warning("win32api='%s', error_code=%s : %s",
                    err.args[1], err.args[0], err.args[2])

    def readsome(self, handle, buffer):
        """Read from *handle* into *buffer* and return the data object.

        NOTE(review): confirm that ``win32file.ReadFile`` returns only the
        bytes actually read when given a preallocated buffer; ``run`` relies
        on an empty result to detect end of data.
        """
        _rc, data = win32file.ReadFile(handle, buffer, None)
        return data
# Register TailInput as the 'tail' input plugin.
Plugin.register_input('tail', TailInput)
@tamtam180
Copy link
Author

posはまだ記録してない。
outputのsecondaryが未実装なのでどーしようかなと・・。
あとreadしたposを保持しても、予期せぬタイミングで落ちたらbufferの中身が消えるから、どーしたもんかな。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment