Skip to content

Instantly share code, notes, and snippets.

@simryang
Forked from mivade/aiowatch.py
Created June 24, 2024 07:59
Show Gist options
  • Save simryang/628006dffb5799dd096ba652df28986f to your computer and use it in GitHub Desktop.
Save simryang/628006dffb5799dd096ba652df28986f to your computer and use it in GitHub Desktop.
Using watchdog with asyncio
import asyncio
from pathlib import Path
from typing import Optional
from watchdog.events import FileSystemEvent, FileSystemEventHandler
from watchdog.observers import Observer
class _EventHandler(FileSystemEventHandler):
def __init__(self, queue: asyncio.Queue, loop: asyncio.BaseEventLoop,
*args, **kwargs):
self._loop = loop
self._queue = queue
super(*args, **kwargs)
def on_created(self, event: FileSystemEvent) -> None:
self._loop.call_soon_threadsafe(self._queue.put_nowait, event)
class EventIterator(object):
def __init__(self, queue: asyncio.Queue,
loop: Optional[asyncio.BaseEventLoop] = None):
self.queue = queue
def __aiter__(self):
return self
async def __anext__(self):
item = await self.queue.get()
if item is None:
raise StopAsyncIteration
return item
def watch(path: Path, queue: asyncio.Queue, loop: asyncio.BaseEventLoop,
recursive: bool = False) -> None:
"""Watch a directory for changes."""
handler = _EventHandler(queue, loop)
observer = Observer()
observer.schedule(handler, str(path), recursive=recursive)
observer.start()
print("Observer started")
observer.join(10)
loop.call_soon_threadsafe(queue.put_nowait, None)
async def consume(queue: asyncio.Queue) -> None:
async for event in EventIterator(queue):
print("Got an event!", event)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
queue = asyncio.Queue(loop=loop)
futures = [
loop.run_in_executor(None, watch, Path("."), queue, loop, False),
consume(queue),
]
loop.run_until_complete(asyncio.gather(*futures))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment