Created
May 4, 2024 05:30
-
-
Save remoharsono/484261a4da1854296e248cf36c9d1ea5 to your computer and use it in GitHub Desktop.
Using inotify to watch a folder
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import inotify.adapters | |
def _main():
    """Watch the relative directory ``tmp`` and print every event it yields.

    Iterates over the watcher's event generator (with ``yield_nones=False``)
    and prints one line per event.
    """
    watcher = inotify.adapters.Inotify()
    watcher.add_watch('tmp')

    template = "PATH=[{}] FILENAME=[{}] EVENT_TYPES={}"
    for record in watcher.event_gen(yield_nones=False):
        # Each record is a 4-tuple; its first element is not used here.
        type_names, path, filename = record[1:]
        print(template.format(path, filename, type_names))


if __name__ == '__main__':
    _main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
from inotifyrecursive import INotify, flags | |
import mimetypes | |
import magic | |
inotify = INotify()

# Only file/directory creation is watched. Other flags such as flags.DELETE,
# flags.MODIFY or flags.DELETE_SELF can be OR-ed in when needed.
watch_flags = flags.CREATE
wd = inotify.add_watch_recursive('tmp', watch_flags)

# A single read() call returns one batch of pending events; the script
# handles that batch and then exits.
for event in inotify.read():
    # get file name
    filename = str(event.name)
    print(filename)

    # The watch is recursive, so the event may come from a sub-directory of
    # 'tmp'. Resolve the directory from the event's watch descriptor instead
    # of assuming the entry lives directly under 'tmp'.
    filepath = os.path.join(inotify.get_path(event.wd), filename)

    # check if it is a file or directory
    if os.path.isfile(filepath):
        print("Target is a file")

        # Extension-based guess; None when the extension is unknown.
        mimetype, _encoding = mimetypes.guess_type(filepath)
        print(mimetype)

        # or: content-based detection. mime=True is required to get a MIME
        # type (e.g. "text/plain") rather than a human-readable description.
        mime = magic.from_file(filepath, mime=True)
        print(mime)
    else:
        print("Target is not a file")
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
from inotify_simple import INotify, flags | |
import mimetypes | |
import magic | |
inotify = INotify()

# Directory being watched (non-recursive); also used to rebuild event paths.
watch_dir = 'tmp'

# Only file/directory creation is watched. Other flags such as flags.DELETE,
# flags.MODIFY or flags.DELETE_SELF can be OR-ed in when needed.
watch_flags = flags.CREATE
wd = inotify.add_watch(watch_dir, watch_flags)

# A single read() call returns one batch of pending events; the script
# handles that batch and then exits.
for event in inotify.read():
    # get file name
    filename = str(event.name)
    print(filename)
    filepath = os.path.join(watch_dir, filename)

    # check if it is a file or directory
    if os.path.isfile(filepath):
        print("Target is a file")

        # Extension-based guess; None when the extension is unknown.
        mimetype, _encoding = mimetypes.guess_type(filepath)
        print(mimetype)

        # or: content-based detection. mime=True is required to get a MIME
        # type (e.g. "text/plain") rather than a human-readable description.
        mime = magic.from_file(filepath, mime=True)
        print(mime)
    else:
        print("Target is not a file")
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
############################################################################################ | |
# TR Murphy | |
# notifyit.py | |
# | |
# uses the kernel inotify facility. It monitors a directory for the creation of a new file. | |
# for more information on this facility READ: https://lwn.net/Articles/760714/ | |
# | |
# NOTE: inotify only reports events generated through the local kernel, so
# changes made remotely on network-mounted filesystems (e.g. NFS) are not detected.
############################################################################################ | |
import os | |
import sys | |
import time | |
import pyinotify | |
######################################## | |
# Define the directory to monitor | |
######################################## | |
directory_to_watch = '/some/directory/path/' | |
# Create a subclass of ProcessEvent to handle events | |
class EventHandler(pyinotify.ProcessEvent):
    """Event handler that reports files closed after being written."""

    def process_IN_CLOSE_WRITE(self, event):
        """Print the path carried by an IN_CLOSE_WRITE event.

        Triggered when a file is closed after writing; this is the place to
        add your own functionality.
        """
        closed_path = event.pathname
        print("New file created:", closed_path)
# Initialize the WatchManager and EventHandler
wm = pyinotify.WatchManager()
handler = EventHandler()

# Define the events to monitor: files closed after having been opened for writing
mask = pyinotify.IN_CLOSE_WRITE

# Add the directory (and, with rec=True, its sub-directories) to the watch list
try:
    wm.add_watch(directory_to_watch, mask, rec=True)
except Exception as e:
    # The original `except e:` referenced an unbound name, so any failure
    # here surfaced as a NameError. Report the real error and carry on, as
    # the script intended.
    print(e)

# Initialize the Notifier with the WatchManager and EventHandler
notifier = pyinotify.Notifier(wm, handler)

# Start the event loop
while True:
    try:
        # Dispatch events queued by the previous read, then read any new ones
        notifier.process_events()
        if notifier.check_events():
            notifier.read_events()
    except KeyboardInterrupt:
        # Terminate the event loop when interrupted
        notifier.stop()
        break

# sys.exit is used instead of the interactive-only `exit` helper
sys.exit(0)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment