Created
May 28, 2015 17:40
-
-
Save fpom/92a690a8cf89cebd7d4a to your computer and use it in GitHub Desktop.
Watch file system updates with gevent
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os, os.path, collections | |
| import gevent, gevent.pool, gevent.queue | |
| import platform | |
| if platform.system() == "Windows" : | |
| # see https://github.com/gevent/gevent/issues/459 | |
| import socket | |
| event = collections.namedtuple("event", ["name", "path", "isdir"]) | |
| class DirWatcher (object) : | |
| def __init__ (self, root) : | |
| self.root = os.path.abspath(root) | |
| self.pool = gevent.pool.Pool() | |
| self.q = gevent.queue.Queue() | |
| self.w = {} | |
| self.add(root, "crawl") | |
| def get (self) : | |
| return self.q.get() | |
| def add (self, path, evt="create") : | |
| if os.path.isdir(path) : | |
| for name in os.listdir(path) : | |
| self.add(os.path.join(path, name), evt) | |
| self.pool.spawn(self.watch, path) | |
| self.q.put(event(evt, path, os.path.isdir(path))) | |
| def watch (self, path) : | |
| hub = gevent.get_hub() | |
| watcher = hub.loop.stat(path, 1) | |
| self.w[path] = watcher | |
| isdir = os.path.isdir(path) | |
| if isdir : | |
| old = set(os.listdir(path)) | |
| while path in self.w : | |
| try : | |
| with gevent.Timeout(2) : | |
| hub.wait(watcher) | |
| except : | |
| continue | |
| if os.path.isdir(path) : | |
| new = set(os.listdir(path)) | |
| for name in new - old : | |
| self.add(os.path.join(path, name)) | |
| old = new | |
| elif os.path.exists(path) : | |
| self.q.put(event("update", path, isdir)) | |
| else : | |
| break | |
| if isdir : | |
| for name in old : | |
| self.w.pop(os.path.join(path, name), None) | |
| self.w.pop(path, None) | |
| self.q.put(event("delete", path, isdir)) | |
| if __name__ == "__main__" : | |
| if not os.path.isdir(",test") : | |
| os.mkdir(",test") | |
| d = DirWatcher(",test") | |
| while True : | |
| print "{event.name} {event.isdir} {event.path}".format(event=d.get()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment