Skip to content

Instantly share code, notes, and snippets.

@sebdelsol
Last active April 30, 2023 14:02
Show Gist options
  • Save sebdelsol/089a21babbb25d00be4fc87d813ef540 to your computer and use it in GitHub Desktop.
Save sebdelsol/089a21babbb25d00be4fc87d813ef540 to your computer and use it in GitHub Desktop.
proxy to search among movies and series in sfvip player
# requirements:
# Python >= 3.10
# pip install proxy.py psgtray psutil pysimplegui pywin32 requests
# use it:
# python sfvip_proxy.py [--port PORT]
# or create the launcher:
# python sfvip_proxy.py --launcher [--port PORT]
# and launch the proxy with sfvip_proxy.vbs
import argparse
import json
import os
import re
import subprocess
import sys
import threading
import time
from enum import Enum, auto
from pathlib import Path
from typing import IO, Any, Iterator, List, Optional
import psutil
import PySimpleGUI as sg
import requests
import win32api
import win32event
import winerror
from psgtray import SystemTray
import proxy
from proxy.http.methods import httpMethods
from proxy.http.parser import HttpParser
from proxy.http.proxy import HttpProxyBasePlugin
from proxy.http.responses import okResponse
class Config:
    """Static settings for the injected category, the local proxy and the UI."""

    class AllCat:
        """The extra "All" category added to the player's category lists."""

        name = "All"
        id = 0
        # sections whose `get_<section>_categories` queries get the extra category
        inject = ("series", "vod")

    class Proxy:
        """Options of the local proxy."""

        port = 7777  # default port, can be overridden with --port
        timeout = 30
        log_level = "ERROR"
        buf_size_in_MB = 16

    class UI:
        """Look of the windows and of the tray icon."""

        theme = "DarkBlue14"
        font = "Arial 15 bold"
        element_padding = 3

        class Ok:
            """Icon and colour of the nominal state."""

            color = "lime green"
            icon = sg.SYSTEM_TRAY_MESSAGE_ICON_INFORMATION

        class Warning:
            """Icon and colour of the warning state."""

            color = "red"
            icon = sg.SYSTEM_TRAY_MESSAGE_ICON_WARNING
class Window(sg.Window):
    """Title-bar-less, always-on-top window that can be dragged from anywhere.

    The given layout is wrapped in a single thin-bordered frame.
    """

    _kw = {
        "grab_anywhere": True,
        "no_titlebar": True,
        "debugger_enabled": False,
        "keep_on_top": True,
    }
    _frame_kw = {
        "p": 0,
        "border_width": 1,
        "relief": sg.RELIEF_SOLID,
        "expand_x": True,
        "expand_y": True,
    }

    def __init__(self, title, layout, **kw) -> None:
        """Build and finalize the window; the options in `_kw` override `kw`."""
        options = {**kw, **self._kw}
        framed_layout = [[sg.Frame("", layout, **self._frame_kw)]]
        super().__init__(
            title,
            framed_layout,
            margins=(0, 0),
            element_padding=Config.UI.element_padding,
            **options,
        )
        super().finalize()
        for element in self.element_list():
            element.grab_anywhere_include()
            # only some elements (e.g. Button below) define their own finalize step
            if hasattr(element, "finalize"):
                element.finalize()
class Button(sg.Button):
    """Borderless button whose colour changes while the mouse is over it."""

    _binds = ("<Enter>", "<Leave>")
    _kw = {"border_width": 0}

    def __init__(self, *args, mouseover_color: str, **kw) -> None:
        """Create the button; `mouseover_color` is the colour used on hover."""
        # keyed by the name of the tkinter event type received in _on_mouse_over
        self.colors = {"Enter": mouseover_color, "Leave": sg.theme_button_color()}
        options = {**kw, **self._kw}
        super().__init__(*args, **options)

    def finalize(self) -> None:
        """Bind the hover events on the underlying widget and block focus."""
        widget = self.Widget
        for sequence in self._binds:
            widget.bind(sequence, self._on_mouse_over)
        self.block_focus(True)

    def _on_mouse_over(self, event: sg.tk.Event) -> None:
        """Apply the colour registered for the received event type."""
        color = self.colors.get(event.type.name)
        self.update(button_color=color)
class YesNoWindow(Window):
    """Modal dialog showing a question with a "No" and a "Yes" button."""

    _yes, _no = "Yes", "No"

    def __init__(self, title) -> None:
        """Build the dialog; `title` is the question displayed above the buttons."""
        no_button = Button(self._no, mouseover_color=Config.UI.Ok.color)
        yes_button = Button(self._yes, mouseover_color=Config.UI.Warning.color)
        title = sg.T(title, expand_x=True, justification="center")
        layout = [[title], [no_button, sg.P(), yes_button]]
        super().__init__("", layout, modal=True)

    def loop(self) -> bool:
        """Block until the user answers, close the dialog and return the answer.

        Returns:
            True only when the "Yes" button was pressed; False for "No" or
            when the window was closed.
        """
        while True:
            event, _ = self.read()
            if event in (self._yes, self._no, sg.WIN_CLOSED):
                self.close()
                return event == self._yes
class SingleApp:
    """Single-instance guard built on a named Win32 event.

    The first instance creates the event; any later instance finds it already
    existing (`already_running` is True) and signals it so the first instance
    can react.
    """

    # pylint: disable=c-extension-no-member
    def __init__(self, name: str) -> None:
        """Create (or open) the auto-reset event named after `name`."""
        self._event = win32event.CreateEvent(None, False, False, f"{name}_START_EVENT")
        # must be read right after CreateEvent, before any other Win32 call
        self.already_running = win32api.GetLastError() == winerror.ERROR_ALREADY_EXISTS
        # only a duplicate instance signals: signalling unconditionally would
        # make the first instance wake itself up with a spurious notification
        if self.already_running:
            win32event.SetEvent(self._event)

    def wait_for_another_to_start(self) -> None:
        """Block until another instance signals the event."""
        win32event.WaitForSingleObject(self._event, win32event.INFINITE)

    def __enter__(self) -> "SingleApp":
        return self

    def __exit__(self, *_) -> None:
        """Release the event handle."""
        win32api.CloseHandle(self._event)
class SfvipPlugin(HttpProxyBasePlugin):
    """Proxy plugin that adds an "All" category to the player's API traffic.

    Two kinds of `player_api.php` requests are handled:
    * a query restricted to the "All" category id has that restriction removed
      before being forwarded;
    * a categories query (for the sections in `Config.AllCat.inject`) is
      answered by the plugin itself with the "All" category prepended.
    """

    _api_query = b"player_api.php?"
    # where the query string lives, depending on the HTTP method
    _query_attr = {httpMethods.POST: "body", httpMethods.GET: "path"}
    _all_category_query = f"&category_id={Config.AllCat.id}".encode()
    _all_category_json = dict(category_id=str(Config.AllCat.id), category_name=Config.AllCat.name, parent_id=0)
    _is_categories_query = re.compile(f"get_({'|'.join(Config.AllCat.inject)})_categories".encode()).search

    def handle_client_request(self, request: HttpParser) -> Optional[HttpParser]:
        """Rewrite or answer API requests; anything else is forwarded untouched.

        Returns:
            The (possibly modified) request to forward upstream, or None when
            a response has already been queued for the client.
        """
        if not (request.path and self._api_query in request.path):
            return request
        # other methods than GET/POST carry no query we know how to handle
        query_attr = self._query_attr.get(request.method)
        query: Optional[bytes] = getattr(request, query_attr, None) if query_attr else None
        if not query:  # e.g. a POST without a body
            return request
        if self._all_category_query in query:
            # turn an all category query into a whole catalog query
            setattr(request, query_attr, query.replace(self._all_category_query, b""))
        elif self._is_categories_query(query):
            # send a response with the all category injected
            content = self._categories_with_all(request)
            if content is not None:
                self.client.queue(
                    okResponse(
                        headers={b"Content-Type": b"application/json"},
                        content=content,
                    )
                )
                # the client already has its answer: do not forward the request too
                return None
        return request

    def _categories_with_all(self, request: HttpParser) -> Optional[bytes]:
        """Fetch the upstream categories and return them as JSON with "All" first.

        Returns None when the upstream answer cannot be used (network error,
        non-200 status, invalid JSON or a payload that is not a list), so the
        caller can fall back to forwarding the original request.
        """
        url = str(request._url)  # pylint: disable=protected-access
        try:
            resp = requests.get(url, params=request.body, timeout=Config.Proxy.timeout)
            if resp.status_code != 200:
                return None
            categories = resp.json()
        except (requests.RequestException, ValueError):
            return None
        if not isinstance(categories, list):
            return None
        return json.dumps([self._all_category_json] + categories).encode()
class SfvipProxy(proxy.Proxy):
    """`proxy.Proxy` preconfigured from `Config.Proxy` with `SfvipPlugin` loaded."""

    # the same size (in bytes, as a string) is used for every buffer option
    buf_size = str(Config.Proxy.buf_size_in_MB * 1024**2)
    proxy_opts = (
        "--timeout",
        str(Config.Proxy.timeout),
        "--log-level",
        Config.Proxy.log_level,
        "--client-recvbuf-size",
        buf_size,
        "--server-recvbuf-size",
        buf_size,
        "--max-sendbuf-size",
        buf_size,
        # prevent shutdown lock
        "--num-acceptors",
        "1",
    )

    def __init__(self, port: int) -> None:
        """Configure the proxy to listen on `port`."""
        super().__init__(self.proxy_opts, port=port, plugins=[SfvipPlugin])
class SfvipUsers:
    """Access to the users stored in the player's `Database.json`."""

    # an unset APPDATA yields a relative path instead of failing at import time
    _database = Path(os.getenv("APPDATA", "")) / "SFVIP-Player" / "Database.json"
    _playlist_ext = ".m3u", ".m3u8"

    def _open(self, mode: str) -> IO:
        """Open the database as UTF-8 text in the given mode."""
        return self._database.open(mode=mode, encoding="utf8")

    def _is_playlist(self, user: dict) -> bool:
        """Tell whether the user's address is a playlist file or an existing path."""
        path = Path(user["Address"])
        # extensions are matched case-insensitively (".M3U" is a playlist too)
        return path.suffix.lower() in self._playlist_ext or path.exists()

    def set_proxy(self, proxy_url: str) -> None:
        """Set `HttpProxy` to `proxy_url` for every user that is not a playlist.

        Playlist users are left untouched and are kept in the database. Nothing
        is written when the database is missing or holds no such user.
        """
        if not self._database.exists():
            return
        with self._open("r") as f:
            users = json.load(f)
        # keep the full list intact: only the selected entries are modified,
        # but everything is written back
        accounts = [user for user in users if not self._is_playlist(user)]
        if not accounts:
            return
        for user in accounts:
            user["HttpProxy"] = proxy_url
        with self._open("w") as f:
            json.dump(users, f, indent=2, separators=(",", ":"))
class SfvipProcess:
    """Find, kill and restart the running player processes."""

    _name = "sfvip player.exe"
    _creationflags = subprocess.DETACHED_PROCESS | subprocess.CREATE_NEW_PROCESS_GROUP | subprocess.CREATE_BREAKAWAY_FROM_JOB

    def get_running(self, *attrs: str) -> Iterator[psutil.Process]:
        """Yield the processes named `_name`.

        Args:
            *attrs: extra psutil attribute names to prefetch into `process.info`
                (the name is always fetched).
        """
        for process in psutil.process_iter(("name", *attrs)):
            if self._name == process.info["name"]:
                yield process

    def is_running(self) -> bool:
        """Tell whether at least one player process is running."""
        return any(True for _ in self.get_running())

    def kill(self) -> List[List[str]]:
        """Kill every player process and wait for them to terminate.

        Returns:
            The command lines of the killed processes, so they can be handed
            to `start`. Processes whose command line is unavailable are omitted.
        """
        processes = list(self.get_running("cmdline"))
        for process in processes:
            try:
                process.kill()
            except psutil.NoSuchProcess:
                # the player exited by itself between the enumeration and the kill
                pass
        psutil.wait_procs(processes)
        return [cmdline for process in processes if (cmdline := process.info["cmdline"])]

    def start(self, cmdlines: List[List[str]]) -> None:
        """Start one detached process per command line."""
        for cmdline in cmdlines:
            subprocess.Popen(cmdline, creationflags=self._creationflags)  # pylint: disable=consider-using-with
class SfvipApp(Window):
name = "SF VIP Proxy"
class Event(Enum):
ANOTHER_APP_HAS_STARTED = auto()
WAIT_FOR_PLAYER = auto()
PROXY_STARTED = auto()
MINIMIZE = auto()
CLOSE = auto()
_sleep = 1 # second
def __init__(self, app: SingleApp, port: int) -> None:
sg.theme(Config.UI.theme)
sg.set_options(font=Config.UI.font)
self.sfvip_process = SfvipProcess()
self.sfvip_users = SfvipUsers()
self.running = False
self.port = port
self.app = app
self.tray = SystemTray(["", []], window=self, single_click_events=True, icon=Config.UI.Warning.icon)
b_minimize = Button("___", mouseover_color=Config.UI.Ok.color, k=self.Event.MINIMIZE)
b_quit = Button("X", mouseover_color=Config.UI.Warning.color, k=self.Event.CLOSE)
self.proxy_txt = sg.T("Init...", justification="center", expand_x=True)
self.img = sg.Image()
super().__init__(self.name, [[self.img, sg.T(self.name), self.proxy_txt, b_quit, b_minimize]])
def _send_event(self, event: Any, value: Any = None) -> None:
if self.running:
self.write_event_value(event, value)
def _ask_quit(self) -> bool:
self.hide()
if YesNoWindow(title=f"Quit {self.name} ?").loop():
return True
self.un_hide()
return False
def _toggle_visibility(self) -> None:
if self._Hidden:
self.un_hide()
else:
self.hide()
def _set_state(self, txt: str, state: Config.UI.Ok | Config.UI.Warning) -> None:
self.proxy_txt(txt, text_color=state.color)
self.tray.change_icon(state.icon)
self.tray.set_tooltip(txt)
self.img(state.icon)
def _set_proxy(self, proxy_url: str) -> None:
if self.sfvip_process.is_running():
killed = self.sfvip_process.kill()
self.sfvip_users.set_proxy(proxy_url)
self.sfvip_process.start(killed)
else:
self.sfvip_users.set_proxy(proxy_url)
def _wait_and_run_proxy(self) -> None:
proxy_url = f"http://127.0.0.1:{self.port}"
self._set_proxy(proxy_url)
while self.running:
if not self.sfvip_process.is_running():
self._send_event(self.Event.WAIT_FOR_PLAYER)
while self.running and not self.sfvip_process.is_running():
time.sleep(self._sleep)
if self.running:
self._send_event(self.Event.PROXY_STARTED, proxy_url)
with SfvipProxy(self.port):
while self.running and self.sfvip_process.is_running():
time.sleep(self._sleep)
self._set_proxy("")
def _wait_for_another_app_to_start(self):
while self.running:
self.app.wait_for_another_to_start()
self._send_event(self.Event.ANOTHER_APP_HAS_STARTED)
def loop(self) -> None:
self.running = True
threading.Thread(target=self._wait_and_run_proxy).start()
threading.Thread(target=self._wait_for_another_app_to_start, daemon=True).start()
while self.running:
event, values = self.read()
match event:
case self.Event.WAIT_FOR_PLAYER:
self._set_state("wait for player", Config.UI.Warning)
case self.Event.PROXY_STARTED:
self._set_state(f"started @ {values[event]}", Config.UI.Ok)
case self.Event.ANOTHER_APP_HAS_STARTED:
self.un_hide()
case self.tray.key:
self._toggle_visibility()
case self.Event.MINIMIZE:
self.hide()
case self.Event.CLOSE:
if self._ask_quit():
break
case sg.WIN_CLOSED:
break
self.running = False
self.tray.close()
self.close()
def get_options() -> argparse.Namespace:
    """Parse the command line: `--port PORT` and the `--launcher` flag."""
    cli = argparse.ArgumentParser()
    cli.add_argument(
        "--port",
        help="Proxy port",
        default=Config.Proxy.port,
        type=int,
    )
    cli.add_argument(
        "--launcher",
        help="Create the launcher",
        action="store_true",
    )
    options = cli.parse_args()
    return options
def create_vbs_launcher(port: int) -> None:
    """Write a `.vbs` launcher next to this script.

    The launcher runs this script with the `pythonw.exe` located beside the
    current interpreter, passing `--port port`.
    """
    this_script = Path(__file__)
    pythonw = (Path(sys.executable).parent / "pythonw.exe").resolve()
    # doubled quotes are literal quotes inside a VBScript string
    run_line = f'WshShell.Run """{pythonw}"" ""{this_script.resolve()}"" --port {port}"'
    lines = ['Set WshShell = CreateObject("WScript.Shell")', run_line]
    launcher = this_script.with_suffix(".vbs")
    launcher.write_text("\n".join(lines), encoding="utf-8")
if __name__ == "__main__":
    options = get_options()
    if not options.launcher:
        # run the app, unless an instance is already running
        with SingleApp(SfvipApp.name) as single_app:
            if not single_app.already_running:
                SfvipApp(single_app, options.port).loop()
    else:
        # only create the launcher script
        create_vbs_launcher(options.port)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment