Created
September 20, 2024 22:22
-
-
Save bearlike/01c2a37ad7a8b19d578e08499a6a41a8 to your computer and use it in GitHub Desktop.
Explore Torrents managed by my qBittorrent Instance. Tool for OpenWebUI
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
title: Explore Torrents managed by my qBittorrent Instance. Tool for OpenWebUI | |
author: KK | |
funding_url: https://github.com/bearlike | |
version: 0.1.3 | |
license: MIT | |
""" | |
from qbittorrent import Client | |
from pydantic import BaseModel, Field | |
import json | |
from datetime import timedelta, datetime | |
from typing import Callable, Any | |
import asyncio | |
class EventEmitter: | |
def __init__(self, event_emitter: Callable[[dict], Any] = None): | |
self.event_emitter = event_emitter | |
async def emit(self, description="Unknown State", status="in_progress", done=False): | |
if self.event_emitter: | |
await self.event_emitter( | |
{ | |
"type": "status", | |
"data": { | |
"status": status, | |
"description": description, | |
"done": done, | |
}, | |
} | |
) | |
class Tools: | |
class Valves(BaseModel): | |
QB_SERVER_URL: str = Field( | |
default="http://gluetun:8432", | |
description="The base URL for the qBittorrent server.", | |
) | |
QB_USERNAME: str = Field( | |
default="admin", description="The username for the qBittorrent server." | |
) | |
QB_PASSWORD: str = Field( | |
default="admin", description="The password for the qBittorrent server." | |
) | |
def __init__(self): | |
self.valves = self.Valves() | |
async def get_torrents_info( | |
self, info_hash: str = None, __event_emitter__: Callable[[dict], Any] = None | |
) -> str: | |
""" | |
Retrieve the status, metadata and other known information for torrents downloads managed by the qBittorrent Instance. | |
:param info_hash: Optional info hash of a specific torrent. If not provided, retrieves all torrents. | |
:return: A list of torrent statuses. | |
""" | |
def unix_to_human_ts(timestamp): | |
return datetime.utcfromtimestamp(timestamp).strftime("%Y-%b-%d %H:%M:%S") | |
def unix_to_human_delta(timestamp): | |
if timestamp <= 0 or timestamp >= 8640000: | |
return "N/A" | |
return str(timedelta(seconds=timestamp)).replace(", 0:00:00", "") | |
def bytes_to_human_size(size: int) -> str: | |
suffixes = ["B", "KB", "MB", "GB", "TB"] | |
index = 0 | |
while size >= 1024 and index < len(suffixes) - 1: | |
size /= 1024 | |
index += 1 | |
return f"{size:.2f} {suffixes[index]}" | |
emitter = EventEmitter(__event_emitter__) | |
try: | |
await emitter.emit("Authenticating with qBittorrent") | |
# Run the login operation in a separate thread to avoid blocking | |
qb = await asyncio.to_thread(Client, self.valves.QB_SERVER_URL) | |
await asyncio.to_thread( | |
qb.login, self.valves.QB_USERNAME, self.valves.QB_PASSWORD | |
) | |
await emitter.emit("Querying qBittorrent for torrents") | |
# Run the torrents fetching operation asynchronously | |
if info_hash: | |
torrents = await asyncio.to_thread( | |
lambda: [ | |
torrent | |
for torrent in qb.torrents() | |
if torrent["hash"] == info_hash | |
] | |
) | |
else: | |
torrents = await asyncio.to_thread(qb.torrents) | |
state_mapper = { | |
"allocating": "ALLOCATING", | |
"checkingDL": "CHECKING_DOWNLOAD", | |
"checkingResumeData": "CHECKING_RESUME_DATA", | |
"checkingUP": "CHECKING_UPLOAD", | |
"downloading": "DOWNLOADING", | |
"error": "ERROR", | |
"forcedDL": "FORCED_DOWNLOAD", | |
"forcedUP": "FORCED_UPLOAD", | |
"metaDL": "METADATA_DOWNLOAD", | |
"missingFiles": "MISSING_FILES", | |
"moving": "MOVING", | |
"pausedDL": "PAUSED_DOWNLOAD", | |
"pausedUP": "PAUSED_UPLOAD", | |
"queuedDL": "QUEUED_DOWNLOAD", | |
"queuedUP": "QUEUED_UPLOAD", | |
"stalledDL": "STALLED_DOWNLOAD", | |
"stalledUP": "STALLED_UPLOAD", | |
"unknown": "UNKNOWN", | |
"uploading": "UPLOADING", | |
} | |
await emitter.emit("Processing qBittorrent response") | |
results_json = [] | |
for torrent in torrents: | |
result = { | |
"name": torrent["name"], | |
"state": state_mapper.get(torrent["state"], "UNKNOWN"), | |
"progress": torrent["progress"], | |
"download_speed": torrent["dlspeed"], | |
"upload_speed": torrent["upspeed"], | |
"size": bytes_to_human_size(torrent["total_size"]), | |
"completed": bytes_to_human_size(torrent["completed"]), | |
"category": torrent["category"], | |
"tags": torrent["tags"], | |
"eta": unix_to_human_delta(torrent["eta"]), | |
"completed_on": unix_to_human_ts(torrent["completion_on"]), | |
"amount_left": torrent["amount_left"], | |
} | |
if ( | |
result["amount_left"] == 0 | |
and result["eta"] == "N/A" | |
and result["state"] == "PAUSED_UPLOAD" | |
): | |
result["state"] = "FINISHED" | |
results_json.append(result) | |
await emitter.emit( | |
status="complete", | |
description=f"Retrieved {len(results_json)} torrents.", | |
done=True, | |
) | |
except Exception as err: | |
await emitter.emit( | |
status="error", | |
description=f"An error occurred while fetching torrents: {str(err)}", | |
done=True, | |
) | |
results_json = [{"error": str(err)}] | |
return json.dumps(results_json, ensure_ascii=False) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment