21 Commits

Author SHA1 Message Date
azalea dbbb45d2fe [U] Use Q1 instead of min 2025-04-15 22:07:56 -04:00
azalea 8b51af821f [O] Use max min_dur * 8 as max_dur 2024-12-10 06:51:01 -05:00
azalea 1b7e347de0 [U] Update readme 2024-12-10 06:41:39 -05:00
azalea 304f499fbf [-] Drop 3.7-3.9 support, remove entrypoint 2024-12-10 06:39:08 -05:00
azalea 8caebbb36d [U] Bump version 2024-12-10 06:36:03 -05:00
azalea 884686a1cb [U] Update readme 2024-12-10 06:35:41 -05:00
azalea a3f475224b [U] Update readme 2024-12-10 06:33:57 -05:00
azalea 6952b160f1 [+] Badblocks UI 2024-12-10 06:31:16 -05:00
azalea 52fcbfc205 [F] Fix write file when parent dir doesn't exist 2024-11-28 01:26:15 -05:00
azalea 27e3f92186 [+] Pickle zst 2024-11-28 01:03:46 -05:00
azalea cd3051a1b1 [+] Json zst 2024-11-27 03:37:23 -05:00
azalea c6bb2b5207 [+] Json zst 2024-11-27 03:37:12 -05:00
azalea 179f9ac5a6 [+] Recursive get 2024-11-25 18:47:03 -05:00
azalea a732f31ae7 [U] Bump version 2024-11-16 18:54:20 -05:00
azalea b926d6253c [+] Safe function wrapper 2024-11-16 18:52:53 -05:00
Azalea Gui b9ed726caa [+] Badblocks tool 2023-12-02 21:23:58 -05:00
Azalea Gui 46ea72641f [F] Fix import 2023-11-06 03:52:52 -05:00
Azalea Gui 62929dd48a [F] Fix typo 2023-11-06 03:32:31 -05:00
Azalea Gui 25e319d898 [U] Update readme 2023-11-06 03:00:33 -05:00
Azalea Gui 6291d178d4 [+] Git utils 2023-11-06 03:00:05 -05:00
Azalea 87a46fcf28 [F] ipconfig.me is blocking proxy blacklist 2023-10-05 21:31:47 -04:00
12 changed files with 407 additions and 16 deletions
+11
View File
@@ -1,10 +1,21 @@
# HyPyUtils
HyDEV Utils for Python
`pip install hypy_utils`
## Modules
Some modules have extra requirements that are not installed along with hypy_utils. These are listed below:
| Module | Requirements |
|--------------------|--------------------------|
| `tqdm_utils` | tqdm |
| `downloader` | tqdm, requests |
| `scientific_utils` | numpy, numba, matplotlib |
| `git_utils` | dateutil |
## BadBlocks - HDD sector scanning for Linux
Usage: `python3 -m hypy_utils.badblocks -d /dev/sda`
![badblocks-2](docs/badblocks.png)
Binary file not shown.

After

Width:  |  Height:  |  Size: 832 KiB

BIN
View File
Binary file not shown.

After

Width:  |  Height:  |  Size: 539 KiB

+35 -1
View File
@@ -1,14 +1,18 @@
from __future__ import annotations
__version__ = "1.0.19"
__version__ = "1.0.29"
import time
import logging
from typing import Callable
from .color_utils import *
from .serializer import *
log = logging.getLogger(__name__)
class Timer:
start: int
@@ -39,3 +43,33 @@ def run_time(func: Callable, *args, **kwargs):
_ = [func(*args, **kwargs) for _ in range(iter)]
ms = (time.time_ns() - start) / 1e6
print(f'RT {name:30} {ms:6.1f} ms')
def safe(func: Callable, on_error: Callable[[Exception], Any] = None) -> Callable:
"""
Wrapper for safely executing a function and returning the result of on_error if an exception occurs
If on_error is None, it will return None on error
Example Usage:
>>> safe(lambda x: 1 / x)(0)
None
>>> safe(lambda x: 1 / x)(2)
0.5
:param func: Function that needs safe execution
:param on_error: Function to execute when an error occurs
:return: Wrapped function
"""
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
if on_error:
return on_error(e)
else:
log.exception(e)
return None
return wrapper
+1 -5
View File
@@ -1,9 +1,5 @@
from __future__ import annotations
def main():
print('🐱')
if __name__ == '__main__':
main()
print('🐱')
+82
View File
@@ -0,0 +1,82 @@
<!DOCTYPE html>
<html lang="html5">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>BadBlocks Scan Result for /dev/sda</title>
<script src="https://unpkg.com/petite-vue"></script>
<script src="https://cdn.tailwindcss.com"></script>
</head>
<body v-scope @vue:mounted="mounted" class="p-4 relative flex flex-col gap-3">
<div>
<h1 class="text-2xl font-bold mb-4">BadBlocks Scan Result for /dev/sda</h1>
<p>Scan started on {{ first.timestamp }} and ended on {{ last?.timestamp }}.</p>
<p>
Total blocks: {{ last.end_block }} blocks |
Block size: {{ d.block_size }} |
Total size: <span class="text-red-500">{{ (last.end_block * d.block_size / 1_000_000_000_000).toFixed(2) }} TB</span>
<span class="text-gray-400">= {{ (last.end_block * d.block_size / 1024 / 1024 / 1024 / 1024).toFixed(2) }} TiB</span></p>
<p><span class="text-red-500">Red</span> blocks indicate bad blocks or blocks that take too long (8x normal time) to scan. Hover over a block to see more information.</p>
<p>Made with ♥ by <a href="https://github.com/hykilpikonna" class="text-red-500 underline">Azalea</a> | GitHub @ <a href="https://github.com/hykilpikonna/HyPyUtils" class="text-red-500 underline">hykilpikonna/HyPyUtils</a></p>
</div>
<div class="flex flex-wrap gap-0.5">
<div v-for="(log, index) in d.logs" :key="index"
class="inline-block w-2 h-2"
:style="{backgroundColor: getBlockColor(log)}"
@mouseenter="showHoverInfo($event, log, index)" @mouseleave="hideHoverInfo"></div>
</div>
<!-- Tooltip for showing hover information -->
<div v-if="hover"
:style="{top: hover?.y + 'px', left: hover?.x + 'px'}"
class="absolute bg-gray-800 text-white text-sm rounded px-2 py-1 shadow-md pointer-events-none transition-opacity duration-150">
<p>Start: {{ hover?.l?.start_block?.toString(16) }}</p>
<p>End: {{ hover?.l?.end_block?.toString(16) }}</p>
<p>Duration: {{ hover?.l?.duration?.toFixed(2) }}</p>
</div>
</body>
<script>
PetiteVue.createApp({
d: { logs: [] }, // timestamp, duration, start_block, end_block, bad_blocks
max_dur: 0, min_dur: 0, hover: null, firs: null, last: null,
onInit() {
// Extract all durations and sort them
const durations = this.d.logs.map(l => l.duration).sort((a, b) => a - b);
// Compute Q1 index; you can choose to do an interpolation if you need higher accuracy
const q1Index = Math.floor(durations.length * 0.25);
// Use Q1 as our new "min"
this.min_dur = durations[q1Index];
this.max_dur = this.min_dur * 8;
console.log(`Q1 duration: ${this.min_dur}, Max duration: ${this.max_dur}`);
this.first = this.d.logs[0];
this.last = this.d.logs[this.d.logs.length - 1];
},
mounted() {
if (this.d.logs.length) return this.onInit() // For injecting data from server-side
fetch('http://localhost:8080/badblocks_log__dev_sda.json').then(resp => resp.json())
.then(data => { this.d = data; this.onInit() })
},
getBlockColor(log) {
if (log.bad_blocks.length) return 'red'
const ratio = 1 - ((log.duration - this.min_dur) / (this.max_dur - this.min_dur))
return `rgb(${Math.round(255 * (1 - ratio))}, ${Math.round(255 * ratio)}, 0)`
},
showHoverInfo(event, log, index) {
const rect = event.target.getBoundingClientRect();
this.hover = { l: log,
x: rect.left + window.scrollX + 10,
y: rect.top + window.scrollY - 30
}
},
hideHoverInfo() { this.hover = null }
}).mount()
</script>
</html>
+162
View File
@@ -0,0 +1,162 @@
import argparse
import datetime
import json
import os
import platform
from shutil import which
import signal
import subprocess
import time
from pathlib import Path
from hypy_utils import color
from hypy_utils.logging_utils import setup_logger
log = setup_logger()
speeds = []
def signal_handler(sig, frame):
global pending_stop
pending_stop = True
log.error("^C received, signaling for the main process to stop...")
log.warning("Please wait for the current block to finish scanning, then the program will exit.")
log.warning("If you want to stop immediately, press ^\\ (NOT RECOMMENDED)")
pending_stop = False
signal.signal(signal.SIGINT, signal_handler)
def to_gb(block: int):
return block * BLOCK_SIZE / (1024 * 1024 * 1024)
def disk_info() -> tuple[int, int]:
# Get the disk size in blocks
disk_size = int(subprocess.run(f"blockdev --getsize64 {DISK}", capture_output=True, text=True, shell=True).stdout) // BLOCK_SIZE
log.info(f"Disk size: {to_gb(disk_size):,.0f} GB, {disk_size:#x} blocks")
# Get the size of a logical sector (LDA)
lss = int(subprocess.run(f"blockdev --getss {DISK}", capture_output=True, text=True, shell=True).stdout)
pss = int(subprocess.run(f"blockdev --getpbsz {DISK}", capture_output=True, text=True, shell=True).stdout)
log.info(f"Logical sector size: {lss} bytes, physical sector size: {pss} bytes")
return disk_size, lss
def run_badblocks(start_block: int, end_block: int):
# Print block address in hex
log.debug(f"Scanning from {start_block:#x} ({to_gb(start_block):,.0f} GB) to {end_block:#x} ({to_gb(end_block):,.0f} GB)")
command = f"badblocks -b 4096 -v {DISK} {end_block} {start_block}"
duration = time.time()
result = subprocess.run(command, capture_output=True, text=True, shell=True, start_new_session=True)
duration = time.time() - duration
# stdout should be a list of bad blocks, parse it
bad_blocks = [int(r) for r in result.stdout.strip().split("\n") if r]
# Write the log as json
logf = json.loads(LOG_FILE.read_text())
logf["logs"].append({
"timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"duration": duration,
"start_block": start_block,
"end_block": end_block,
"bad_blocks": bad_blocks,
"stderr": result.stderr,
})
LOG_FILE.write_text(json.dumps(logf, indent=2))
# Print logs
if bad_blocks:
log.error(f"> Bad blocks found: ")
for block in bad_blocks:
# Pint in hex
log.error(f"> {block:#x} = LDA {block * BLOCK_SIZE // lss:#x} = {block * BLOCK_SIZE / (1024 * 1024 * 1024):,.0f} GB")
else:
log.debug(color(f"> Clean!"))
# Print summary (speed, progress, eta, etc.)
# The stored speed is in blocks per second
speed = (end_block - start_block) / duration
speeds.append(speed)
avg_spd = sum(speeds) / len(speeds)
progress = end_block / disk_size
# Calculate ETA
eta = (disk_size - end_block) / avg_spd
eta = str(datetime.timedelta(seconds=eta))[:-7]
# Convert speed to MB/s
speed *= BLOCK_SIZE / (1024 * 1024)
avg_spd *= BLOCK_SIZE / (1024 * 1024)
log.info(f"> {progress * 100:.2f}% | Cur {speed:.1f} MB/s | Remain {eta} | "
f"Avg {avg_spd:.1f} MB/s")
if __name__ == "__main__":
# Take in disk and block size as optional arguments
parser = argparse.ArgumentParser("Bad block detection utility")
parser.add_argument("command", type=str, help="Command to run", choices=["scan", "plot"])
parser.add_argument("--disk", "-d", type=str, help="Disk to scan")
parser.add_argument("--block-size", "-b", type=int, default=4096, help="Block size in bytes")
parser.add_argument("--start", "-s", type=int, help="Start block")
parser.add_argument("--end", "-e", type=int, help="End block")
parser.add_argument("--rescan", action="store_true", help="Rescan the whole disk")
args = parser.parse_args()
DISK = args.disk
BLOCK_SIZE = args.block_size
START = args.start
END = args.end
try:
assert platform.system() != "Windows", "Windows is not supported, go use DiskGenius or something"
assert which("badblocks"), "badblocks command not found, please install e2fsprogs"
assert which("blockdev"), "blockdev command not found, please install util-linux"
assert DISK and Path(DISK).exists(), f"Disk {DISK} does not exist"
assert BLOCK_SIZE % 512 == 0, "Block size must be a multiple of 512"
assert os.geteuid() == 0, "You need to run as root to access the disk"
except AssertionError as e:
log.error(e.args[0])
exit(1)
LOG_FILE = Path(__file__).parent / f"badblocks_log_{DISK.replace('/', '_')}.json"
if not LOG_FILE.exists():
LOG_FILE.write_text(json.dumps({"logs": [], "block_size": BLOCK_SIZE}, indent=2))
elif not args.rescan:
# Check if the block size matches
block_size = json.loads(LOG_FILE.read_text())["block_size"]
if block_size != BLOCK_SIZE:
raise ValueError(f"Block size mismatch: {block_size} != {BLOCK_SIZE}")
# Resume from the last run
logs = json.loads(LOG_FILE.read_text())["logs"]
if logs:
last_log = logs[-1]
START = last_log["end_block"]
log.info(f"Resuming from {START:#x}")
gb_approx = 1024 * 1024 * 1024 // BLOCK_SIZE
disk_size, lss = disk_info()
if args.command == "scan":
for start in range(START or 0, END or disk_size, gb_approx):
end = min(start + gb_approx, disk_size)
run_badblocks(start, end)
if pending_stop:
break
# Plot
ouf = Path(f"badblocks{DISK.replace('/', '_')}.html")
html = ((Path(__file__).parent / 'badblocks.html').read_text()
.replace("d: { logs: [] }", f"d: {LOG_FILE.read_text()}")
.replace("/dev/sda", DISK)
)
ouf.write_text(html)
log.info(f"Results saved to {ouf}.")
log.warning(f"You can open the html {ouf.absolute().as_uri()} in your browser. I can't open it for you because this script is running in sudo.")
+16
View File
@@ -84,3 +84,19 @@ def deep_dict(o: object, exclude: set | None):
if isinstance(o, list):
return [deep_dict(v, exclude) for v in o]
return o
def get_rec(cd: dict, key: str):
"""
:param cd: Dictionary
:param key: Recursive key in the format of keya.keyb.keyc...
"""
if '.' not in key:
return cd.get(key)
ks = key.split('.')
while len(ks) > 0:
cd = cd.get(ks.pop(0))
if cd is None:
break
return cd
+45
View File
@@ -0,0 +1,45 @@
import datetime
import shlex
from pathlib import Path
from subprocess import check_output
from typing import NamedTuple
import dateutil.parser
class ExtractedCommit(NamedTuple):
sha: str
author: str
email: str
time: str
message: str
file_names: list[str]
def get_time(self) -> datetime:
return dateutil.parser.isoparse(self.time)
def git_log(path: Path, fail_silently: bool = False) -> list[ExtractedCommit]:
"""
Call and parse git log. This function requires that git>=2.37.1 is installed on your system.
:param path: Path of git repository
:param fail_silently: If true, ignore errors. If false, raise exception when errors occur.
:return: List of commits
"""
# check_call(shlex.split('git config diff.renames 0'))
cmd = f"git -c 'diff.renamelimit=0' -c 'diff.renames=0' -C '{path.absolute()}' log --name-status --diff-filter=AMD --pretty=format:'START_COMMIT_QwQ %H%n%aN%n%aE%n%aI%n%s%n'"
log = check_output(shlex.split(cmd)).decode('utf-8', 'ignore')
def extract_commit(block: str) -> ExtractedCommit:
try:
lines = block.split('\n')
sha, author, email, date, message = lines + [""] if len(lines) == 4 else lines[:5]
files = [f.replace('\t', '/') for f in lines[6:]]
return ExtractedCommit(sha, author, email, date, message, files)
except Exception as e:
print(f'========== Commit Extract Error {e} ==========\n{block}\n==========')
if not fail_silently:
raise e
return [extract_commit(c.strip()) for c in log.split('START_COMMIT_QwQ') if c]
+1 -1
View File
@@ -2,7 +2,7 @@ import requests
def setup_proxy(session: requests.Session, addr: str = 'socks5://localhost:9050', verbose: bool = True):
url = 'https://ifconfig.me/ip'
url = 'https://ip.me'
# Setup proxy
ip = session.get(url).text.strip()
+51
View File
@@ -0,0 +1,51 @@
import pickle
from pathlib import Path
import zstandard as zstd
import orjson
from . import write
zstd_d = zstd.ZstdDecompressor()
zstd_c = zstd.ZstdCompressor(level=5, write_checksum=True, threads=-1)
def load_json_zst(file_path: str | Path) -> dict | list:
"""
Load a .json.zst file and return its parsed content.
Parameters:
file_path (str): The path to the .json.zst file.
Returns:
dict or list: The parsed JSON content.
"""
with Path(file_path).open('rb') as f:
return orjson.loads(zstd_d.stream_reader(f).read())
def write_json_zst(file_path: str | Path, data: dict | list, **kwargs):
"""
Dump data to a .json.zst file.
Parameters:
file_path (str): The path to the .json.zst file.
data (dict or list): The data to dump.
"""
write(file_path, zstd_c.compress(orjson.dumps(data, **kwargs)))
def load_pickle_zst(file_path: str | Path):
with Path(file_path).open('rb') as f:
return pickle.loads(zstd_d.stream_reader(f).read())
def write_pickle_zst(file_path: str | Path, data):
write(file_path, zstd_c.compress(pickle.dumps(data)))
if __name__ == '__main__':
write_pickle_zst('test.pickle.zst', {'a': 1, 'b': 2})
assert load_pickle_zst('test.pickle.zst') == {'a': 1, 'b': 2}
write_json_zst('test.json.zst', {'a': 1, 'b': 2})
assert load_json_zst('test.json.zst') == {'a': 1, 'b': 2}
+3 -9
View File
@@ -22,18 +22,12 @@ setup(
classifiers=[
"License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
],
packages=find_packages(exclude=("tests",)),
include_package_data=True,
install_requires=[],
entry_points={
"console_scripts": [
"hypy_utils=hypy_utils.__main__:main",
]
},
install_requires=[]
)