Last active
May 3, 2023 13:30
-
-
Save sebdelsol/59a0e90587faba3aa3614ef6832009a9 to your computer and use it in GitHub Desktop.
Add an all category in vod and series
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# requirements:
#   Python >= 3.10
#   pip install proxy.py pywin32 requests
# create a desktop shortcut:
#   python sfvip_proxy.py --shortcut
# then use the shortcut to launch the player through the proxy
import argparse
import itertools
import json
import os
import re
import subprocess
import sys
import tkinter as tk
import winreg
from collections.abc import Iterator
from contextlib import contextmanager, suppress
from pathlib import Path
from tkinter import filedialog, messagebox
from typing import IO, Any, Optional

import proxy
import requests
import win32com.client
from proxy.http.methods import httpMethods
from proxy.http.parser import HttpParser
from proxy.http.proxy import HttpProxyBasePlugin
from proxy.http.responses import okResponse
class RegKey:
    """read-only lookups in the Windows registry"""

    @staticmethod
    def name_by_value(hkey: winreg.HKEYType, path: str, searched_value: Any) -> Optional[str]:
        """return the name of the first value under `path` that equals `searched_value`

        Returns None when the key can't be opened or when no value matches.
        """
        try:
            with winreg.OpenKey(hkey, path) as key:
                index = 0
                while True:
                    # EnumValue raises OSError once index is past the last value,
                    # which is what ends the scan
                    entry_name, entry_value, _ = winreg.EnumValue(key, index)
                    if entry_value == searched_value:
                        return entry_name
                    index += 1
        except OSError:  # WindowsError is an alias of OSError
            return None

    @staticmethod
    def value_by_name(hkey: winreg.HKEYType, path: str, name: str) -> Optional[Any]:
        """return the data stored under the value `name` of the key `path`

        Returns None when the key or the value doesn't exist.
        """
        try:
            with winreg.OpenKey(hkey, path) as key:
                data, _ = winreg.QueryValueEx(key, name)
        except OSError:  # covers FileNotFoundError as well
            return None
        return data
# locate the sfvip config directory: the registry first, %APPDATA% as a fallback
CONFIG_DIR = RegKey.value_by_name(winreg.HKEY_CURRENT_USER, r"SOFTWARE\SFVIP", "ConfigDir")
if CONFIG_DIR:
    CONFIG_DIR = Path(CONFIG_DIR)
else:
    CONFIG_DIR = Path(os.getenv("APPDATA")) / "SFVIP-Player"
class Serializer:
    """(De)serialize a nested pure class

    The data lives in public class attributes; an attribute that is itself a
    Serializer subclass is a nested section.
    """

    @classmethod
    def as_dict(cls) -> dict:
        """return the public class attributes as a dict, recursing into nested Serializers"""
        return {
            k: o.as_dict() if isinstance(o, type) and issubclass(o, Serializer) else o
            for k, o in cls.__dict__.items()
            if not k.startswith("_")  # only public class attributes
        }

    @classmethod
    def map_dict(cls, dct: dict) -> None:
        """copy the values of `dct` onto the matching public class attributes

        Keys that don't match an existing attribute are ignored, and so are
        private keys (as_dict never emits them). A nested Serializer is only
        updated from a dict, it's never replaced by a plain value.
        """
        missing = object()  # sentinel: a current value of 0, "", () or None is still a real attribute
        for k, v in dct.items():
            if k.startswith("_"):
                continue
            current = getattr(cls, k, missing)
            if current is missing:
                continue
            if isinstance(current, type) and issubclass(current, Serializer):
                if isinstance(v, dict):
                    current.map_dict(v)
            else:
                setattr(cls, k, v)
class Loader(Serializer):
    """load and save a nested pure class as json

    Subclasses give the json file with the `path` class keyword:
    `class Config(Loader, path=...)`.
    """

    _path: Path  # set for each subclass by __init_subclass__

    def __init_subclass__(cls, path: Path) -> None:
        cls._path = path
        super().__init_subclass__()

    @classmethod
    def _open(cls, mode: str) -> IO:
        """open the json file as utf-8 text"""
        return cls._path.open(mode=mode, encoding="utf-8")

    @classmethod
    def save(cls) -> None:
        """write the current class attributes to the json file"""
        # the directory may not exist yet, opening the file for writing would fail
        cls._path.parent.mkdir(parents=True, exist_ok=True)
        with cls._open("w") as f:
            json.dump(cls.as_dict(), f, indent=2)

    @classmethod
    def load(cls) -> None:
        """update the class attributes from the json file

        Raises FileNotFoundError or json.JSONDecodeError when the file is
        missing or isn't valid json.
        """
        with cls._open("r") as f:
            cls.map_dict(json.load(f))

    @classmethod
    def update(cls) -> None:
        """load the json file, or (re)create it from the current attributes if it's missing or invalid"""
        try:
            cls.load()
        except (json.JSONDecodeError, FileNotFoundError):
            cls.save()
class CONFIG(Loader, path=CONFIG_DIR / "Proxy.json"):
    """actual config: a nested pure class of Serializers

    Stored as json in "Proxy.json" inside CONFIG_DIR.
    The values below are the defaults used until the file overrides them.
    """

    # title of the UI dialogs, also the file name of the desktop shortcut
    title = "svfip proxy"
    # path of the player executable, replaced once a valid player has been found
    player = "sfvip player.exe"

    class Proxy(Serializer):
        """options handed to proxy.py"""

        # size used for the client & server receive buffers and the max send buffer
        buf_size_in_MB = 16
        log_level = "ERROR"
        # proxy.py --timeout, also the timeout of the categories request made by the plugin
        timeout = 30

    class AllCat(Serializer):
        """the injected category"""

        # kinds whose get_<kind>_categories queries get the extra category
        inject = ("series", "vod")
        name = "All"
        id = 0
# need to update before SfvipPlugin declaration, since proxy.py plugins can't be setup @ runtime:
# SfvipPlugin reads CONFIG.AllCat while its class body is evaluated
CONFIG.update()
class SfvipPlugin(HttpProxyBasePlugin):
    """proxy.py plugin that injects the all category"""

    _api_query = b"player_api.php?"
    # where the query string lives, depending on the http method
    _query_attr = {httpMethods.POST: "body", httpMethods.GET: "path"}
    _all_category_query = f"&category_id={CONFIG.AllCat.id}".encode()
    _all_category_json = dict(category_id=str(CONFIG.AllCat.id), category_name=CONFIG.AllCat.name, parent_id=0)
    _is_categories_query = re.compile(f"get_({'|'.join(CONFIG.AllCat.inject)})_categories".encode()).search

    def handle_client_request(self, request: HttpParser) -> Optional[HttpParser]:
        """rewrite or answer the api queries related to the all category

        - a query for the all category has its category filter removed,
          so it becomes a query for the whole catalog
        - a categories query is answered with the categories fetched from the
          request url, with the all category prepended

        Anything else, or anything that can't be handled, is returned untouched.
        """
        if not (request.path and SfvipPlugin._api_query in request.path):
            return request
        # only GET & POST carry a query we know how to read
        query_attr = SfvipPlugin._query_attr.get(request.method)
        query: Optional[bytes] = getattr(request, query_attr, None) if query_attr else None
        if not query:
            return request
        if SfvipPlugin._all_category_query in query:
            # turn an all category query into a whole catalog query
            setattr(request, query_attr, query.replace(SfvipPlugin._all_category_query, b""))
        elif SfvipPlugin._is_categories_query(query):
            categories = self._fetch_categories(request)
            if categories is not None:
                # send a response with the all category injected
                self.client.queue(
                    okResponse(
                        headers={b"Content-Type": b"application/json"},
                        content=json.dumps([SfvipPlugin._all_category_json] + categories).encode(),
                    )
                )
        # NOTE(review): the request is still returned once a response has been queued,
        # confirm against proxy.py's handle_client_request contract whether it should be dropped
        return request

    @staticmethod
    def _fetch_categories(request: HttpParser) -> Optional[list]:
        """fetch the categories from the request url, None if there's no usable list"""
        url = str(request._url)  # pylint: disable=protected-access
        try:
            resp = requests.get(url, params=request.body, timeout=CONFIG.Proxy.timeout)
            if resp.status_code != 200:
                return None
            categories = resp.json()
        except (requests.RequestException, ValueError):
            # network failure, timeout or invalid json
            return None
        return categories if isinstance(categories, list) else None
class SfvipProxy(proxy.Proxy):
    """multiprocess proxy set up from the config, with SfvipPlugin as its only plugin"""

    def __init__(self, config: type[CONFIG]) -> None:
        settings = config.Proxy
        buffer_bytes = str(settings.buf_size_in_MB * 1024**2)
        options = {
            "--timeout": str(settings.timeout),
            "--log-level": settings.log_level,
            "--client-recvbuf-size": buffer_bytes,
            "--server-recvbuf-size": buffer_bytes,
            "--max-sendbuf-size": buffer_bytes,
            "--num-acceptors": "1",  # prevent shutdown lock
        }
        # flatten {flag: value} into the ("flag", "value", ...) sequence of arguments
        proxy_opts = tuple(arg for option in options.items() for arg in option)
        # port=0: automatically find port
        super().__init__(proxy_opts, port=0, plugins=[SfvipPlugin])
class SfvipUsers:
    """handle the users' database to add and remove the proxy setting"""

    _playlist_ext = ".m3u", ".m3u8"
    _database = CONFIG_DIR / "Database.json"

    @staticmethod
    def _is_playlist(user: dict) -> bool:
        """tell if the user's address is a playlist: a playlist extension (any case) or an existing local path"""
        path = Path(user["Address"])
        return path.suffix.lower() in SfvipUsers._playlist_ext or path.exists()

    @staticmethod
    def _open(mode: str) -> IO:
        """open the database as utf-8 text"""
        return SfvipUsers._database.open(mode=mode, encoding="utf-8")

    def _set_proxy(self, proxy_url: str) -> None:
        """set `proxy_url` as the http proxy of every user that isn't a playlist

        Nothing is written when the database doesn't exist or has no such user.
        """
        if not SfvipUsers._database.exists():
            return
        with self._open("r") as f:
            users = json.load(f)
        accounts = [user for user in users if not self._is_playlist(user)]
        if not accounts:
            return
        for user in accounts:
            user["HttpProxy"] = proxy_url
        with self._open("w") as f:
            # write ALL the users back: the playlist users are untouched but must be kept
            json.dump(users, f, indent=2, separators=(",", ":"))

    @contextmanager
    def set_proxy(self, port: int) -> Iterator[None]:
        """context manager: point the users to the local proxy on `port`, remove the setting on exit"""
        self._set_proxy(f"http://127.0.0.1:{port}")
        try:
            yield
        finally:
            # always clean up, so the users aren't left with a proxy that no longer runs
            self._set_proxy("")
class UI:
    """barebone UI: tkinter dialogs that all share the same title"""

    # the hidden tk root window, created once and shared by all the instances
    _root = None

    def __init__(self, title: str) -> None:
        self.title = title
        if UI._root is None:
            UI._root = tk.Tk()
            # only the dialogs should show up, not the root window
            UI._root.withdraw()

    def showinfo(self, message: str) -> None:
        """show `message` in an info box"""
        messagebox.showinfo(self.title, message=message)

    def filename(self, name: str, pattern: str) -> str:
        """ask for a file called `name` that matches `pattern`

        Returns what the open file dialog returns; callers test it for truthiness
        to know if a file has been chosen.
        """
        title = f"{self.title}: Find {name}"
        return filedialog.askopenfilename(title=title, filetypes=[(name, pattern)])

    def askretry(self, message: str) -> bool:
        """show `message` in a retry/cancel box, return True when retry is chosen"""
        return messagebox.askretrycancel(self.title, message=message)
class SfvipPlayer:
    """find and check sfvip player"""

    name = "sfvip player"
    _pattern = "*sf*vip*player*.exe"
    _regkey = winreg.HKEY_CLASSES_ROOT, r"Local Settings\Software\Microsoft\Windows\Shell\MuiCache"

    def __init__(self, config: type[CONFIG]) -> None:
        """look for a valid player: in the config, then in the registry, then by asking the user

        The config is updated and saved when a valid player other than the configured one is found.
        """
        self.config = config
        player = self.config.player
        if not self._valid(player):
            player = self._get_from_regkey()
            if not self._valid(player):
                player = self._get_from_user()
        if self._valid(player) and player != self.config.player:
            self.config.player = player
            self.config.save()

    @staticmethod
    def _get_from_regkey() -> Optional[str]:
        """return the player found in the registry, None if there's none"""
        if name := RegKey.name_by_value(*SfvipPlayer._regkey, SfvipPlayer.name):
            # drop what follows the last dot of the value name
            # presumably the value names look like "<exe path>.<property>" - TODO confirm
            return name.rpartition(".")[0]
        return None

    def _get_from_user(self) -> Optional[str]:
        """ask the user to locate the player until a file is chosen or the user gives up"""
        ui = UI(self.config.title)
        ui.showinfo(f"Please find {SfvipPlayer.name}")
        while True:
            if player := ui.filename(SfvipPlayer.name, SfvipPlayer._pattern):
                return player
            if not ui.askretry(message=f"{SfvipPlayer.name} not found, try again ?"):
                return None

    @staticmethod
    def _valid(player: Optional[str] = None) -> bool:
        """tell if `player` is an existing file whose path matches the player pattern"""
        if not player:
            return False
        path = Path(player)
        # is_file() is already False when the path doesn't exist
        return path.is_file() and path.match(SfvipPlayer._pattern)

    def valid(self) -> bool:
        """tell if the configured player is valid"""
        return self._valid(self.config.player)

    def open(self) -> subprocess.Popen:
        """launch the configured player"""
        return subprocess.Popen([self.config.player])
def shortcut_asked() -> bool:
    """parse the command line, return True when --shortcut has been given"""
    parser = argparse.ArgumentParser()
    parser.add_argument("--shortcut", action="store_true", help="create a desktop shortcut")
    return parser.parse_args().shortcut
def create_shortcut(config: type[CONFIG]) -> None:
    """create a shortcut of this script on the desktop

    The shortcut is named after config.title, runs this script with pythonw.exe
    and uses the icon of the player.
    """
    shell = win32com.client.Dispatch("WScript.Shell")
    # ask the shell where the desktop is since it can be redirected (OneDrive...),
    # fall back on the default location in the user profile
    desktop = shell.SpecialFolders("Desktop") or Path(os.getenv("USERPROFILE")) / "Desktop"
    lnk = f'{Path(desktop) / f"{config.title}.lnk"}'
    shortcut = shell.CreateShortcut(lnk)
    # pythonw.exe next to the current interpreter
    shortcut.TargetPath = f'"{(Path(sys.executable).parent / "pythonw.exe").resolve()}"'
    shortcut.Arguments = f'"{Path(__file__).resolve()}"'
    shortcut.IconLocation = f"{config.player},0"
    shortcut.Save()
    UI(config.title).showinfo("Shortcut added on the desktop")
def run(config: type[CONFIG]):
    """create the desktop shortcut if asked, otherwise run the player through the proxy

    Nothing is done when no valid player is found.
    """
    player = SfvipPlayer(config)
    if not player.valid():
        return
    if shortcut_asked():
        create_shortcut(config)
        return
    # entered left to right, exited right to left:
    # leaving the Popen context waits for the player to exit,
    # then the users' proxy setting is removed and the proxy is shut down
    with SfvipProxy(config) as sfvip_proxy, SfvipUsers().set_proxy(sfvip_proxy.flags.port), player.open():
        pass
# script entry point: run with the module-level CONFIG
if __name__ == "__main__":
    run(CONFIG)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment