import argparse import datetime import json import os import platform from shutil import which import signal import subprocess import time from pathlib import Path from hypy_utils import color from hypy_utils.logging_utils import setup_logger log = setup_logger() speeds = [] def signal_handler(sig, frame): global pending_stop pending_stop = True log.error("^C received, signaling for the main process to stop...") log.warning("Please wait for the current block to finish scanning, then the program will exit.") log.warning("If you want to stop immediately, press ^\\ (NOT RECOMMENDED)") pending_stop = False signal.signal(signal.SIGINT, signal_handler) def to_gb(block: int): return block * BLOCK_SIZE / (1024 * 1024 * 1024) def disk_info() -> tuple[int, int]: # Get the disk size in blocks disk_size = int(subprocess.run(f"blockdev --getsize64 {DISK}", capture_output=True, text=True, shell=True).stdout) // BLOCK_SIZE log.info(f"Disk size: {to_gb(disk_size):,.0f} GB, {disk_size:#x} blocks") # Get the size of a logical sector (LDA) lss = int(subprocess.run(f"blockdev --getss {DISK}", capture_output=True, text=True, shell=True).stdout) pss = int(subprocess.run(f"blockdev --getpbsz {DISK}", capture_output=True, text=True, shell=True).stdout) log.info(f"Logical sector size: {lss} bytes, physical sector size: {pss} bytes") return disk_size, lss def run_badblocks(start_block: int, end_block: int): # Print block address in hex log.debug(f"Scanning from {start_block:#x} ({to_gb(start_block):,.0f} GB) to {end_block:#x} ({to_gb(end_block):,.0f} GB)") command = f"badblocks -b 4096 -v {DISK} {end_block} {start_block}" duration = time.time() result = subprocess.run(command, capture_output=True, text=True, shell=True, start_new_session=True) duration = time.time() - duration # stdout should be a list of bad blocks, parse it bad_blocks = [int(r) for r in result.stdout.strip().split("\n") if r] # Write the log as json logf = json.loads(LOG_FILE.read_text()) logf["logs"].append({ "timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "duration": duration, "start_block": start_block, "end_block": end_block, "bad_blocks": bad_blocks, "stderr": result.stderr, }) LOG_FILE.write_text(json.dumps(logf, indent=2)) # Print logs if bad_blocks: log.error(f"> Bad blocks found: ") for block in bad_blocks: # Pint in hex log.error(f"> {block:#x} = LDA {block * BLOCK_SIZE // lss:#x} = {block * BLOCK_SIZE / (1024 * 1024 * 1024):,.0f} GB") else: log.debug(color(f"> Clean!")) # Print summary (speed, progress, eta, etc.) # The stored speed is in blocks per second speed = (end_block - start_block) / duration speeds.append(speed) avg_spd = sum(speeds) / len(speeds) progress = end_block / disk_size # Calculate ETA eta = (disk_size - end_block) / avg_spd eta = str(datetime.timedelta(seconds=eta))[:-7] # Convert speed to MB/s speed *= BLOCK_SIZE / (1024 * 1024) avg_spd *= BLOCK_SIZE / (1024 * 1024) log.info(f"> {progress * 100:.2f}% | Cur {speed:.1f} MB/s | Remain {eta} | " f"Avg {avg_spd:.1f} MB/s") if __name__ == "__main__": # Take in disk and block size as optional arguments parser = argparse.ArgumentParser("Bad block detection utility") parser.add_argument("command", type=str, help="Command to run", choices=["scan", "plot"]) parser.add_argument("--disk", "-d", type=str, help="Disk to scan") parser.add_argument("--block-size", "-b", type=int, default=4096, help="Block size in bytes") parser.add_argument("--start", "-s", type=int, help="Start block") parser.add_argument("--end", "-e", type=int, help="End block") parser.add_argument("--rescan", action="store_true", help="Rescan the whole disk") args = parser.parse_args() DISK = args.disk BLOCK_SIZE = args.block_size START = args.start END = args.end try: assert platform.system() != "Windows", "Windows is not supported, go use DiskGenius or something" assert which("badblocks"), "badblocks command not found, please install e2fsprogs" assert which("blockdev"), "blockdev command not found, please install util-linux" assert DISK and Path(DISK).exists(), f"Disk {DISK} does not exist" assert BLOCK_SIZE % 512 == 0, "Block size must be a multiple of 512" assert os.geteuid() == 0, "You need to run as root to access the disk" except AssertionError as e: log.error(e.args[0]) exit(1) LOG_FILE = Path(__file__).parent / f"badblocks_log_{DISK.replace('/', '_')}.json" if not LOG_FILE.exists(): LOG_FILE.write_text(json.dumps({"logs": [], "block_size": BLOCK_SIZE}, indent=2)) elif not args.rescan: # Check if the block size matches block_size = json.loads(LOG_FILE.read_text())["block_size"] if block_size != BLOCK_SIZE: raise ValueError(f"Block size mismatch: {block_size} != {BLOCK_SIZE}") # Resume from the last run logs = json.loads(LOG_FILE.read_text())["logs"] if logs: last_log = logs[-1] START = last_log["end_block"] log.info(f"Resuming from {START:#x}") gb_approx = 1024 * 1024 * 1024 // BLOCK_SIZE disk_size, lss = disk_info() if args.command == "scan": for start in range(START or 0, END or disk_size, gb_approx): end = min(start + gb_approx, disk_size) run_badblocks(start, end) if pending_stop: break # Plot ouf = Path(f"badblocks{DISK.replace('/', '_')}.html") html = ((Path(__file__).parent / 'badblocks.html').read_text() .replace("d: { logs: [] }", f"d: {LOG_FILE.read_text()}") .replace("/dev/sda", DISK) ) ouf.write_text(html) log.info(f"Results saved to {ouf}.") log.warning(f"You can open the html {ouf.absolute().as_uri()} in your browser. I can't open it for you because this script is running in sudo.")