Last active
November 17, 2023 21:12
-
-
Save SF-300/b5a279b28b94ef9add846de33c036cd7 to your computer and use it in GitHub Desktop.
async reading of win32 named pipe containing komorebi window manager events
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# NOTE(zeronineseven): Heavily based on https://gist.github.com/denBot/4136279812f87819f86d99eba77c1ee0
import asyncio | |
import contextlib | |
import json | |
from contextlib import AsyncExitStack | |
from pathlib import Path | |
from typing import AsyncContextManager, Protocol, AsyncIterator, Never | |
import win32event | |
import win32pipe | |
import win32file | |
import pywintypes | |
class _PyHandle(Protocol):
    """Structural type for a handle object as used by this module.

    Any object with an integer ``handle`` attribute and a ``Close()`` method
    satisfies this protocol.
    """

    # Raw integer value of the underlying handle.
    handle: int

    def Close(self) -> None:
        """Release the underlying handle."""
class _PyHandleWrapper:
    """Adapter that gives a handle object a file-like ``fileno``/``close`` surface.

    In this file it is used to pass the named-pipe handle to
    ``loop.connect_read_pipe``.
    """

    def __init__(self, handle: "_PyHandle") -> None:
        # Keep the handle object itself (not just its number) so close() can
        # delegate to it later.
        self.handle = handle

    def fileno(self) -> int:
        """Return the integer value stored on the wrapped handle."""
        wrapped = self.handle
        return wrapped.handle

    def close(self) -> None:
        """Close the wrapped handle by calling its ``Close()`` method."""
        wrapped = self.handle
        wrapped.Close()
@contextlib.asynccontextmanager
async def _events_stream_alive(
    pipe_name: str = "pykomorebi",
    komorebic_executable: Path = Path(r"C:\Program Files\komorebi\bin\komorebic.exe"),
) -> AsyncIterator[AsyncIterator[dict]]:
    r"""Subscribe to komorebi events and expose them as an async iterator.

    Creates an inbound, overlapped, message-mode named pipe at
    ``\\.\pipe\<pipe_name>``, runs ``komorebic subscribe <pipe_name>`` and then
    attaches an asyncio ``StreamReader`` to the pipe.  The value produced by
    the context manager is an async iterator that yields one decoded JSON
    value per line read from the pipe.

    Args:
        pipe_name: Name of the pipe (without the ``\\.\pipe\`` prefix); also
            passed to ``komorebic subscribe``.
        komorebic_executable: Path to the ``komorebic`` executable.

    Raises:
        RuntimeError: If ``komorebic subscribe`` exits with a non-zero code.
        IOError: Raised from the yielded iterator (not from entering the
            context) once the pipe reaches end-of-stream.

    The pipe handle and the asyncio transport are released when the context
    exits, including when setup fails part-way through.
    """
    bufsize = 65536
    async with AsyncExitStack() as stack:
        pipe = win32pipe.CreateNamedPipe(
            rf"\\.\pipe\{pipe_name}",  # lpName
            win32pipe.PIPE_ACCESS_INBOUND | win32file.FILE_FLAG_OVERLAPPED,  # dwOpenMode
            win32pipe.PIPE_TYPE_MESSAGE | win32pipe.PIPE_READMODE_MESSAGE,  # dwPipeMode
            1,  # nMaxInstances
            bufsize,  # nOutBufferSize
            bufsize,  # nInBufferSize
            0,  # nDefaultTimeOut
            None,  # lpSecurityAttributes
        )
        # Register cleanup right away so the handle is closed even if the
        # subscription or connection steps below raise.
        stack.callback(win32file.CloseHandle, pipe)

        registration = await asyncio.create_subprocess_exec(
            komorebic_executable, "subscribe", pipe_name
        )
        if (await registration.wait()) != 0:
            raise RuntimeError("Failed to register events pipe in komorebi!")

        # NOTE(review): `overlapped` stays referenced by this frame for the
        # lifetime of the context; presumably required while the overlapped
        # connect may still be outstanding -- confirm against pywin32 docs.
        overlapped = pywintypes.OVERLAPPED()
        overlapped.hEvent = win32event.CreateEvent(None, 0, 0, None)
        win32pipe.ConnectNamedPipe(pipe, overlapped)  # hNamedPipe, lpOverlapped

        reader = asyncio.StreamReader()
        transport, _protocol = await asyncio.get_running_loop().connect_read_pipe(
            lambda: asyncio.StreamReaderProtocol(reader), _PyHandleWrapper(pipe)
        )
        # Registered after the pipe callback, so on exit the transport is
        # closed first and the raw handle second (callbacks run LIFO).
        stack.callback(transport.close)

        async def events() -> AsyncIterator[dict]:
            while True:
                line = await reader.readline()
                if not line:
                    # readline() returning empty bytes means the stream hit
                    # EOF; report it as an error rather than ending silently.
                    raise IOError("Connection closed")
                yield json.loads(line.decode("utf-8"))

        yield events()
async def _main() -> Never:
    """Open the komorebi event stream and print each event as it arrives."""
    async with _events_stream_alive() as stream:
        async for payload in stream:
            print(payload)
# Only start the event loop when executed as a script, so that importing this
# module does not block on the komorebi event stream.
if __name__ == "__main__":
    asyncio.run(_main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment