Skip to content

Instantly share code, notes, and snippets.

@sebdelsol
Last active May 3, 2023 13:30
Show Gist options
  • Save sebdelsol/59a0e90587faba3aa3614ef6832009a9 to your computer and use it in GitHub Desktop.
Save sebdelsol/59a0e90587faba3aa3614ef6832009a9 to your computer and use it in GitHub Desktop.
Add an all category in vod and series
# requirements:
# Python >= 3.10
# pip install proxy.py pywin32 requests
# create a shortcut :
# python sfvip_proxy.py --shorcut
# use it !
import argparse
import itertools
import json
import os
import re
import subprocess
import sys
import tkinter as tk
import winreg
from contextlib import contextmanager, suppress
from pathlib import Path
from tkinter import filedialog, messagebox
from typing import IO, Any, Optional
import requests
import win32com.client
import proxy
from proxy.http.methods import httpMethods
from proxy.http.parser import HttpParser
from proxy.http.proxy import HttpProxyBasePlugin
from proxy.http.responses import okResponse
class RegKey:
"""find stuff in the registry"""
@staticmethod
def name_by_value(hkey: winreg.HKEYType, path: str, searched_value: Any) -> Optional[str]:
with suppress(WindowsError), winreg.OpenKey(hkey, path) as key:
for i in itertools.count():
name, value, _ = winreg.EnumValue(key, i)
if value == searched_value:
return name
return None
@staticmethod
def value_by_name(hkey: winreg.HKEYType, path: str, name: str) -> Optional[Any]:
with suppress(WindowsError, FileNotFoundError), winreg.OpenKey(hkey, path) as k:
value, _ = winreg.QueryValueEx(k, name)
return value
return None
# find the svfip config directory in the registry or the env vars
CONFIG_DIR = RegKey.value_by_name(winreg.HKEY_CURRENT_USER, r"SOFTWARE\SFVIP", "ConfigDir")
CONFIG_DIR = Path(CONFIG_DIR) if CONFIG_DIR else Path(os.getenv("APPDATA")) / "SFVIP-Player"
class Serializer:
"""(De)serialize a nested pure class"""
@classmethod
def as_dict(cls) -> dict:
return {
k: o.as_dict() if isinstance(o, type) and issubclass(o, Serializer) else o
for k, o in cls.__dict__.items()
if not k.startswith("_") # only public class attributes
}
@classmethod
def map_dict(cls, dct: dict) -> None:
for k, v in dct.items():
if o := getattr(cls, k, None):
if isinstance(v, dict) and isinstance(o, type) and issubclass(o, Serializer):
o.map_dict(v)
else:
setattr(cls, k, v)
class Loader(Serializer):
"""load and save a nested pure class as json"""
def __init_subclass__(cls, path: Path) -> None:
cls._path = path
return super().__init_subclass__()
@classmethod
def _open(cls, mode: str) -> IO:
cls._path: Path
return cls._path.open(mode=mode, encoding="utf-8")
@classmethod
def save(cls):
with cls._open("w") as f:
json.dump(cls.as_dict(), f, indent=2)
@classmethod
def load(cls):
with cls._open("r") as f:
cls.map_dict(json.load(f))
@classmethod
def update(cls) -> None:
try:
cls.load()
except (json.JSONDecodeError, FileNotFoundError):
cls.save()
class CONFIG(Loader, path=CONFIG_DIR / "Proxy.json"):
"""actual config: a nested pure class of Serializers"""
title = "svfip proxy"
player = "sfvip player.exe"
class Proxy(Serializer):
buf_size_in_MB = 16
log_level = "ERROR"
timeout = 30
class AllCat(Serializer):
inject = ("series", "vod")
name = "All"
id = 0
# need to update before SfvipPlugin declaration, since proxy.py plugins can't be setup @ runtime
CONFIG.update()
class SfvipPlugin(HttpProxyBasePlugin):
"""proxy.py plugin that injects the all category"""
_api_query = b"player_api.php?"
_query_attr = {httpMethods.POST: "body", httpMethods.GET: "path"}
_all_category_query = f"&category_id={CONFIG.AllCat.id}".encode()
_all_category_json = dict(category_id=str(CONFIG.AllCat.id), category_name=CONFIG.AllCat.name, parent_id=0)
_is_categories_query = re.compile(f"get_({'|'.join(CONFIG.AllCat.inject)})_categories".encode()).search
def handle_client_request(self, request: HttpParser) -> Optional[HttpParser]:
# print(self.flags.toto)
if request.path and SfvipPlugin._api_query in request.path:
query_attr = SfvipPlugin._query_attr[request.method]
query: bytes = getattr(request, query_attr)
if SfvipPlugin._all_category_query in query:
# turn an all category query into a whole catalog query
setattr(request, query_attr, query.replace(SfvipPlugin._all_category_query, b""))
elif SfvipPlugin._is_categories_query(query):
# send a response with the all category injected
url = str(request._url) # pylint: disable=protected-access
resp = requests.get(url, params=request.body, timeout=CONFIG.Proxy.timeout)
if resp.status_code == 200:
self.client.queue(
okResponse(
headers={b"Content-Type": b"application/json"},
content=json.dumps([SfvipPlugin._all_category_json] + resp.json()).encode(),
)
)
return request
class SfvipProxy(proxy.Proxy):
"""multiprocess proxy"""
def __init__(self, config: type[CONFIG]) -> None:
buf_size = str(config.Proxy.buf_size_in_MB * 1024**2)
# fmt: off
proxy_opts = (
"--timeout", str(config.Proxy.timeout),
"--log-level", config.Proxy.log_level,
"--client-recvbuf-size", buf_size,
"--server-recvbuf-size", buf_size,
"--max-sendbuf-size", buf_size,
"--num-acceptors", "1", # prevent shutdown lock
) # fmt: on
# automatically find port
super().__init__(proxy_opts, port=0, plugins=[SfvipPlugin])
class SfvipUsers:
"""handle the users' database to add an remove the proxy setting"""
_playlist_ext = ".m3u", ".m3u8"
_database = CONFIG_DIR / "Database.json"
@staticmethod
def _is_playlist(user: dict) -> bool:
path = Path(user["Address"])
return path.suffix in SfvipUsers._playlist_ext or path.exists()
@staticmethod
def _open(mode: str) -> IO:
return SfvipUsers._database.open(mode=mode, encoding="utf-8")
def _set_proxy(self, proxy_url: str) -> None:
if SfvipUsers._database.exists():
with self._open("r") as f:
users = json.load(f)
if users := [user for user in users if not self._is_playlist(user)]:
for user in users:
user["HttpProxy"] = proxy_url
with self._open("w") as f:
json.dump(users, f, indent=2, separators=(",", ":"))
@contextmanager
def set_proxy(self, port: int) -> None:
self._set_proxy(f"http://127.0.0.1:{port}")
yield
self._set_proxy("")
class UI:
"""barebone UI"""
_root = None
def __init__(self, title: str) -> None:
self.title = title
if UI._root is None:
UI._root = tk.Tk()
UI._root.withdraw()
def showinfo(self, message: str) -> None:
messagebox.showinfo(self.title, message=message)
def filename(self, name: str, pattern: str) -> None:
title = f"{self.title}: Find {SfvipPlayer.name}"
return filedialog.askopenfilename(title=title, filetypes=[(name, pattern)])
def askretry(self, message: str) -> None:
return messagebox.askretrycancel(self.title, message=message)
class SfvipPlayer:
"""find and check sfvip player"""
name = "sfvip player"
_pattern = "*sf*vip*player*.exe"
_regkey = winreg.HKEY_CLASSES_ROOT, r"Local Settings\Software\Microsoft\Windows\Shell\MuiCache"
def __init__(self, config: type[CONFIG]) -> None:
self.config = config
player = self.config.player
if not self._valid(player):
player = self._get_from_regkey()
if not self._valid(player):
player = self._get_from_user()
if self._valid(player) and player != self.config.player:
self.config.player = player
self.config.save()
@staticmethod
def _get_from_regkey() -> Optional[str]:
if name := RegKey.name_by_value(*SfvipPlayer._regkey, SfvipPlayer.name):
return ".".join(name.split(".")[:-1])
return None
def _get_from_user(self) -> Optional[str]:
ui = UI(self.config.title)
ui.showinfo(f"PLease find {SfvipPlayer.name}")
while True:
if player := ui.filename(SfvipPlayer.name, SfvipPlayer._pattern):
return player
if not ui.askretry(message=f"{SfvipPlayer.name} not found, try again ?"):
return None
@staticmethod
def _valid(player: Optional[str] = None) -> bool:
if player:
player: Path = Path(player)
return player.exists() and player.is_file() and player.match(SfvipPlayer._pattern)
return False
def valid(self) -> bool:
return self._valid(self.config.player)
def open(self) -> subprocess.Popen:
return subprocess.Popen([self.config.player])
def shortcut_asked() -> argparse.Namespace:
parser = argparse.ArgumentParser()
parser.add_argument("--shortcut", action="store_true", help="create a desktop shortcut")
return parser.parse_args().shortcut
def create_shortcut(config: type[CONFIG]) -> None:
"""create a shorcut of this script on the desktop"""
lnk = f'{Path(os.getenv("USERPROFILE")) / "Desktop" / f"{config.title}.lnk"}'
shell = win32com.client.Dispatch("WScript.Shell")
shortcut = shell.CreateShortcut(lnk)
shortcut.TargetPath = f'"{(Path(sys.executable).parent / "pythonw.exe").resolve()}"'
shortcut.Arguments = f'"{Path(__file__).resolve()}"'
shortcut.IconLocation = f"{config.player},0"
shortcut.Save()
UI(config.title).showinfo("Shortcut added on the desktop")
def run(config: type[CONFIG]):
player = SfvipPlayer(config)
if player.valid():
if shortcut_asked():
create_shortcut(config)
else:
with SfvipProxy(config) as sfvip_proxy:
with SfvipUsers().set_proxy(sfvip_proxy.flags.port):
with player.open():
pass
if __name__ == "__main__":
run(CONFIG)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment