Skip to content

Instantly share code, notes, and snippets.

@bearlike
Created September 20, 2024 22:22
Show Gist options
  • Save bearlike/01c2a37ad7a8b19d578e08499a6a41a8 to your computer and use it in GitHub Desktop.
Save bearlike/01c2a37ad7a8b19d578e08499a6a41a8 to your computer and use it in GitHub Desktop.
Explore Torrents managed by my qBittorrent Instance. Tool for OpenWebUI
"""
title: Explore Torrents managed by my qBittorrent Instance. Tool for OpenWebUI
author: KK
funding_url: https://github.com/bearlike
version: 0.1.3
license: MIT
"""
import asyncio
import inspect
import json
from datetime import datetime, timedelta, timezone
from typing import Any, Callable, Optional

from pydantic import BaseModel, Field
from qbittorrent import Client
class EventEmitter:
    """Forward status events to an optional callback.

    The callback receives one dict of the form
    ``{"type": "status", "data": {"status", "description", "done"}}``.
    When no callback is configured, emitting is a no-op.
    """

    def __init__(self, event_emitter: Optional[Callable[[dict], Any]] = None):
        """Store the callback; ``None`` disables event emission."""
        self.event_emitter = event_emitter

    async def emit(self, description="Unknown State", status="in_progress", done=False):
        """Send a single status event to the callback, if one is set.

        :param description: Human-readable text describing the current step.
        :param status: Status label placed in the event payload.
        :param done: Whether this event marks the end of the operation.
        """
        if not self.event_emitter:
            return

        outcome = self.event_emitter(
            {
                "type": "status",
                "data": {
                    "status": status,
                    "description": description,
                    "done": done,
                },
            }
        )
        # The callback is typed as returning Any: only await when it actually
        # handed back an awaitable, so a plain synchronous callback does not
        # blow up with "object can't be used in 'await' expression".
        if inspect.isawaitable(outcome):
            await outcome
class Tools:
    """Tool collection that reports on torrents managed by a qBittorrent server."""

    class Valves(BaseModel):
        # Connection settings for the qBittorrent Web UI. Documented with
        # comments (not a docstring) so the generated model schema is unchanged.
        QB_SERVER_URL: str = Field(
            default="http://gluetun:8432",
            description="The base URL for the qBittorrent server.",
        )
        QB_USERNAME: str = Field(
            default="admin", description="The username for the qBittorrent server."
        )
        QB_PASSWORD: str = Field(
            default="admin", description="The password for the qBittorrent server."
        )

    def __init__(self):
        """Create the tool with default valve (connection) settings."""
        self.valves = self.Valves()

    async def get_torrents_info(
        self, info_hash: str = None, __event_emitter__: Callable[[dict], Any] = None
    ) -> str:
        """
        Retrieve the status, metadata and other known information for torrents downloads managed by the qBittorrent Instance.
        :param info_hash: Optional info hash of a specific torrent. If not provided, retrieves all torrents.
        :return: A JSON-encoded list of torrent statuses, or a one-item list holding an "error" entry on failure.
        """
        # NOTE(review): the helpers below are deliberately kept local to this
        # method rather than added as methods on Tools — presumably the host
        # exposes every public callable on the class as a tool; confirm before
        # hoisting them.

        def unix_to_human_ts(timestamp):
            # A non-positive completion time means "not completed yet";
            # formatting it would yield a misleading 1969/1970 date (and a
            # negative value can raise on some platforms).
            if timestamp is None or timestamp <= 0:
                return "N/A"
            # Timezone-aware replacement for the deprecated utcfromtimestamp();
            # the rendered string is identical (UTC wall-clock time).
            moment = datetime.fromtimestamp(timestamp, tz=timezone.utc)
            return moment.strftime("%Y-%b-%d %H:%M:%S")

        def unix_to_human_delta(timestamp):
            # 8640000 s is 100 days; values at or above it (and non-positive
            # ones) are treated as "no meaningful ETA".
            if timestamp <= 0 or timestamp >= 8640000:
                return "N/A"
            # Whole-day deltas render as "N days, 0:00:00"; drop the zero tail.
            return str(timedelta(seconds=timestamp)).replace(", 0:00:00", "")

        def bytes_to_human_size(size: int) -> str:
            suffixes = ["B", "KB", "MB", "GB", "TB"]
            index = 0
            # Stop at the largest suffix so huge values stay expressed in TB.
            while size >= 1024 and index < len(suffixes) - 1:
                size /= 1024
                index += 1
            return f"{size:.2f} {suffixes[index]}"

        emitter = EventEmitter(__event_emitter__)
        try:
            await emitter.emit("Authenticating with qBittorrent")
            # The client library is blocking; run it in worker threads so the
            # event loop stays responsive.
            qb = await asyncio.to_thread(Client, self.valves.QB_SERVER_URL)
            await asyncio.to_thread(
                qb.login, self.valves.QB_USERNAME, self.valves.QB_PASSWORD
            )

            await emitter.emit("Querying qBittorrent for torrents")
            torrents = await asyncio.to_thread(qb.torrents)
            # Normalise the requested hash so surrounding whitespace or
            # upper-case hex digits do not cause a silent "no match".
            wanted_hash = info_hash.strip().lower() if info_hash else ""
            if wanted_hash:
                torrents = [
                    torrent
                    for torrent in torrents
                    if str(torrent["hash"]).lower() == wanted_hash
                ]

            state_mapper = {
                "allocating": "ALLOCATING",
                "checkingDL": "CHECKING_DOWNLOAD",
                "checkingResumeData": "CHECKING_RESUME_DATA",
                "checkingUP": "CHECKING_UPLOAD",
                "downloading": "DOWNLOADING",
                "error": "ERROR",
                "forcedDL": "FORCED_DOWNLOAD",
                "forcedUP": "FORCED_UPLOAD",
                "metaDL": "METADATA_DOWNLOAD",
                "missingFiles": "MISSING_FILES",
                "moving": "MOVING",
                "pausedDL": "PAUSED_DOWNLOAD",
                "pausedUP": "PAUSED_UPLOAD",
                # NOTE(review): qBittorrent 5.x reports "stopped*" where older
                # releases reported "paused*"; map both to the same labels so
                # the FINISHED detection below keeps working. Confirm against
                # the server version in use.
                "stoppedDL": "PAUSED_DOWNLOAD",
                "stoppedUP": "PAUSED_UPLOAD",
                "queuedDL": "QUEUED_DOWNLOAD",
                "queuedUP": "QUEUED_UPLOAD",
                "stalledDL": "STALLED_DOWNLOAD",
                "stalledUP": "STALLED_UPLOAD",
                "unknown": "UNKNOWN",
                "uploading": "UPLOADING",
            }

            await emitter.emit("Processing qBittorrent response")
            results_json = []
            for torrent in torrents:
                result = {
                    "name": torrent["name"],
                    "state": state_mapper.get(torrent["state"], "UNKNOWN"),
                    "progress": torrent["progress"],
                    "download_speed": torrent["dlspeed"],
                    "upload_speed": torrent["upspeed"],
                    "size": bytes_to_human_size(torrent["total_size"]),
                    "completed": bytes_to_human_size(torrent["completed"]),
                    "category": torrent["category"],
                    "tags": torrent["tags"],
                    "eta": unix_to_human_delta(torrent["eta"]),
                    "completed_on": unix_to_human_ts(torrent["completion_on"]),
                    "amount_left": torrent["amount_left"],
                }
                # Nothing left to fetch, no ETA, and paused after completion:
                # present this as a finished download.
                if (
                    result["amount_left"] == 0
                    and result["eta"] == "N/A"
                    and result["state"] == "PAUSED_UPLOAD"
                ):
                    result["state"] = "FINISHED"
                results_json.append(result)

            await emitter.emit(
                status="complete",
                description=f"Retrieved {len(results_json)} torrents.",
                done=True,
            )
        except Exception as err:
            # Tool boundary: report the failure to the caller as data instead
            # of raising, and close out the status stream.
            await emitter.emit(
                status="error",
                description=f"An error occurred while fetching torrents: {str(err)}",
                done=True,
            )
            results_json = [{"error": str(err)}]
        return json.dumps(results_json, ensure_ascii=False)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment