Created
May 14, 2022 12:01
-
-
Save justfoolingaround/76e221a9a6bee89216bacb184b67726f to your computer and use it in GitHub Desktop.
Windows MPV IPC Connection Client
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import contextlib | |
import random | |
import sys | |
import threading | |
import time | |
from multiprocessing.connection import PipeConnection | |
import _winapi | |
import ujson | |
class VincibleThread(threading.Thread): | |
kill_state = threading.Event() | |
def run(self): | |
sys.settrace(self.global_trace) | |
return super().run() | |
def global_trace(self, stack_frame, reason, *args, **kwargs): | |
if reason == "call": | |
return self.local_trace | |
def local_trace(self, stack_frame, reason, *args, **kwargs): | |
if self.kill_state.is_set() and reason == "line": | |
raise SystemExit() | |
return self.local_trace | |
def kill(self): | |
return self.kill_state.set() | |
class WindowsMPVSocket(object): | |
""" | |
mpv JSON-IPC socket with recieving hooks and communication support. | |
:param ipc_path: path to mpv's IPC socket. | |
""" | |
def __init__(self, ipc_path): | |
self.ipc_path = ipc_path | |
self.requests = {} | |
try: | |
pipe_handle = _winapi.CreateFile( | |
self.ipc_path, | |
_winapi.GENERIC_READ | _winapi.GENERIC_WRITE, | |
_winapi.NULL, | |
_winapi.NULL, | |
_winapi.OPEN_EXISTING, | |
_winapi.FILE_FLAG_OVERLAPPED, | |
_winapi.NULL, | |
) | |
except FileNotFoundError: | |
raise FileNotFoundError( | |
"Failed to create handler for the IPC socket. " | |
"Make sure mpv's IPC socket is accessible." | |
) | |
self.socket = PipeConnection(pipe_handle) | |
self.recieving_threads: "list[VincibleThread]" = [] | |
def send(self, command): | |
try: | |
self.socket.send_bytes(ujson.dumps(command).encode("utf-8") + b"\n") | |
except OSError: | |
raise BrokenPipeError(*OSError.args) | |
def communicate(self, command, *, timeout=5.0): | |
if "request_id" not in command: | |
request_id = random.randrange(0, 2**32) | |
command.update({"request_id": request_id}) | |
else: | |
request_id = command["request_id"] | |
self.send(command) | |
until = time.time() + timeout | |
reciever = None | |
if all(thread.kill_state.is_set() for thread in self.recieving_threads): | |
reciever = self.on_receive(lambda _: None) | |
while time.time() < until and request_id not in self.requests: | |
pass | |
if reciever is not None: | |
reciever.kill() | |
return self.requests.pop(request_id, {}) | |
def recv(self): | |
try: | |
retval = b"".join(iter(lambda: self.socket.recv_bytes(1), b"\n")).strip() | |
except EOFError: | |
raise BrokenPipeError( | |
"Cannot read bytes from socket, possibly due to process unavailability." | |
) | |
if retval: | |
retval = ujson.loads(retval.decode("utf-8")) | |
else: | |
retval = {} | |
if "request_id" in retval: | |
self.requests[retval["request_id"]] = retval | |
return retval | |
def __iter__(self): | |
yield from iter(self.recv, {}) | |
def on_receive(self, callback): | |
def reciever_hook(): | |
main_thread = threading.main_thread() | |
current_thread: VincibleThread = threading.current_thread() | |
for data in self: | |
if not main_thread.is_alive(): | |
current_thread.kill() | |
callback(data) | |
thread = VincibleThread(target=reciever_hook) | |
thread.start() | |
self.recieving_threads.append(thread) | |
return thread | |
def close(self): | |
for thread in self.recieving_threads: | |
if not thread.kill_state.is_set(): | |
thread.kill() | |
with contextlib.suppress(): | |
self.socket.close() | |
if __name__ == "__main__": | |
import shlex | |
socket = WindowsMPVSocket("\\\\.\\pipe\\mpvsocket") | |
socket.on_receive(print) | |
while 1: | |
print( | |
"[*] {}".format(socket.communicate({"command": shlex.split(input("> "))})) | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment