Last active
April 30, 2023 14:02
-
-
Save sebdelsol/089a21babbb25d00be4fc87d813ef540 to your computer and use it in GitHub Desktop.
proxy to search among movies and series in sfvip player
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# requirements:
# Python >= 3.10
# pip install proxy.py psgtray psutil pysimplegui pywin32 requests
# use it:
# python sfvip_proxy.py [--port PORT]
# or create the launcher:
# python sfvip_proxy.py --launcher [--port PORT]
# and launch the proxy with sfvip_proxy.vbs
import argparse | |
import json | |
import os | |
import re | |
import subprocess | |
import sys | |
import threading | |
import time | |
from enum import Enum, auto | |
from pathlib import Path | |
from typing import IO, Any, Iterator, List, Optional | |
import psutil | |
import PySimpleGUI as sg | |
import requests | |
import win32api | |
import win32event | |
import winerror | |
from psgtray import SystemTray | |
import proxy | |
from proxy.http.methods import httpMethods | |
from proxy.http.parser import HttpParser | |
from proxy.http.proxy import HttpProxyBasePlugin | |
from proxy.http.responses import okResponse | |
class Config:
    """Static settings read by the plugin, the proxy and the UI classes."""

    class AllCat:
        """Description of the extra category injected into category listings."""

        # value used for both category_id in queries and in the injected JSON
        id = 0
        name = "All"
        # names substituted into the get_<name>_categories query pattern
        inject = ("series", "vod")

    class Proxy:
        """Options forwarded to the local proxy."""

        port = 7777
        timeout = 30
        log_level = "ERROR"
        # converted to bytes before being passed as buffer-size flags
        buf_size_in_MB = 16

    class UI:
        """Look of the windows and of the tray icon."""

        theme = "DarkBlue14"
        font = "Arial 15 bold"
        element_padding = 3

        class Ok:
            """Icon and color used when the proxy is started."""

            color = "lime green"
            icon = sg.SYSTEM_TRAY_MESSAGE_ICON_INFORMATION

        class Warning:
            """Icon and color used while waiting for the player."""

            color = "red"
            icon = sg.SYSTEM_TRAY_MESSAGE_ICON_WARNING
class Window(sg.Window):
    """Borderless, always-on-top window whose layout is wrapped in a thin frame."""

    # options forced on every window (they override the caller's keywords)
    _kw = {
        "grab_anywhere": True,
        "no_titlebar": True,
        "debugger_enabled": False,
        "keep_on_top": True,
    }
    # options of the frame drawn around the whole layout
    _frame_kw = {
        "p": 0,
        "border_width": 1,
        "relief": sg.RELIEF_SOLID,
        "expand_x": True,
        "expand_y": True,
    }

    def __init__(self, title, layout, **kw) -> None:
        """Build and finalize the window, then finalize the elements that support it."""
        framed_layout = [[sg.Frame("", layout, **self._frame_kw)]]
        options = {**kw, **self._kw}
        super().__init__(
            title,
            framed_layout,
            margins=(0, 0),
            element_padding=Config.UI.element_padding,
            **options,
        )
        super().finalize()
        for element in self.element_list():
            element.grab_anywhere_include()
            # only elements defining their own finalize() get the call
            if hasattr(element, "finalize"):
                element.finalize()
class Button(sg.Button):
    """Flat button that switches color while the mouse is over it."""

    _binds = ("<Enter>", "<Leave>")
    _kw = {"border_width": 0}

    def __init__(self, *args, mouseover_color: str, **kw) -> None:
        """Remember the color to show for each mouse event, keyed by its name."""
        self.colors = {
            "Enter": mouseover_color,
            "Leave": sg.theme_button_color(),
        }
        options = {**kw, **self._kw}
        super().__init__(*args, **options)

    def finalize(self) -> None:
        """Bind the mouse events on the underlying widget and block focus."""
        widget = self.Widget
        for sequence in self._binds:
            widget.bind(sequence, self._on_mouse_over)
        self.block_focus(True)

    def _on_mouse_over(self, event: sg.tk.Event) -> None:
        """Apply the color registered for the received event type."""
        color = self.colors.get(event.type.name)
        self.update(button_color=color)
class YesNoWindow(Window):
    """Modal window showing a question with a "No" and a "Yes" button."""

    _yes, _no = "Yes", "No"

    def __init__(self, title) -> None:
        """Lay out the question above the two buttons.

        Args:
            title: text of the question, centered above the buttons.
        """
        no_button = Button(self._no, mouseover_color=Config.UI.Ok.color)
        yes_button = Button(self._yes, mouseover_color=Config.UI.Warning.color)
        title = sg.T(title, expand_x=True, justification="center")
        layout = [[title], [no_button, sg.P(), yes_button]]
        super().__init__("", layout, modal=True)

    def loop(self) -> bool:
        """Block until a button is pressed or the window is closed.

        Returns:
            True only when the "Yes" button was pressed.
        """
        while True:
            event, _ = self.read()
            if event in (self._yes, self._no, sg.WIN_CLOSED):
                self.close()
                return event == self._yes
class SingleApp:
    """Detect an already running instance through a named Windows event.

    The event is created (or opened when it already exists) with the name
    ``{name}_START_EVENT``; ``already_running`` tells which case happened.
    """

    # pylint: disable=c-extension-no-member
    def __init__(self, name: str) -> None:
        """Create or open the named event and notify the instance that owns it.

        Args:
            name: application name used to build the event name.
        """
        # auto-reset event (bManualReset=False), initially not signaled
        self._event = win32event.CreateEvent(None, False, False, f"{name}_START_EVENT")
        self.already_running = win32api.GetLastError() == winerror.ERROR_ALREADY_EXISTS
        # Only signal when another instance owns the event: signaling from the
        # first instance would wake its own waiter with a spurious notification.
        if self.already_running:
            win32event.SetEvent(self._event)

    def wait_for_another_to_start(self) -> None:
        """Block until the event is signaled."""
        win32event.WaitForSingleObject(self._event, win32event.INFINITE)

    def __enter__(self):
        return self

    def __exit__(self, *_) -> None:
        """Release the event handle."""
        win32api.CloseHandle(self._event)
class SfvipPlugin(HttpProxyBasePlugin):
    """Proxy plugin adding an "All" category to ``player_api.php`` traffic.

    Two kinds of API queries are altered:
    * a query for the "All" category has its ``category_id`` filter removed;
    * a categories query is answered directly with the upstream list prefixed
      by the "All" category.
    Everything else is forwarded untouched.
    """

    _api_query = b"player_api.php?"
    # request attribute holding the query, depending on the HTTP method
    _query_attr = {httpMethods.POST: "body", httpMethods.GET: "path"}
    _all_category_query = f"&category_id={Config.AllCat.id}".encode()
    _all_category_json = dict(category_id=str(Config.AllCat.id), category_name=Config.AllCat.name, parent_id=0)
    _is_categories_query = re.compile(f"get_({'|'.join(Config.AllCat.inject)})_categories".encode()).search

    def handle_client_request(self, request: HttpParser) -> Optional[HttpParser]:
        """Rewrite or answer the API queries related to the "All" category.

        Args:
            request: the parsed client request.

        Returns:
            The (possibly modified) request to dispatch upstream, or None when
            a response has already been queued for the client.
        """
        if not (request.path and self._api_query in request.path):
            return request
        # other methods than GET and POST carry no query we know how to read
        query_attr = self._query_attr.get(request.method)
        if query_attr is None:
            return request
        query: Optional[bytes] = getattr(request, query_attr)
        if not query:
            return request
        if self._all_category_query in query:
            # turn an all category query into a whole catalog query
            setattr(request, query_attr, query.replace(self._all_category_query, b""))
        elif self._is_categories_query(query):
            categories = self._fetch_categories(request)
            if categories is not None:
                # send a response with the all category injected
                self.client.queue(
                    okResponse(
                        headers={b"Content-Type": b"application/json"},
                        content=json.dumps([self._all_category_json] + categories).encode(),
                    )
                )
                # the client has its answer: do not dispatch the request upstream as well
                return None
        return request

    def _fetch_categories(self, request: HttpParser) -> Optional[list]:
        """Return the upstream categories list, or None if it can't be obtained."""
        url = str(request._url)  # pylint: disable=protected-access
        try:
            resp = requests.get(url, params=request.body, timeout=Config.Proxy.timeout)
            if resp.status_code != 200:
                return None
            categories = resp.json()
        except (requests.RequestException, ValueError):
            # network or decoding failure: let the caller forward the request as is
            return None
        return categories if isinstance(categories, list) else None
class SfvipProxy(proxy.Proxy):
    """proxy.py proxy preconfigured from Config.Proxy and running SfvipPlugin."""

    # buffer size in bytes, as the string expected on the command line
    buf_size = str(Config.Proxy.buf_size_in_MB * 1024 * 1024)
    # flat sequence of flag, value pairs
    proxy_opts = (
        "--timeout",
        str(Config.Proxy.timeout),
        "--log-level",
        Config.Proxy.log_level,
        "--client-recvbuf-size",
        buf_size,
        "--server-recvbuf-size",
        buf_size,
        "--max-sendbuf-size",
        buf_size,
        # prevent shutdown lock
        "--num-acceptors",
        "1",
    )

    def __init__(self, port: int) -> None:
        """Configure the proxy to listen on the given port with the sfvip plugin."""
        plugins = [SfvipPlugin]
        super().__init__(self.proxy_opts, port=port, plugins=plugins)
class SfvipUsers:
    """Read and update the ``HttpProxy`` field of the users stored by the player.

    The users are read from ``%APPDATA%/SFVIP-Player/Database.json``, loaded as
    a JSON list of dicts having at least an ``Address`` key.
    """

    # an unset APPDATA yields a relative path instead of failing when the class is defined
    _database = Path(os.getenv("APPDATA", "")) / "SFVIP-Player" / "Database.json"
    _playlist_ext = ".m3u", ".m3u8"

    def _open(self, mode: str) -> IO:
        """Open the database as utf8 text with the given mode."""
        return self._database.open(mode=mode, encoding="utf8")

    def _is_playlist(self, user: dict) -> bool:
        """Tell whether the user's address is a playlist name or an existing path."""
        path = Path(user["Address"])
        # the suffix is compared in lower case so that ".M3U" counts as a playlist
        return path.suffix.lower() in self._playlist_ext or path.exists()

    def set_proxy(self, proxy_url: str) -> None:
        """Set the proxy of every user that is not a playlist.

        Nothing is written when the database doesn't exist or when it holds no
        such user. Playlist users are written back unchanged.

        Args:
            proxy_url: value stored in the ``HttpProxy`` field.
        """
        if not self._database.exists():
            return
        with self._open("r") as f:
            users = json.load(f)
        # keep the full list apart so that playlist users are not lost when saving
        accounts = [user for user in users if not self._is_playlist(user)]
        if not accounts:
            return
        for user in accounts:
            user["HttpProxy"] = proxy_url
        with self._open("w") as f:
            json.dump(users, f, indent=2, separators=(",", ":"))
class SfvipProcess:
    """Find, kill and restart the processes named like the player executable."""

    _name = "sfvip player.exe"
    _creationflags = subprocess.DETACHED_PROCESS | subprocess.CREATE_NEW_PROCESS_GROUP | subprocess.CREATE_BREAKAWAY_FROM_JOB

    def get_running(self, *attrs: str) -> Iterator[psutil.Process]:
        """Yield the running player processes.

        Args:
            *attrs: extra attribute names to prefetch in each process ``info``.
        """
        for process in psutil.process_iter(("name", *attrs)):
            if self._name == process.info["name"]:
                yield process

    def is_running(self) -> bool:
        """Tell whether at least one player process is running."""
        return any(True for _ in self.get_running())

    def kill(self) -> List[List[str]]:
        """Kill all the player processes and wait for them to end.

        Returns:
            The command lines of the killed processes, without the ones that
            could not be read.
        """
        processes = list(self.get_running("cmdline"))
        for process in processes:
            try:
                process.kill()
            except psutil.NoSuchProcess:
                # it ended on its own in the meantime, nothing left to kill
                pass
        psutil.wait_procs(processes)
        # an unreadable command line is stored as None: it can't be relaunched
        return [cmdline for process in processes if (cmdline := process.info["cmdline"])]

    def start(self, cmdlines: List[List[str]]) -> None:
        """Launch a detached process for each command line."""
        for cmdline in cmdlines:
            subprocess.Popen(cmdline, creationflags=self._creationflags)  # pylint: disable=consider-using-with
class SfvipApp(Window): | |
name = "SF VIP Proxy" | |
class Event(Enum): | |
ANOTHER_APP_HAS_STARTED = auto() | |
WAIT_FOR_PLAYER = auto() | |
PROXY_STARTED = auto() | |
MINIMIZE = auto() | |
CLOSE = auto() | |
_sleep = 1 # second | |
def __init__(self, app: SingleApp, port: int) -> None: | |
sg.theme(Config.UI.theme) | |
sg.set_options(font=Config.UI.font) | |
self.sfvip_process = SfvipProcess() | |
self.sfvip_users = SfvipUsers() | |
self.running = False | |
self.port = port | |
self.app = app | |
self.tray = SystemTray(["", []], window=self, single_click_events=True, icon=Config.UI.Warning.icon) | |
b_minimize = Button("___", mouseover_color=Config.UI.Ok.color, k=self.Event.MINIMIZE) | |
b_quit = Button("X", mouseover_color=Config.UI.Warning.color, k=self.Event.CLOSE) | |
self.proxy_txt = sg.T("Init...", justification="center", expand_x=True) | |
self.img = sg.Image() | |
super().__init__(self.name, [[self.img, sg.T(self.name), self.proxy_txt, b_quit, b_minimize]]) | |
def _send_event(self, event: Any, value: Any = None) -> None: | |
if self.running: | |
self.write_event_value(event, value) | |
def _ask_quit(self) -> bool: | |
self.hide() | |
if YesNoWindow(title=f"Quit {self.name} ?").loop(): | |
return True | |
self.un_hide() | |
return False | |
def _toggle_visibility(self) -> None: | |
if self._Hidden: | |
self.un_hide() | |
else: | |
self.hide() | |
def _set_state(self, txt: str, state: Config.UI.Ok | Config.UI.Warning) -> None: | |
self.proxy_txt(txt, text_color=state.color) | |
self.tray.change_icon(state.icon) | |
self.tray.set_tooltip(txt) | |
self.img(state.icon) | |
def _set_proxy(self, proxy_url: str) -> None: | |
if self.sfvip_process.is_running(): | |
killed = self.sfvip_process.kill() | |
self.sfvip_users.set_proxy(proxy_url) | |
self.sfvip_process.start(killed) | |
else: | |
self.sfvip_users.set_proxy(proxy_url) | |
def _wait_and_run_proxy(self) -> None: | |
proxy_url = f"http://127.0.0.1:{self.port}" | |
self._set_proxy(proxy_url) | |
while self.running: | |
if not self.sfvip_process.is_running(): | |
self._send_event(self.Event.WAIT_FOR_PLAYER) | |
while self.running and not self.sfvip_process.is_running(): | |
time.sleep(self._sleep) | |
if self.running: | |
self._send_event(self.Event.PROXY_STARTED, proxy_url) | |
with SfvipProxy(self.port): | |
while self.running and self.sfvip_process.is_running(): | |
time.sleep(self._sleep) | |
self._set_proxy("") | |
def _wait_for_another_app_to_start(self): | |
while self.running: | |
self.app.wait_for_another_to_start() | |
self._send_event(self.Event.ANOTHER_APP_HAS_STARTED) | |
def loop(self) -> None: | |
self.running = True | |
threading.Thread(target=self._wait_and_run_proxy).start() | |
threading.Thread(target=self._wait_for_another_app_to_start, daemon=True).start() | |
while self.running: | |
event, values = self.read() | |
match event: | |
case self.Event.WAIT_FOR_PLAYER: | |
self._set_state("wait for player", Config.UI.Warning) | |
case self.Event.PROXY_STARTED: | |
self._set_state(f"started @ {values[event]}", Config.UI.Ok) | |
case self.Event.ANOTHER_APP_HAS_STARTED: | |
self.un_hide() | |
case self.tray.key: | |
self._toggle_visibility() | |
case self.Event.MINIMIZE: | |
self.hide() | |
case self.Event.CLOSE: | |
if self._ask_quit(): | |
break | |
case sg.WIN_CLOSED: | |
break | |
self.running = False | |
self.tray.close() | |
self.close() | |
def get_options() -> argparse.Namespace:
    """Parse the command line: ``--port PORT`` and the ``--launcher`` flag."""
    arguments = (
        ("--port", {"type": int, "default": Config.Proxy.port, "help": "Proxy port"}),
        ("--launcher", {"action": "store_true", "help": "Create the launcher"}),
    )
    cli = argparse.ArgumentParser()
    for flag, spec in arguments:
        cli.add_argument(flag, **spec)
    return cli.parse_args()
def create_vbs_launcher(port: int) -> None:
    """Write a .vbs file next to this script that runs it with pythonw.exe.

    Args:
        port: port passed to the script through ``--port``.
    """
    script_path = Path(__file__)
    # pythonw.exe is looked up in the folder of the current interpreter
    python_path = Path(sys.executable).parent / "pythonw.exe"
    create_shell = 'Set WshShell = CreateObject("WScript.Shell")'
    run_script = f'WshShell.Run """{python_path.resolve()}"" ""{script_path.resolve()}"" --port {port}"'
    launcher_path = script_path.with_suffix(".vbs")
    launcher_path.write_text("\n".join((create_shell, run_script)), encoding="utf-8")
if __name__ == "__main__":
    options = get_options()
    if not options.launcher:
        # run the app, unless another instance is already running
        with SingleApp(SfvipApp.name) as single_app:
            if not single_app.already_running:
                SfvipApp(single_app, options.port).loop()
    else:
        # only write the launcher, the app itself is not started
        create_vbs_launcher(options.port)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment