Created
September 3, 2012 15:10
-
-
Save quiver/3609972 to your computer and use it in GitHub Desktop.
Python implementation of "IBM DeveloperWorks : Monitor Linux file system events with inotify"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import collections | |
import ctypes | |
import ctypes.util | |
# Bit masks used with inotify event masks.
# Single high bit (bit 30) of the 32-bit event mask.
IN_ISDIR = 1 << 30
# The twelve low-order bits of the event mask, all set.
IN_ALL_EVENTS = (1 << 12) - 1
class inotify_event_struct(ctypes.Structure):
    """
    ctypes mirror of the fixed-size header of the inotify_event structure
    (used in buffer size calculations)::

        struct inotify_event {
            __s32 wd;       /* watch descriptor */
            __u32 mask;     /* watch mask */
            __u32 cookie;   /* cookie to synchronize two events */
            __u32 len;      /* length (including nulls) of name */
            char  name[0];  /* stub for possible name */
        };

    ``name`` is a zero-length trailing array in the C declaration: it only
    marks where the variable-length file name begins and contributes nothing
    to ``sizeof``.  It is therefore declared as ``c_char * 0`` rather than as
    a pointer (``c_char_p``), which would add pointer-sized storage plus
    alignment padding and make ``ctypes.sizeof`` disagree with the C struct.
    """
    _fields_ = [('wd', ctypes.c_int),
                ('mask', ctypes.c_uint32),
                ('cookie', ctypes.c_uint32),
                ('len', ctypes.c_uint32),
                # Zero-length array, matching ``char name[0]``.
                ('name', ctypes.c_char * 0)]
# Immutable record for one parsed event; fields are named after the members
# of struct inotify_event.
InotifyEvent = collections.namedtuple(
    'InotifyEvent', 'wd mask cookie len name')

# Size in bytes of the ctypes event structure.
EVENT_SIZE = ctypes.sizeof(inotify_event_struct)
# Read-buffer size: 1024 slots of (event structure + 16 extra bytes) each.
EVENT_BUFFER_SIZE = (EVENT_SIZE + 16) * 1024
# Wrappers for the inotify system calls exported by the C library.
libc_name = ctypes.util.find_library('c')
# use_errno=True makes the C errno of each call retrievable through
# ctypes.get_errno().
libc = ctypes.CDLL(libc_name, use_errno=True)
get_errno_func = ctypes.get_errno

# int inotify_init(void)
inotify_init = libc.inotify_init
inotify_init.argtypes = []
inotify_init.restype = ctypes.c_int

# int inotify_add_watch(int fd, const char *pathname, uint32_t mask)
inotify_add_watch = libc.inotify_add_watch
inotify_add_watch.argtypes = [ctypes.c_int, ctypes.c_char_p,
                              ctypes.c_uint32]
inotify_add_watch.restype = ctypes.c_int

# int inotify_rm_watch(int fd, int wd)
inotify_rm_watch = libc.inotify_rm_watch
inotify_rm_watch.argtypes = [ctypes.c_int, ctypes.c_int]
inotify_rm_watch.restype = ctypes.c_int
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Python implementation of the following article | |
IBM DeveloperWorks : Monitor Linux file system events with inotify | |
http://www.ibm.com/developerworks/linux/library/l-inotify/ | |
Usage | |
$ python inotify_test.py /tmp | |
Major differences are : | |
- use epoll instead of select | |
- event mask fields are not checked | |
- error handling is omitted | |
- written in Python instead of C :-)
Some parts of this program borrow code from the following programs:
- https://github.com/seb-m/pyinotify
- https://github.com/gorakhargosh/watchdog
""" | |
import os | |
import Queue | |
import select | |
import struct | |
import inotify | |
def open_inotify_fd():
    """Create an inotify instance and return the value of inotify_init().

    The result is passed through unchecked; no error handling is done here.
    """
    return inotify.inotify_init()
def close_inotify_fd(fd):
    """Close the file descriptor *fd* with os.close()."""
    os.close(fd)
def handle_event(event):
    """Print one line describing *event*.

    The line is tagged 'Dir' when the IN_ISDIR bit is set in ``event.mask``
    and 'File' otherwise, followed by the event itself.
    """
    kind = 'Dir' if event.mask & inotify.IN_ISDIR else 'File'
    # A single pre-formatted string yields the same text as the original
    # space-separated print statement.
    print('handle %s %s' % (kind, event))
def handle_events(q):
    """Drain *q*, handing every item it yields to handle_event().

    Returns once ``q.empty()`` reports that nothing is left.
    """
    while True:
        if q.empty():
            break
        handle_event(q.get())
def read_events(q, fd):
    """Read the pending inotify records from *fd* and put them on *q*.

    One ``os.read`` of up to ``inotify.EVENT_BUFFER_SIZE`` bytes is issued.
    The returned buffer is a sequence of records, each made of a fixed
    header (``wd``, ``mask``, ``cookie``, ``len``; struct format ``'iIII'``)
    followed by ``len`` bytes of NUL-padded file name.  Every record becomes
    an ``inotify.InotifyEvent`` that is printed and enqueued; finally the
    number of enqueued events is printed.

    Any exception raised by ``os.read`` propagates to the caller unchanged.
    The previous catch-and-re-raise wrapper added no handling and, on
    Python 2, replaced the original traceback.
    """
    event_buffer = os.read(fd, inotify.EVENT_BUFFER_SIZE)

    header = struct.Struct('iIII')  # compiled once, reused for every record
    count = 0
    offset = 0
    while offset < len(event_buffer):
        wd, mask, cookie, fname_len = header.unpack_from(event_buffer, offset)
        name_start = offset + header.size
        name_end = name_start + fname_len
        # The name field is padded with NUL bytes up to fname_len.
        fname = event_buffer[name_start:name_end].rstrip(b'\0')
        event = inotify.InotifyEvent(wd, mask, cookie, fname_len, fname)
        print('enqueue %s' % (event,))
        q.put(event)
        offset = name_end  # the next record starts right after this name
        count += 1
    print("%d events queued" % count)
def process_inotify_events(q, fd):
    """Poll *fd* with epoll and dispatch its events until interrupted.

    Each time *fd* is reported readable, the pending records are read into
    *q* by read_events() and then drained by handle_events().  The loop
    wakes up at least once per second (poll timeout of 1) and ends when a
    KeyboardInterrupt (Ctrl-C) is received.

    The epoll object is always closed before returning so its descriptor is
    not leaked.
    """
    epoll = select.epoll()
    try:
        # Only readability is acted upon, so register for EPOLLIN alone.
        epoll.register(fd, select.EPOLLIN)
        while True:
            for _fileno, event in epoll.poll(1):
                if event & select.EPOLLIN:
                    read_events(q, fd)
                    handle_events(q)
    except KeyboardInterrupt:
        # Ctrl-C is the intended way to stop monitoring; return normally.
        pass
    finally:
        epoll.close()
def main():
    """Watch every path given on the command line and print its events.

    Creates an inotify descriptor, adds a watch with
    ``inotify.IN_ALL_EVENTS`` for each path, then runs
    process_inotify_events() until it returns.  The descriptor is closed on
    the way out, including when monitoring raises.
    """
    import argparse
    import sys

    parser = argparse.ArgumentParser(
        description='Monitor file system events with inotify.')
    parser.add_argument('paths', nargs='+', help='path to watch')
    args = parser.parse_args()

    inotify_fd = open_inotify_fd()
    # 0 is a valid descriptor; only a negative value signals failure.
    if inotify_fd < 0:
        sys.stderr.write('inotify_init failed: %s\n'
                         % os.strerror(inotify.get_errno_func()))
        return

    try:
        # Unbounded on purpose: read_events() enqueues with a blocking put in
        # the same thread that later drains the queue, so a size limit could
        # block forever when one read returns more events than the limit.
        q = Queue.Queue()
        for path in args.paths:
            wd = inotify.inotify_add_watch(inotify_fd, path,
                                           inotify.IN_ALL_EVENTS)
            if wd < 0:
                sys.stderr.write('inotify_add_watch failed for %s: %s\n'
                                 % (path,
                                    os.strerror(inotify.get_errno_func())))
                # As before, stop adding watches but still monitor the
                # paths registered so far.
                break
        process_inotify_events(q, inotify_fd)
        print("termination")
    finally:
        close_inotify_fd(inotify_fd)


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment