174 lines
6.9 KiB
Python
174 lines
6.9 KiB
Python
import os
|
|
import hashlib
|
|
import bencodepy
|
|
from qbittorrentapi import Client
|
|
import tomllib
|
|
from pathlib import Path
|
|
|
|
# TOML documents are UTF-8 by specification, so decode explicitly instead of
# relying on the platform's locale encoding (the read_text() default).
config = tomllib.loads(Path("config.toml").read_text(encoding="utf-8"))
|
|
|
|
def get_qb_client() -> Client:
    """
    Builds a qBittorrent client from the "qb" section of the loaded config and logs it in.

    Mirrors the approach already used in TrackerEditor.

    :return: A Client on which auth_log_in() has already been called.
    """
    qb_settings = config["qb"]
    client = Client(
        host=qb_settings["host"],
        username=qb_settings["username"],
        password=qb_settings["password"],
    )
    client.auth_log_in()
    return client
|
|
|
|
|
|
def download_torrent(qb_client: Client, torrent_source: str | bytes, save_path: str) -> str:
    """
    Step 4: hands a torrent to the qb api so it is downloaded into `save_path`.

    :param qb_client: Authenticated qbittorrentapi.Client
    :param torrent_source: Raw .torrent bytes, a path to an existing .torrent file,
                           or anything else (treated as a magnet link / URL).
    :param save_path: The directory where the torrent should be downloaded (e.g. the messy folder).
    :return: Whatever `qb_client.torrents_add` returns.
    """
    # Raw bytes: upload them directly under a placeholder file name.
    if isinstance(torrent_source, bytes):
        payload = {"upload.torrent": torrent_source}
        return qb_client.torrents_add(torrent_files=payload, save_path=save_path)

    # Not an existing local file, so it must be a magnet link or URL.
    if not os.path.isfile(torrent_source):
        return qb_client.torrents_add(urls=torrent_source, save_path=save_path)

    # Local .torrent file: read the bytes ourselves so the file *content* is
    # uploaded, rather than a local path the remote instance may not be able
    # (or allowed) to read.
    with open(torrent_source, "rb") as handle:
        raw_torrent = handle.read()
    upload_name = os.path.basename(torrent_source)
    return qb_client.torrents_add(torrent_files={upload_name: raw_torrent}, save_path=save_path)
|
|
|
|
|
|
def get_torrent_file_tree(qb_client: Client, torrent_hash: str) -> list:
    """
    Step 5: asks the qb api which files a torrent contains.

    :param qb_client: Authenticated qbittorrentapi.Client
    :param torrent_hash: The hash of the target torrent.
    :return: One dict per file with the keys "id", "name", "size" and "progress";
             attributes missing on an entry fall back to None, "", 0 and 0.
             An empty list is returned (and the error printed) if anything raises.
    """
    try:
        entries = qb_client.torrents_files(torrent_hash=torrent_hash)
        # Keep only the fields of interest from each entry; "name" presumably
        # carries the relative path (with separators) that reflects the tree.
        return [
            {
                "id": getattr(entry, "id", None),
                "name": getattr(entry, "name", ""),
                "size": getattr(entry, "size", 0),
                "progress": getattr(entry, "progress", 0),
            }
            for entry in entries
        ]
    except Exception as err:
        print(f"Error fetching file tree for {torrent_hash}: {err}")
        return []
|
|
|
|
def get_torrent_hash(source: str | bytes) -> str:
|
|
"""
|
|
Parses a local .torrent file or raw bytes and computes its info hash directly.
|
|
"""
|
|
try:
|
|
if isinstance(source, bytes):
|
|
torrent_data = bencodepy.decode(source)
|
|
else:
|
|
with open(source, "rb") as f:
|
|
torrent_data = bencodepy.decode(f.read())
|
|
|
|
# Info dictionary is under b"info"
|
|
info_data = torrent_data[b"info"]
|
|
info_encoded = bencodepy.encode(info_data)
|
|
|
|
# Calculate SHA1 hash of the bencoded info dictionary
|
|
return hashlib.sha1(info_encoded).hexdigest()
|
|
except Exception as e:
|
|
print(f"Could not parse torrent hash: {e}")
|
|
return ""
|
|
|
|
def _single_root_folder(files) -> str | None:
    """Return the one top-level folder shared by every file entry, or None if there is no such folder."""
    roots = set()
    for entry in files:
        parts = getattr(entry, "name", "").replace("\\", "/").split("/")
        # An entry without a separator sits at the torrent's base; mark it with "".
        roots.add(parts[0] if len(parts) > 1 else "")
    if len(roots) == 1 and "" not in roots:
        return next(iter(roots))
    return None


def rename_torrent_and_folder(
    qb_client: Client,
    torrent_hash: str,
    new_name: str,
    max_wait_seconds: int = 15,
) -> None:
    """
    Renames the torrent display name and the top-level folder on disk to the given `new_name`.

    Both renames are best-effort: a failure is printed and the function carries on.
    The folder is renamed only when every file shares one top-level folder and that
    folder is not already `new_name`. Afterwards the function polls once per second
    until qBittorrent reports the renames that were actually issued, so it never
    waits for a change that was skipped or that failed.

    :param qb_client: Authenticated qbittorrentapi.Client
    :param torrent_hash: The hash of the torrent to rename.
    :param new_name: New display name, also used as the new top-level folder name.
    :param max_wait_seconds: Maximum number of one-second polls spent waiting for
                             confirmation (defaults to the previous fixed 15).
    """
    import time

    info = qb_client.torrents_info(hashes=torrent_hash)
    if not info:
        print(f"Torrent {torrent_hash} not found to rename.")
        return

    old_name = info[0].name

    print(f"Renaming torrent name from '{old_name}' to '{new_name}'")
    name_pending = False
    try:
        qb_client.torrents_rename(torrent_hash=torrent_hash, new_torrent_name=new_name)
        name_pending = True
    except Exception as e:
        print(f"Failed to rename torrent: {e}")

    folder_pending = False
    try:
        files = qb_client.torrents_files(torrent_hash=torrent_hash)
        if files:
            old_folder = _single_root_folder(files)
            if old_folder is None:
                print("Multiple root folders or single files at base detected. Skipping folder rename.")
            elif old_folder != new_name:
                print(f"Renaming top-level dir from '{old_folder}' to '{new_name}'")
                qb_client.torrents_rename_folder(torrent_hash=torrent_hash, old_path=old_folder, new_path=new_name)
                folder_pending = True
    except Exception as e:
        print(f"Failed to rename folder: {e}")

    # Nothing was successfully requested, so there is no state change to wait for.
    if not (name_pending or folder_pending):
        return

    print("Waiting for rename to take effect...")
    new_root_prefixes = (f"{new_name}/", f"{new_name}\\")
    for _ in range(max_wait_seconds):
        info = qb_client.torrents_info(hashes=torrent_hash)
        if not info:
            break

        # Only check what was actually requested; polling for a rename that was
        # skipped or failed would block for the whole timeout and then emit a
        # misleading warning.
        name_ok = not name_pending or info[0].name == new_name

        folder_ok = True
        if folder_pending:
            files = qb_client.torrents_files(torrent_hash=torrent_hash)
            folder_ok = bool(files) and all(
                getattr(f, "name", "").startswith(new_root_prefixes) for f in files
            )

        if name_ok and folder_ok:
            print("Rename confirmed by qBittorrent.")
            return

        time.sleep(1)

    print("Warning: Rename may not have fully propagated yet.")
|
|
|
|
def remove_tag_if_exists(qb_client: Client, torrent_hash: str, tag_to_remove: str) -> None:
    """
    Removes `tag_to_remove` from a torrent, but only if the torrent currently carries it.

    Does nothing when the torrent cannot be found, has no tags, or lacks the tag.

    :param qb_client: Authenticated qbittorrentapi.Client
    :param torrent_hash: The hash of the target torrent.
    :param tag_to_remove: Exact tag name to look for and remove.
    """
    matches = qb_client.torrents_info(hashes=torrent_hash)
    if not matches:
        return

    raw_tags = getattr(matches[0], "tags", "")
    if not raw_tags:
        return

    # The tags value is split on commas; surrounding whitespace is ignored and
    # empty pieces never count as a tag.
    stripped_tags = (piece.strip() for piece in raw_tags.split(","))
    if not any(tag and tag == tag_to_remove for tag in stripped_tags):
        return

    print(f"Removing existing tag '{tag_to_remove}' from torrent.")
    qb_client.torrents_remove_tags(tags=tag_to_remove, torrent_hashes=torrent_hash)
|