Files
2026-03-10 17:29:59 -04:00

174 lines
6.9 KiB
Python

import os
import hashlib
import bencodepy
from qbittorrentapi import Client
import tomllib
from pathlib import Path
config = tomllib.loads(Path("config.toml").read_text())
def get_qb_client() -> Client:
"""
Initializes and returns an authenticated qBittorrent client.
Based on existing implementations in TrackerEditor.
"""
qb = Client(host=config["qb"]["host"], username=config["qb"]["username"], password=config["qb"]["password"])
qb.auth_log_in()
return qb
def download_torrent(qb_client: Client, torrent_source: str | bytes, save_path: str) -> str:
"""
4. Calls qb api to download a torrent to a messy directory.
:param qb_client: Authenticated qbittorrentapi.Client
:param torrent_source: File path to a .torrent file, a magnet link / URL, or raw bytes.
:param save_path: The directory where the torrent should be downloaded (e.g. the messy folder).
:return: Response from the API.
"""
if isinstance(torrent_source, bytes):
return qb_client.torrents_add(torrent_files={"upload.torrent": torrent_source}, save_path=save_path)
elif os.path.isfile(torrent_source):
# Open and read the bytes explicitly so that qb uploads the file data,
# negating local path security issues on the remote instance
with open(torrent_source, "rb") as f:
torrent_filename = os.path.basename(torrent_source)
# Pass dictionary mappings to `torrent_files` to upload binary streams directly
return qb_client.torrents_add(torrent_files={torrent_filename: f.read()}, save_path=save_path)
else:
# It's a magnet link or URL
return qb_client.torrents_add(urls=torrent_source, save_path=save_path)
def get_torrent_file_tree(qb_client: Client, torrent_hash: str) -> list:
"""
5. Calls qb api to view the file tree inside the torrent.
:param qb_client: Authenticated qbittorrentapi.Client
:param torrent_hash: The hash of the target torrent.
:return: A list of dicts representing the files inside the torrent,
which includes their relative paths reflecting the file tree.
"""
try:
files = qb_client.torrents_files(torrent_hash=torrent_hash)
# The API returns a list of dictionaries containing file info (name with path separators, size, etc.)
file_tree = []
for f in files:
file_tree.append({
"id": getattr(f, "id", None),
"name": getattr(f, "name", ""),
"size": getattr(f, "size", 0),
"progress": getattr(f, "progress", 0)
})
return file_tree
except Exception as e:
print(f"Error fetching file tree for {torrent_hash}: {e}")
return []
def get_torrent_hash(source: str | bytes) -> str:
"""
Parses a local .torrent file or raw bytes and computes its info hash directly.
"""
try:
if isinstance(source, bytes):
torrent_data = bencodepy.decode(source)
else:
with open(source, "rb") as f:
torrent_data = bencodepy.decode(f.read())
# Info dictionary is under b"info"
info_data = torrent_data[b"info"]
info_encoded = bencodepy.encode(info_data)
# Calculate SHA1 hash of the bencoded info dictionary
return hashlib.sha1(info_encoded).hexdigest()
except Exception as e:
print(f"Could not parse torrent hash: {e}")
return ""
def rename_torrent_and_folder(qb_client: Client, torrent_hash: str, new_name: str) -> None:
"""
Renames the torrent display name and the top-level folder on disk to the given `new_name`.
"""
info = qb_client.torrents_info(hashes=torrent_hash)
if not info:
print(f"Torrent {torrent_hash} not found to rename.")
return
t_info = info[0]
old_name = t_info.name
print(f"Renaming torrent name from '{old_name}' to '{new_name}'")
try:
qb_client.torrents_rename(torrent_hash=torrent_hash, new_torrent_name=new_name)
except Exception as e:
print(f"Failed to rename torrent: {e}")
try:
files = qb_client.torrents_files(torrent_hash=torrent_hash)
if files:
root_dirs = set()
for f in files:
parts = getattr(f, "name", "").replace("\\", "/").split("/")
if len(parts) > 1:
root_dirs.add(parts[0])
else:
root_dirs.add("") # Single file at base
if len(root_dirs) == 1 and "" not in root_dirs:
old_folder = list(root_dirs)[0]
if old_folder != new_name:
print(f"Renaming top-level dir from '{old_folder}' to '{new_name}'")
qb_client.torrents_rename_folder(torrent_hash=torrent_hash, old_path=old_folder, new_path=new_name)
else:
print("Multiple root folders or single files at base detected. Skipping folder rename.")
except Exception as e:
print(f"Failed to rename folder: {e}")
import time
print("Waiting for rename to take effect...")
for _ in range(15): # wait up to 15 seconds
info = qb_client.torrents_info(hashes=torrent_hash)
if not info:
break
current_name = info[0].name
# Check files to see if root path matches
files = qb_client.torrents_files(torrent_hash=torrent_hash)
if not files:
time.sleep(1)
continue
all_match_or_single = True
has_root_dir = all("/" in getattr(f, "name", "") or "\\" in getattr(f, "name", "") for f in files)
if has_root_dir:
if not all(getattr(f, "name", "").startswith(f"{new_name}/") or getattr(f, "name", "").startswith(f"{new_name}\\") for f in files):
all_match_or_single = False
if current_name == new_name and all_match_or_single:
print("Rename confirmed by qBittorrent.")
return
time.sleep(1)
print("Warning: Rename may not have fully propagated yet.")
def remove_tag_if_exists(qb_client: Client, torrent_hash: str, tag_to_remove: str) -> None:
"""
Checks if a tag exists on a torrent, and removes it if it does.
"""
info = qb_client.torrents_info(hashes=torrent_hash)
if not info:
return
t_info = info[0]
tags = getattr(t_info, "tags", "")
if tags:
tag_list = [t.strip() for t in tags.split(",") if t.strip()]
if tag_to_remove in tag_list:
print(f"Removing existing tag '{tag_to_remove}' from torrent.")
qb_client.torrents_remove_tags(tags=tag_to_remove, torrent_hashes=torrent_hash)