52 Commits

Author SHA1 Message Date
azalea dbbb45d2fe [U] Use Q1 instead of min 2025-04-15 22:07:56 -04:00
azalea 8b51af821f [O] Use max min_dur * 8 as max_dur 2024-12-10 06:51:01 -05:00
azalea 1b7e347de0 [U] Update readme 2024-12-10 06:41:39 -05:00
azalea 304f499fbf [-] Drop 3.7-3.9 support, remove entrypoint 2024-12-10 06:39:08 -05:00
azalea 8caebbb36d [U] Bump version 2024-12-10 06:36:03 -05:00
azalea 884686a1cb [U] Update readme 2024-12-10 06:35:41 -05:00
azalea a3f475224b [U] Update readme 2024-12-10 06:33:57 -05:00
azalea 6952b160f1 [+] Badblocks UI 2024-12-10 06:31:16 -05:00
azalea 52fcbfc205 [F] Fix write file when parent dir doesn't exist 2024-11-28 01:26:15 -05:00
azalea 27e3f92186 [+] Pickle zst 2024-11-28 01:03:46 -05:00
azalea cd3051a1b1 [+] Json zst 2024-11-27 03:37:23 -05:00
azalea c6bb2b5207 [+] Json zst 2024-11-27 03:37:12 -05:00
azalea 179f9ac5a6 [+] Recursive get 2024-11-25 18:47:03 -05:00
azalea a732f31ae7 [U] Bump version 2024-11-16 18:54:20 -05:00
azalea b926d6253c [+] Safe function wrapper 2024-11-16 18:52:53 -05:00
Azalea Gui b9ed726caa [+] Badblocks tool 2023-12-02 21:23:58 -05:00
Azalea Gui 46ea72641f [F] Fix import 2023-11-06 03:52:52 -05:00
Azalea Gui 62929dd48a [F] Fix typo 2023-11-06 03:32:31 -05:00
Azalea Gui 25e319d898 [U] Update readme 2023-11-06 03:00:33 -05:00
Azalea Gui 6291d178d4 [+] Git utils 2023-11-06 03:00:05 -05:00
Azalea 87a46fcf28 [F] ipconfig.me is blocking proxy blacklist 2023-10-05 21:31:47 -04:00
azalea df16f90a8f [U] Release 1.0.19 2023-07-28 20:52:27 -07:00
azalea 25aecabd34 [+] SafeNamespace 2023-07-28 20:49:51 -07:00
Azalea Gui 332a63479e [U] Release 1.0.18 2023-03-09 02:42:36 -05:00
Azalea Gui b748a217a0 [+] Logging utils 2023-03-09 02:42:03 -05:00
Azalea Gui afaef06f40 [+] Add setup_proxy 2023-01-29 23:54:53 -05:00
Azalea Gui 1948ff4a9c [O] Allow disabling progress bar during download 2023-01-13 05:48:43 -05:00
Hykilpikonna 80bf1da83d [U] Release 1.0.17 2022-12-19 01:42:28 -05:00
Hykilpikonna 6a60712d8c [+] deep_dict 2022-12-19 01:40:36 -05:00
Hykilpikonna 0530d41f42 [+] dict recursive remove 2022-12-19 01:38:52 -05:00
Hykilpikonna f6aa847368 [+] File name escaping 2022-12-19 01:12:08 -05:00
Hykilpikonna cb6aff290d [O] json support more types 2022-12-19 01:10:31 -05:00
Hykilpikonna 1325224fd8 [O] Support serializing Path, custom class, and byte arrays 2022-12-18 22:08:54 -05:00
Hykilpikonna 04f987cab8 [F] Fix download rate inverting 2022-12-18 21:59:50 -05:00
Hykilpikonna 156562f5a3 [U] Release 1.0.16 2022-10-29 19:07:13 -04:00
Hykilpikonna 26d756e628 [F] Fix error during download if content-length is not present 2022-10-29 19:06:56 -04:00
Azalea e0b2ef63b7 [U] Release 1.0.15 2022-10-29 17:23:50 -04:00
Azalea 374aedabb6 [F] Fix downloader 2022-10-29 17:08:23 -04:00
Azalea c28ca20edc [U] Release 1.0.14 2022-10-29 17:02:40 -04:00
Azalea 2480f4e690 [+] Downloader 2022-10-29 17:02:20 -04:00
Azalea (on HyDEV-Daisy) de3a30ef34 [U] 1.0.13 2022-08-26 00:35:06 -04:00
Azalea (on HyDEV-Daisy) 7d419b375b [U] Modular structure 2022-08-26 00:31:44 -04:00
Azalea (on HyDEV-Daisy) 376ecaa26e [U] Release 1.0.11 2022-08-25 23:21:31 -04:00
Azalea (on HyDEV-Daisy) f990731261 [+] Add stddev to scientific utils 2022-08-25 23:16:48 -04:00
Azalea (on HyDEV-Daisy) fb57ec06ae [F] Fix bugs in tqdm utils 2022-08-25 23:12:50 -04:00
Hykilpikonna b76a624b4f [+] Add simple namespace support 2022-08-15 20:05:38 -04:00
Hykilpikonna 7f56d94fe8 [F] Fix tqdm utils 2022-08-13 18:30:45 -04:00
Hykilpikonna 8d68f22eaa [O] Use doctests 2022-08-13 17:51:19 -04:00
Hykilpikonna e29d0f2c00 [+] substr_between function 2022-08-13 17:50:02 -04:00
Hykilpikonna 3b72889785 [+] Add nlp utils 2022-07-25 13:16:46 -04:00
Azalea (on HyDEV-Daisy) 6911cad00e [F] Fix imports 2022-07-03 02:24:31 -04:00
Azalea (on HyDEV-Daisy) 2ac83c5808 [+] tqdm and scientific utils 2022-07-03 02:17:34 -04:00
21 changed files with 1082 additions and 196 deletions
+2
View File
@@ -116,3 +116,5 @@ dmypy.json
# Custom
.idea
HyPyUtils.iml
.DS_Store
._*
+19
View File
@@ -1,2 +1,21 @@
# HyPyUtils
HyDEV Utils for Python
`pip install hypy_utils`
## Modules
Some modules have extra requirements that are not installed along with hypy_utils. These are listed below:
| Module | Requirements |
|--------------------|--------------------------|
| `tqdm_utils` | tqdm |
| `downloader` | tqdm, requests |
| `scientific_utils` | numpy, numba, matplotlib |
| `git_utils` | dateutil |
## BadBlocks - HDD sector scanning for Linux
Usage: `python3 -m hypy_utils.badblocks -d /dev/sda`
![badblocks-2](docs/badblocks.png)
Binary file not shown.

After

Width:  |  Height:  |  Size: 832 KiB

BIN
View File
Binary file not shown.

After

Width:  |  Height:  |  Size: 539 KiB

+50 -179
View File
@@ -1,188 +1,16 @@
from __future__ import annotations
__version__ = "1.0.7"
__version__ = "1.0.29"
import dataclasses
import hashlib
import json
import time
from datetime import datetime, date
from pathlib import Path
from typing import Union
import logging
from typing import Callable
from .color_utils import *
from .serializer import *
def ansi_rgb(r: int, g: int, b: int, foreground: bool = True) -> str:
"""
Convert rgb color into ANSI escape code format
:param r:
:param g:
:param b:
:param foreground: Whether the color applies to forground
:return: Escape code
"""
c = '38' if foreground else '48'
return f'\033[{c};2;{r};{g};{b}m'
replacements = ["&0/\033[0;30m", "&1/\033[0;34m", "&2/\033[0;32m", "&3/\033[0;36m", "&4/\033[0;31m",
"&5/\033[0;35m", "&6/\033[0;33m", "&7/\033[0;37m", "&8/\033[1;30m", "&9/\033[1;34m",
"&a/\033[1;32m", "&b/\033[1;36m", "&c/\033[1;31m", "&d/\033[1;35m", "&e/\033[1;33m",
"&f/\033[1;37m",
"&r/\033[0m", "&l/\033[1m", "&o/\033[3m", "&n/\033[4m", "&-/\n"]
replacements = [(r[:2], r[3:]) for r in replacements]
def color(msg: str) -> str:
"""
Replace extended minecraft color codes in string
:param msg: Message with minecraft color codes
:return: Message with escape codes
"""
for code, esc in replacements:
msg = msg.replace(code, esc)
while '&gf(' in msg or '&gb(' in msg:
i = msg.index('&gf(') if '&gf(' in msg else msg.index('&gb(')
end = msg.index(')', i)
code = msg[i + 4:end]
fore = msg[i + 2] == 'f'
if code.startswith('#'):
rgb = tuple(int(code.lstrip('#')[i:i+2], 16) for i in (0, 2, 4))
else:
code = code.replace(',', ' ').replace(';', ' ').replace(' ', ' ')
rgb = tuple(int(c) for c in code.split(' '))
msg = msg[:i] + ansi_rgb(*rgb, foreground=fore) + msg[end + 1:]
return msg
def printc(msg: str):
"""
Print with color
:param msg: Message with minecraft color codes
"""
print(color(msg + '&r'))
def parse_date_time(iso: str) -> datetime:
"""
Parse date faster. Running 1,000,000 trials, this parse_date function is 4.03 times faster than
python's built-in dateutil.parser.isoparse() function.
Preconditions:
- iso is the output of datetime.isoformat() (In a format like "2021-10-20T23:50:14")
- iso is a valid date (this function does not check for the validity of the input)
:param iso: Input date
:return: Datetime object
"""
return datetime(int(iso[:4]), int(iso[5:7]), int(iso[8:10]),
int(iso[11:13]), int(iso[14:16]), int(iso[17:19]))
def parse_date_only(iso: str) -> datetime:
"""
Parse date faster.
Preconditions:
- iso starts with the format of "YYYY-MM-DD" (e.g. "2021-10-20" or "2021-10-20T10:04:14")
- iso is a valid date (this function does not check for the validity of the input)
:param iso: Input date
:return: Datetime object
"""
return datetime(int(iso[:4]), int(iso[5:7]), int(iso[8:10]))
class EnhancedJSONEncoder(json.JSONEncoder):
"""
An improvement to the json.JSONEncoder class, which supports:
encoding for dataclasses, encoding for datetime, and sets
"""
def default(self, o: object) -> object:
# Support encoding dataclasses
# https://stackoverflow.com/a/51286749/7346633
if dataclasses.is_dataclass(o):
return dataclasses.asdict(o)
# Support encoding datetime
if isinstance(o, (datetime, date)):
return o.isoformat()
# Support for sets
# https://stackoverflow.com/a/8230505/7346633
if isinstance(o, set):
return list(o)
return super().default(o)
def json_stringify(obj: object, indent: Union[int, None] = None) -> str:
"""
Serialize json string with support for dataclasses and datetime and sets and with custom
configuration.
Preconditions:
- obj != None
:param obj: Objects
:param indent: Indent size or none
:return: Json strings
"""
return json.dumps(obj, indent=indent, cls=EnhancedJSONEncoder, ensure_ascii=False)
def write(file: Union[str, Path], text: str) -> None:
"""
Write text to a file
Preconditions:
- file != ''
:param file: File path (will be converted to lowercase)
:param text: Text
:return: None
"""
file = Path(file)
file.parent.mkdir(parents=True, exist_ok=True)
with file.open('w', encoding='utf-8') as f:
f.write(text)
def read(file: Union[str, Path]) -> str:
"""
Read file content
Preconditions:
- file != ''
:param file: File path (will be converted to lowercase)
:return: None
"""
return file.read_text('utf-8')
def md5(file: Union[str, Path]) -> str:
"""
Compute md5 of a file
:param file: File path
:return: md5 string
"""
file = Path(file)
hash_md5 = hashlib.md5()
with open(file, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
log = logging.getLogger(__name__)
class Timer:
@@ -202,3 +30,46 @@ class Timer:
def reset(self):
self.start = time.time_ns()
def mem(var: str):
    """
    Print the shallow memory usage (sys.getsizeof) of a variable, by name.

    The name is evaluated in the *caller's* scope so that local variables
    resolve correctly — the original eval ran inside this function's scope
    and could only ever see this module's globals.

    :param var: Name of the variable to inspect
    """
    import inspect
    import sys

    caller = inspect.currentframe().f_back
    # Debugging helper: eval of a caller-supplied name is intentional here
    obj = eval(var, caller.f_globals, caller.f_locals)
    print(f'Memory usage for {var}: {sys.getsizeof(obj) / 1024:.1f}KB')
def run_time(func: Callable, *args, **kwargs):
    """
    Benchmark a function by calling it repeatedly and printing the total time.

    The repetition count is taken from the keyword argument ``iter`` (default
    10); it is popped from ``kwargs`` before the calls so it is not forwarded.

    :param func: Function to time
    :param args: Positional arguments forwarded to func
    :param kwargs: Keyword arguments forwarded to func (minus 'iter')
    """
    name = getattr(func, '__name__', 'function')
    # Pop before starting the clock, and don't shadow the builtin iter()
    repetitions = kwargs.pop('iter', 10)
    start = time.time_ns()
    for _ in range(repetitions):
        func(*args, **kwargs)
    ms = (time.time_ns() - start) / 1e6
    print(f'RT {name:30} {ms:6.1f} ms')
def safe(func: Callable, on_error: Callable[[Exception], Any] = None) -> Callable:
    """
    Wrap a function so that exceptions never propagate to the caller.

    When the wrapped call raises, the ``on_error`` handler is invoked with the
    exception and its result is returned; without a handler the exception is
    logged and None is returned instead.

    Example Usage:
    >>> print(safe(lambda x: 1 / x)(0))
    None
    >>> safe(lambda x: 1 / x)(2)
    0.5

    :param func: Function that needs safe execution
    :param on_error: Function to execute when an error occurs
    :return: Wrapped function
    """
    def guarded(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as exc:
            if not on_error:
                log.exception(exc)
                return None
            return on_error(exc)

    return guarded
+1 -5
View File
@@ -1,9 +1,5 @@
from __future__ import annotations
def main():
print('🐱')
if __name__ == '__main__':
main()
print('🐱')
+82
View File
@@ -0,0 +1,82 @@
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>BadBlocks Scan Result for /dev/sda</title>
    <script src="https://unpkg.com/petite-vue"></script>
    <script src="https://cdn.tailwindcss.com"></script>
</head>
<body v-scope @vue:mounted="mounted" class="p-4 relative flex flex-col gap-3">
<div>
    <h1 class="text-2xl font-bold mb-4">BadBlocks Scan Result for /dev/sda</h1>
    <!-- first/last are null until the log is loaded; optional chaining avoids a render error -->
    <p>Scan started on {{ first?.timestamp }} and ended on {{ last?.timestamp }}.</p>
    <p>
        Total blocks: {{ last?.end_block }} blocks |
        Block size: {{ d.block_size }} |
        Total size: <span class="text-red-500">{{ (last?.end_block * d.block_size / 1_000_000_000_000).toFixed(2) }} TB</span>
        <span class="text-gray-400">= {{ (last?.end_block * d.block_size / 1024 / 1024 / 1024 / 1024).toFixed(2) }} TiB</span></p>
    <p><span class="text-red-500">Red</span> blocks indicate bad blocks or blocks that take too long (8x normal time) to scan. Hover over a block to see more information.</p>
    <p>Made with ♥ by <a href="https://github.com/hykilpikonna" class="text-red-500 underline">Azalea</a> | GitHub @ <a href="https://github.com/hykilpikonna/HyPyUtils" class="text-red-500 underline">hykilpikonna/HyPyUtils</a></p>
</div>
<div class="flex flex-wrap gap-0.5">
    <div v-for="(log, index) in d.logs" :key="index"
         class="inline-block w-2 h-2"
         :style="{backgroundColor: getBlockColor(log)}"
         @mouseenter="showHoverInfo($event, log, index)" @mouseleave="hideHoverInfo"></div>
</div>
<!-- Tooltip for showing hover information -->
<div v-if="hover"
     :style="{top: hover?.y + 'px', left: hover?.x + 'px'}"
     class="absolute bg-gray-800 text-white text-sm rounded px-2 py-1 shadow-md pointer-events-none transition-opacity duration-150">
    <p>Start: {{ hover?.l?.start_block?.toString(16) }}</p>
    <p>End: {{ hover?.l?.end_block?.toString(16) }}</p>
    <p>Duration: {{ hover?.l?.duration?.toFixed(2) }}</p>
</div>
</body>
<script>
    PetiteVue.createApp({
        // NOTE: "d: { logs: [] }" must stay byte-identical — badblocks.py
        // string-replaces this marker to inject scan data server-side
        d: { logs: [] }, // timestamp, duration, start_block, end_block, bad_blocks
        // Fix: this field was declared as "firs", so onInit's assignment to
        // this.first never matched a declared reactive field and the header
        // dates could not render
        max_dur: 0, min_dur: 0, hover: null, first: null, last: null,
        onInit() {
            // Extract all durations and sort them
            const durations = this.d.logs.map(l => l.duration).sort((a, b) => a - b);
            // Compute Q1 index; you can choose to do an interpolation if you need higher accuracy
            const q1Index = Math.floor(durations.length * 0.25);
            // Use Q1 as our new "min"
            this.min_dur = durations[q1Index];
            this.max_dur = this.min_dur * 8;
            console.log(`Q1 duration: ${this.min_dur}, Max duration: ${this.max_dur}`);
            this.first = this.d.logs[0];
            this.last = this.d.logs[this.d.logs.length - 1];
        },
        mounted() {
            if (this.d.logs.length) return this.onInit() // For injecting data from server-side
            fetch('http://localhost:8080/badblocks_log__dev_sda.json').then(resp => resp.json())
                .then(data => { this.d = data; this.onInit() })
        },
        getBlockColor(log) {
            if (log.bad_blocks.length) return 'red'
            // Green at min_dur fading to red at max_dur
            const ratio = 1 - ((log.duration - this.min_dur) / (this.max_dur - this.min_dur))
            return `rgb(${Math.round(255 * (1 - ratio))}, ${Math.round(255 * ratio)}, 0)`
        },
        showHoverInfo(event, log, index) {
            const rect = event.target.getBoundingClientRect();
            this.hover = { l: log,
                x: rect.left + window.scrollX + 10,
                y: rect.top + window.scrollY - 30
            }
        },
        hideHoverInfo() { this.hover = null }
    }).mount()
</script>
</html>
+162
View File
@@ -0,0 +1,162 @@
import argparse
import datetime
import json
import os
import platform
from shutil import which
import signal
import subprocess
import time
from pathlib import Path
from hypy_utils import color
from hypy_utils.logging_utils import setup_logger
# Module-wide logger and the per-range throughput samples (blocks/second)
log = setup_logger()
speeds = []


def signal_handler(sig, frame):
    """SIGINT handler: request a graceful stop after the current block range finishes."""
    global pending_stop
    pending_stop = True
    log.error("^C received, signaling for the main process to stop...")
    log.warning("Please wait for the current block to finish scanning, then the program will exit.")
    log.warning("If you want to stop immediately, press ^\\ (NOT RECOMMENDED)")


# Cooperative stop flag polled by the scan loop in __main__
pending_stop = False
signal.signal(signal.SIGINT, signal_handler)
def to_gb(block: int):
    """Convert a block index/count into gigabytes using the module-level BLOCK_SIZE."""
    return block * BLOCK_SIZE / 1024 ** 3
def disk_info() -> tuple[int, int]:
    """
    Query disk geometry via the blockdev utility (uses module globals DISK, BLOCK_SIZE).

    :return: (disk size in BLOCK_SIZE blocks, logical sector size in bytes)
    """
    # Get the disk size in blocks
    disk_size = int(subprocess.run(f"blockdev --getsize64 {DISK}", capture_output=True, text=True, shell=True).stdout) // BLOCK_SIZE
    log.info(f"Disk size: {to_gb(disk_size):,.0f} GB, {disk_size:#x} blocks")
    # Get the size of a logical sector (LDA)
    lss = int(subprocess.run(f"blockdev --getss {DISK}", capture_output=True, text=True, shell=True).stdout)
    pss = int(subprocess.run(f"blockdev --getpbsz {DISK}", capture_output=True, text=True, shell=True).stdout)
    log.info(f"Logical sector size: {lss} bytes, physical sector size: {pss} bytes")
    return disk_size, lss
def run_badblocks(start_block: int, end_block: int):
    """
    Scan one block range with badblocks, append the result to LOG_FILE, and
    print progress statistics.

    Uses module globals DISK, BLOCK_SIZE, LOG_FILE, disk_size, lss, speeds.

    :param start_block: First block of the range
    :param end_block: Last block of the range
    """
    # Print block address in hex
    log.debug(f"Scanning from {start_block:#x} ({to_gb(start_block):,.0f} GB) to {end_block:#x} ({to_gb(end_block):,.0f} GB)")
    # Fix: honor the user-configured --block-size instead of a hard-coded 4096
    # (badblocks takes last-block then first-block, hence end before start)
    command = f"badblocks -b {BLOCK_SIZE} -v {DISK} {end_block} {start_block}"
    duration = time.time()
    result = subprocess.run(command, capture_output=True, text=True, shell=True, start_new_session=True)
    duration = time.time() - duration

    # stdout should be a list of bad blocks, parse it
    bad_blocks = [int(r) for r in result.stdout.strip().split("\n") if r]

    # Append this range's result to the JSON log. This is a read-modify-write
    # of the whole file per range: fine at ~1 entry/GB, but O(n^2) over a scan.
    logf = json.loads(LOG_FILE.read_text())
    logf["logs"].append({
        "timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "duration": duration,
        "start_block": start_block,
        "end_block": end_block,
        "bad_blocks": bad_blocks,
        "stderr": result.stderr,
    })
    LOG_FILE.write_text(json.dumps(logf, indent=2))

    # Print logs
    if bad_blocks:
        log.error(f"> Bad blocks found: ")
        for block in bad_blocks:
            # Print in hex
            log.error(f"> {block:#x} = LDA {block * BLOCK_SIZE // lss:#x} = {block * BLOCK_SIZE / (1024 * 1024 * 1024):,.0f} GB")
    else:
        log.debug(color(f"> Clean!"))

    # Print summary (speed, progress, eta, etc.)
    # The stored speed is in blocks per second
    speed = (end_block - start_block) / duration
    speeds.append(speed)
    avg_spd = sum(speeds) / len(speeds)
    progress = end_block / disk_size
    # Calculate ETA
    eta = (disk_size - end_block) / avg_spd
    # NOTE(review): [:-7] assumes str(timedelta) ends with ".ffffff" microseconds;
    # for an exact whole-second eta it would chop real digits — confirm
    eta = str(datetime.timedelta(seconds=eta))[:-7]
    # Convert speed to MB/s
    speed *= BLOCK_SIZE / (1024 * 1024)
    avg_spd *= BLOCK_SIZE / (1024 * 1024)
    log.info(f"> {progress * 100:.2f}% | Cur {speed:.1f} MB/s | Remain {eta} | "
             f"Avg {avg_spd:.1f} MB/s")
if __name__ == "__main__":
    # Take in disk and block size as optional arguments
    parser = argparse.ArgumentParser("Bad block detection utility")
    parser.add_argument("command", type=str, help="Command to run", choices=["scan", "plot"])
    parser.add_argument("--disk", "-d", type=str, help="Disk to scan")
    parser.add_argument("--block-size", "-b", type=int, default=4096, help="Block size in bytes")
    parser.add_argument("--start", "-s", type=int, help="Start block")
    parser.add_argument("--end", "-e", type=int, help="End block")
    parser.add_argument("--rescan", action="store_true", help="Rescan the whole disk")
    args = parser.parse_args()

    # Module-level globals read by the helper functions above
    DISK = args.disk
    BLOCK_SIZE = args.block_size
    START = args.start
    END = args.end

    # Environment preconditions; each failure message names the remedy
    try:
        assert platform.system() != "Windows", "Windows is not supported, go use DiskGenius or something"
        assert which("badblocks"), "badblocks command not found, please install e2fsprogs"
        assert which("blockdev"), "blockdev command not found, please install util-linux"
        assert DISK and Path(DISK).exists(), f"Disk {DISK} does not exist"
        assert BLOCK_SIZE % 512 == 0, "Block size must be a multiple of 512"
        assert os.geteuid() == 0, "You need to run as root to access the disk"
    except AssertionError as e:
        log.error(e.args[0])
        exit(1)

    # One JSON scan log per disk, stored next to this module
    LOG_FILE = Path(__file__).parent / f"badblocks_log_{DISK.replace('/', '_')}.json"
    if not LOG_FILE.exists():
        LOG_FILE.write_text(json.dumps({"logs": [], "block_size": BLOCK_SIZE}, indent=2))
    elif not args.rescan:
        # Check if the block size matches
        block_size = json.loads(LOG_FILE.read_text())["block_size"]
        if block_size != BLOCK_SIZE:
            raise ValueError(f"Block size mismatch: {block_size} != {BLOCK_SIZE}")
        # Resume from the last run
        logs = json.loads(LOG_FILE.read_text())["logs"]
        if logs:
            last_log = logs[-1]
            START = last_log["end_block"]
            log.info(f"Resuming from {START:#x}")

    # Number of blocks covering roughly 1 GiB; one badblocks call per range
    gb_approx = 1024 * 1024 * 1024 // BLOCK_SIZE
    disk_size, lss = disk_info()

    if args.command == "scan":
        for start in range(START or 0, END or disk_size, gb_approx):
            end = min(start + gb_approx, disk_size)
            run_badblocks(start, end)
            # Graceful exit requested via SIGINT handler
            if pending_stop:
                break

    # Plot: inject the scan log into the bundled HTML template
    # (relies on the literal "d: { logs: [] }" marker in badblocks.html)
    ouf = Path(f"badblocks{DISK.replace('/', '_')}.html")
    html = ((Path(__file__).parent / 'badblocks.html').read_text()
            .replace("d: { logs: [] }", f"d: {LOG_FILE.read_text()}")
            .replace("/dev/sda", DISK)
            )
    ouf.write_text(html)
    log.info(f"Results saved to {ouf}.")
    log.warning(f"You can open the html {ouf.absolute().as_uri()} in your browser. I can't open it for you because this script is running in sudo.")
+58
View File
@@ -0,0 +1,58 @@
def ansi_rgb(r: int, g: int, b: int, foreground: bool = True) -> str:
    """
    Convert rgb color into ANSI escape code format

    :param r: Red component (0-255)
    :param g: Green component (0-255)
    :param b: Blue component (0-255)
    :param foreground: Whether the color applies to foreground
    :return: Escape code
    """
    # SGR 38 = set foreground, 48 = set background; ";2;" selects 24-bit color
    c = '38' if foreground else '48'
    return f'\033[{c};2;{r};{g};{b}m'


# Minecraft-style "&x" codes and their ANSI escapes, encoded as "XY/escape" strings
replacements = ["&0/\033[0;30m", "&1/\033[0;34m", "&2/\033[0;32m", "&3/\033[0;36m", "&4/\033[0;31m",
                "&5/\033[0;35m", "&6/\033[0;33m", "&7/\033[0;37m", "&8/\033[1;30m", "&9/\033[1;34m",
                "&a/\033[1;32m", "&b/\033[1;36m", "&c/\033[1;31m", "&d/\033[1;35m", "&e/\033[1;33m",
                "&f/\033[1;37m",
                "&r/\033[0m", "&l/\033[1m", "&o/\033[3m", "&n/\033[4m", "&-/\n"]
# Split each "XY/escape" entry into a (code, escape) tuple
replacements = [(r[:2], r[3:]) for r in replacements]


def color(msg: str) -> str:
    """
    Replace extended minecraft color codes in string

    Also supports inline RGB: &gf(...) sets the foreground and &gb(...) the
    background, where ... is either "#RRGGBB" hex or separated decimals
    ("r g b" / "r,g,b" / "r;g;b").

    :param msg: Message with minecraft color codes
    :return: Message with escape codes
    """
    for code, esc in replacements:
        msg = msg.replace(code, esc)

    # Repeatedly consume the leftmost &gf(...)/&gb(...) group until none remain
    while '&gf(' in msg or '&gb(' in msg:
        i = msg.index('&gf(') if '&gf(' in msg else msg.index('&gb(')
        end = msg.index(')', i)
        code = msg[i + 4:end]
        # msg[i + 2] is the 'f'/'b' that distinguishes foreground from background
        fore = msg[i + 2] == 'f'

        if code.startswith('#'):
            # Hex form: take the three 2-digit components of "#RRGGBB"
            rgb = tuple(int(code.lstrip('#')[i:i+2], 16) for i in (0, 2, 4))
        else:
            # Separated form: normalize commas/semicolons to spaces
            # NOTE(review): this last replace renders as a no-op here; it
            # presumably collapses double spaces ("  " -> " ") — confirm
            # against upstream, since code.split(' ') would choke on runs
            # of spaces otherwise
            code = code.replace(',', ' ').replace(';', ' ').replace(' ', ' ')
            rgb = tuple(int(c) for c in code.split(' '))

        msg = msg[:i] + ansi_rgb(*rgb, foreground=fore) + msg[end + 1:]

    return msg


def printc(msg: str):
    """
    Print with color

    :param msg: Message with minecraft color codes ('&r' reset is appended)
    """
    print(color(msg + '&r'))
+102
View File
@@ -0,0 +1,102 @@
from hypy_utils import infer
def is_non_empty(o):
    """Return True unless o is a sized object with length 0."""
    return len(o) > 0 if hasattr(o, '__len__') else True


def remove_values(d: dict | list, vals: list, preserve_list: bool = False) -> dict | list:
    """
    Recursively remove the given values from a dict/list structure, then drop
    any containers that became empty along the way.

    :param d: Dict or list to clean
    :param vals: Values to remove
    :param preserve_list: Whether to leave list elements untouched
    :return: Structure without the specified values
    """
    if isinstance(d, list):
        kept = (remove_values(e, vals, preserve_list) for e in d if preserve_list or e not in vals)
        return [e for e in kept if is_non_empty(e)]
    if isinstance(d, dict):
        cleaned = ((k, remove_values(v, vals, preserve_list)) for k, v in d.items() if v not in vals)
        return {k: v for k, v in cleaned if is_non_empty(v)}
    return d


def remove_nones(d: dict | list, preserve_list: bool = False) -> dict:
    """
    Recursively strip None values from a dict/list structure.

    >>> remove_nones({'a': {'b': None, 'c': 1}, 'b': [None, {'a': None}], 'c': {'a': None}, 'd': [None, 1]})
    {'a': {'c': 1}, 'd': [1]}

    :param d: Dict or list to clean
    :param preserve_list: Whether to leave list elements untouched
    :return: Structure without None values
    """
    return remove_values(d, [None], preserve_list=preserve_list)


def remove_keys(d: dict | list, keys: set) -> dict | list:
    """
    Recursively remove the given keys from all nested dicts, then drop any
    containers that became empty.

    >>> remove_keys({'a': {'b': None, 'c': 1}, 'b': [None, {'a': None}], 'c': {'a': None}, 'd': [None, 1]}, {'b'})
    {'a': {'c': 1}, 'c': {'a': None}, 'd': [None, 1]}

    :param d: The dictionary that you want to remove keys from
    :param keys: Set of keys you want to remove
    :return: Structure without the specified keys
    """
    if isinstance(d, list):
        return [e for e in (remove_keys(i, keys) for i in d) if is_non_empty(e)]
    if isinstance(d, dict):
        pruned = {k: remove_keys(v, keys) for k, v in d.items() if k not in keys}
        return {k: v for k, v in pruned.items() if is_non_empty(v)}
    return d
def deep_dict(o: object, exclude: set | None = None):
    """
    Recursively convert an object into a plain dict/list structure.

    :param o: Object to convert
    :param exclude: Keys to exclude (default: none; the annotation already
        promised Optional, so a default is now provided — backward compatible)
    :return: Deep dictionary of the object's variables
    """
    # Fix: was `exclude or {}`, which produced a dict rather than a set
    exclude = exclude or set()
    # Fix: compare with None instead of truthiness, so falsy-but-valid
    # conversions from infer() (e.g. [] for an empty set) are not discarded
    infer_result = infer(o)
    if infer_result is not None:
        return infer_result
    if hasattr(o, '__dict__'):
        return deep_dict(dict(vars(o)), exclude)
    if isinstance(o, dict):
        return {k: deep_dict(v, exclude) for k, v in o.items() if k not in exclude}
    if isinstance(o, list):
        return [deep_dict(v, exclude) for v in o]
    return o
def get_rec(cd: dict, key: str):
    """
    Look up a possibly nested value by a dotted key path.

    :param cd: Dictionary to search
    :param key: Recursive key in the format of keya.keyb.keyc...
    :return: The nested value, or None when any segment is missing
    """
    if '.' not in key:
        return cd.get(key)
    for part in key.split('.'):
        cd = cd.get(part)
        if cd is None:
            break
    return cd
+47
View File
@@ -0,0 +1,47 @@
from __future__ import annotations
import os
from pathlib import Path
import requests
import tqdm
def download_file(url: str, file: str | Path, progress: bool = True):
    """
    Download `url` to `file` in chunks, optionally showing a tqdm progress bar.

    Returns the target path immediately if the file already exists.
    https://stackoverflow.com/a/42071418/7346633

    :param url: Source URL
    :param file: Destination path
    :param progress: Whether to display a progress bar
    :return: Path of the downloaded file
    """
    file = Path(file)
    if file.is_file():
        return file

    chunk_size = 1024
    # Size the bar to the terminal; fall back when not attached to a tty
    try:
        term_len = os.get_terminal_size().columns
        bar_len = int(term_len * 0.4)
    except Exception:
        term_len = 60
        bar_len = 20

    tqdm_args = dict()
    # Fix: close the HTTP response (context manager) so the connection is released
    with requests.get(url, stream=True) as r:
        # content-length may be absent (chunked transfer); total is in MB to match the bar unit
        if 'content-length' in r.headers:
            tqdm_args['total'] = int(r.headers['content-length']) / 1024 / 1024

        pbar = None
        try:
            with open(file, 'wb') as f:
                if progress:
                    pbar = tqdm.tqdm(unit=" MB", ncols=term_len,
                                     bar_format='{desc} {rate_noinv_fmt} {remaining} [{bar}] {percentage:.0f}%', ascii=' #',
                                     desc=file.name[:bar_len].ljust(bar_len), **tqdm_args)
                for chunk in r.iter_content(chunk_size=chunk_size):
                    if chunk:
                        if pbar:
                            pbar.update(len(chunk) / 1024 / 1024)
                        f.write(chunk)
        finally:
            # Fix: the bar was never closed, so its final state was not flushed
            if pbar:
                pbar.close()
    return file
+37
View File
@@ -0,0 +1,37 @@
import base64
FILENAME_BLACKLIST = [
    # Unix and Windows
    "/",
    # Windows only
    "<", ">", ":", '"', "\\", "|", "?", "*", "\0",
    "CON", "PRN", "AUX", "NUL",
    "COM1", "COM2", "COM3", "COM4", "COM5", "COM6", "COM7", "COM8", "COM9",
    "LPT1", "LPT2", "LPT3", "LPT4", "LPT5", "LPT6", "LPT7", "LPT8", "LPT9",
    # Just for extra safety
    "~"
]


def _b64_token(c: str) -> str:
    """Percent-prefixed, padding-free base64 token for one blacklisted name."""
    return "%" + base64.b64encode(c.encode()).decode().replace('=', '')


# Maps each blacklisted name to its reversible "%<b64>" replacement token
FILENAME_REPLACE = {c: _b64_token(c) for c in FILENAME_BLACKLIST}

# Temporary marker so literal '%' never collides with replacement tokens
_PERCENT_SENTINEL = "[ PeRcEnT EsCaPe owo ]"


def escape_filename(fn: str) -> str:
    """Escape characters/names unsafe in filenames; reversible via unescape_filename."""
    fn = fn.replace("%", _PERCENT_SENTINEL)
    for original, token in FILENAME_REPLACE.items():
        fn = fn.replace(original, token)
    return fn.replace(_PERCENT_SENTINEL, "%%")


def unescape_filename(fn: str) -> str:
    """Inverse of escape_filename."""
    fn = fn.replace("%%", _PERCENT_SENTINEL)
    for original, token in FILENAME_REPLACE.items():
        fn = fn.replace(token, original)
    return fn.replace(_PERCENT_SENTINEL, "%")
+45
View File
@@ -0,0 +1,45 @@
import datetime
import shlex
from pathlib import Path
from subprocess import check_output
from typing import NamedTuple
import dateutil.parser
class ExtractedCommit(NamedTuple):
    """One commit parsed from `git log --name-status` output."""
    sha: str
    author: str
    email: str
    # Author date in ISO-8601 form (git pretty format %aI)
    time: str
    message: str
    file_names: list[str]

    def get_time(self) -> datetime.datetime:
        """Parse the ISO timestamp. (Fix: the annotation previously referred
        to the `datetime` *module*, not the `datetime.datetime` class.)"""
        return dateutil.parser.isoparse(self.time)
def git_log(path: Path, fail_silently: bool = False) -> list[ExtractedCommit]:
    """
    Call and parse git log. This function requires that git>=2.37.1 is installed on your system.

    :param path: Path of git repository
    :param fail_silently: If true, ignore errors. If false, raise exception when errors occur.
    :return: List of commits
    """
    # check_call(shlex.split('git config diff.renames 0'))
    # Rename detection is disabled so the file list is stable across renames;
    # "START_COMMIT_QwQ" is an unlikely sentinel used to split commit records
    cmd = f"git -c 'diff.renamelimit=0' -c 'diff.renames=0' -C '{path.absolute()}' log --name-status --diff-filter=AMD --pretty=format:'START_COMMIT_QwQ %H%n%aN%n%aE%n%aI%n%s%n'"
    log = check_output(shlex.split(cmd)).decode('utf-8', 'ignore')

    def extract_commit(block: str) -> ExtractedCommit:
        """Parse one sentinel-delimited record into an ExtractedCommit."""
        try:
            lines = block.split('\n')
            # Header is sha/author/email/date/subject; pad when the record
            # only has 4 lines (empty subject)
            sha, author, email, date, message = lines + [""] if len(lines) == 4 else lines[:5]
            # NOTE(review): lines[6:] skips index 5 — presumably the blank
            # separator line before the --name-status file list; the tab
            # between status letter and path becomes '/'. Confirm on real output.
            files = [f.replace('\t', '/') for f in lines[6:]]
            return ExtractedCommit(sha, author, email, date, message, files)
        except Exception as e:
            print(f'========== Commit Extract Error {e} ==========\n{block}\n==========')
            if not fail_silently:
                raise e
            # NOTE(review): when the error is swallowed this returns None, so
            # the result list may contain None entries — callers beware

    return [extract_commit(c.strip()) for c in log.split('START_COMMIT_QwQ') if c]
+24
View File
@@ -0,0 +1,24 @@
import logging
import os
def setup_logger(debug: bool = os.environ.get("DEBUG", False)):
    """
    Configure root logging and return the library logger.

    Uses rich's handler (with pretty tracebacks) when rich is installed,
    otherwise falls back to a plain StreamHandler.

    :param debug: Log everything when truthy (default: the DEBUG env var,
        captured once at import time)
    :return: Logger named "a2"
    """
    try:
        # Prefer rich for pretty printing when available
        from rich.logging import RichHandler
        from rich.traceback import install
        handler = RichHandler(rich_tracebacks=True)
        install(show_locals=True)
    except ImportError:
        handler = logging.StreamHandler()

    logging.basicConfig(
        level="NOTSET" if debug else "INFO",
        format="%(message)s",
        datefmt="[%X]",
        handlers=[handler],
    )
    return logging.getLogger("a2")
+46
View File
@@ -0,0 +1,46 @@
"""
Natural language processing utils
"""
from __future__ import annotations
def camel_split(camel: str) -> list[str]:
    """
    Split camel case string into sentence
    Credit: https://stackoverflow.com/a/58996565/7346633

    :param camel: E.g. HelloWorld or helloWorld
    :return: E.g. ['Hello', 'World']
    """
    # All-caps, all-lowercase or numeric strings are left whole
    if camel.isupper() or camel.islower() or camel.isnumeric():
        return [camel]
    caps = [c.isupper() for c in camel]
    # Collect the indices where a new word starts
    cuts = [0]
    for pos, (cur, nxt) in enumerate(zip(caps, caps[1:])):
        if cur and not nxt:      # "Ul": word starts at the capital
            cuts.append(pos)
        elif not cur and nxt:    # "lU": word starts at the next char
            cuts.append(pos + 1)
    cuts.append(len(camel))
    # Duplicate cut points (e.g. "lUl") yield empty slices; a < b drops them
    return [camel[a:b] for a, b in zip(cuts, cuts[1:]) if a < b]
def substr_between(s: str, start: str | None = None, end: str | None = None):
    """
    Return the substring after the first `start` and before the next `end`.
    Either delimiter may be omitted; a missing delimiter that was requested
    raises ValueError (from str.index).

    >>> substr_between('abc { meow } def', '{', '}')
    ' meow '
    """
    if start:
        cut = s.index(start) + len(start)
        s = s[cut:]
    if end:
        s = s[:s.index(end)]
    return s
+27
View File
@@ -0,0 +1,27 @@
import requests
def setup_proxy(session: requests.Session, addr: str = 'socks5://localhost:9050', verbose: bool = True):
    """
    Route a requests Session through a proxy and verify it took effect.

    Fetches the public IP before and after installing the proxy and asserts
    the two differ. Also monkey-patches requests.get/post module-wide so any
    accidental non-session call raises instead of leaking the real IP.

    :param session: Session to configure (mutated in place)
    :param addr: Proxy address (default: a local Tor SOCKS5 endpoint)
    :param verbose: Print both IPs when True
    :raises AssertionError: If the IP did not change (proxy not effective)
    """
    url = 'https://ip.me'
    # Setup proxy
    ip = session.get(url).text.strip()
    session.proxies = {
        'http': addr,
        'https': addr
    }
    proxy_ip = session.get(url).text.strip()

    # Print ip
    if verbose:
        print(f'Raw ip: {ip}')
        print(f'Proxy ip: {proxy_ip}')

    # ips shouldn't match
    assert ip != proxy_ip, 'Proxy did not start correctly.'

    # Disable default requests behavior
    def warn(*args, **kwargs):
        raise ReferenceError('Use session.get instead of requests.get')

    requests.get = warn
    requests.post = warn
+79
View File
@@ -0,0 +1,79 @@
"""
Importing this file requires numpy, matplotlib, and numba
"""
from __future__ import annotations
from dataclasses import dataclass
import numpy as np
from matplotlib import pyplot as plt
from numba import njit
@dataclass
class Statistics:
    """Summary statistics for a single data column."""
    mean: float
    median: float
    lower_quartile: float
    upper_quartile: float
    iqr: float
    minimum: float
    maximum: float
    count: int
    total: float
    stddev: float

    def get_metric_6(self) -> tuple[float, float, float, float, float, float]:
        """Return the six headline metrics: (mean, median, min, max, Q1, Q3)."""
        return (self.mean, self.median, self.minimum,
                self.maximum, self.lower_quartile, self.upper_quartile)

    def print(self, dec: int = 2):
        """Pretty-print all statistics, rounded to `dec` decimal places."""
        def r(v):
            return round(v, dec)
        print(f'> Mean: {r(self.mean)}, Median: {r(self.median)}')
        print(f'> Min: {r(self.minimum)}, Max: {r(self.maximum)}')
        print(f'> Q1: {r(self.lower_quartile)}, Q3: {r(self.upper_quartile)}')
        print(f'> StdDev: {r(self.stddev)}, IQR: {r(self.iqr)}')
        print(f'> N: {self.count}')
# JIT-compiled (and disk-cached) kernel; keeps the heavy math out of the interpreter
@njit(cache=True)
def _calc_col_stats_helper(col: np.ndarray) -> tuple[float, float, float, float, float, float, float, int, float, float]:
    """
    Compute the raw statistics tuple for a 1D numeric column.

    Values are returned in Statistics field order:
    (mean, median, q1, q3, iqr, min, max, count, total, stddev)
    """
    q1 = np.quantile(col, 0.25)
    q3 = np.quantile(col, 0.75)
    return (
        float(np.mean(col)),
        float(np.median(col)),
        float(q1),
        float(q3),
        float(q3 - q1),
        float(np.min(col)),
        float(np.max(col)),
        len(col),
        float(np.sum(col)),
        float(np.std(col))
    )
def calc_col_stats(col: np.ndarray | list) -> Statistics:
    """
    Compute statistics for a data column

    :param col: Input column (tested on 1D array)
    :return: Statistics
    """
    data = np.array(col) if isinstance(col, list) else col
    return Statistics(*_calc_col_stats_helper(data))
def plot(**kwargs) -> plt:
    """
    Pyplot configurator shorthand: each keyword names a pyplot function and
    its value is the argument (or a dict of keyword arguments) to pass.

    Example: plot(xlabel="X", ylabel="Y") is equivalent to
    plt.xlabel("X"); plt.ylabel("Y")
    """
    for name, value in kwargs.items():
        fn = getattr(plt, name)
        if isinstance(value, dict):
            fn(**value)
        else:
            fn(value)
    return plt
+206 -3
View File
@@ -1,12 +1,28 @@
from __future__ import annotations
import base64
import dataclasses
import datetime
import hashlib
import inspect
import io
import json
import pickle
from enum import Enum
from pathlib import Path
from types import SimpleNamespace
from typing import Any
def pickle_encode(obj: any, protocol=None, fix_imports=True) -> bytes:
def pickle_encode(obj: Any, protocol=None, fix_imports=True) -> bytes:
"""
Encode object to pickle bytes
>>> by = pickle_encode({'meow': 565656})
>>> by = pickle_encode({'function': pickle_encode})
>>> len(by)
57
>>> decoded = pickle_decode(by)
>>> by = decoded['function']({'meow': 565656})
>>> pickle_decode(by)
{'meow': 565656}
"""
@@ -15,9 +31,196 @@ def pickle_encode(obj: any, protocol=None, fix_imports=True) -> bytes:
return bio.getvalue()
def pickle_decode(by: bytes) -> any:
def pickle_decode(by: bytes) -> Any:
"""
Decode pickle bytes to object
"""
with io.BytesIO(by) as bio:
return pickle.load(bio)
def infer(o: object) -> object | None:
# Support encoding dataclasses
# https://stackoverflow.com/a/51286749/7346633
if dataclasses.is_dataclass(o):
return dataclasses.asdict(o)
# Simple namespace
if isinstance(o, SimpleNamespace):
return o.__dict__
# Support encoding datetime
if isinstance(o, (datetime.datetime, datetime.date)):
return o.isoformat()
# Support for sets
# https://stackoverflow.com/a/8230505/7346633
if isinstance(o, set):
return list(o)
# Support for Path
if isinstance(o, Path):
return str(o)
# Support for byte arrays (encode as base64 string)
if isinstance(o, bytes):
return base64.b64encode(o).decode()
# Enums
if isinstance(o, Enum):
return o.name
return None
class EnhancedJSONEncoder(json.JSONEncoder):
    """
    An improvement to the json.JSONEncoder class, which supports:
    encoding for dataclasses, encoding for datetime, and sets
    """

    def default(self, o: object) -> object:
        # Compare against None explicitly: `infer(o) or ...` would wrongly
        # discard falsy-but-valid results (e.g. empty set -> [], empty bytes -> '')
        inferred = infer(o)
        return inferred if inferred is not None else super().default(o)
class ForceJSONEcoder(EnhancedJSONEncoder):
"""
A json encoder that can serialize almost everything (including custom classes, byte arrays)
"""
def default(self, o: object) -> object:
infer_result = infer(o)
if infer_result:
return infer_result
# # Support EnumType
# if isinstance(o, EnumType):
# return {i.name: i.value for i in o}
# Support for custom classes (get dict values)
if hasattr(o, '__dict__') and not inspect.isclass(o):
return dict(vars(o))
return super().default(o)
def json_stringify(obj: object, forced: bool = True, **kwargs) -> str:
    """
    Serialize an object to a JSON string with support for dataclasses,
    datetime, and sets, plus any custom json.dumps configuration.

    Preconditions:
      - obj != None

    :param obj: Object to serialize
    :param forced: Whether to force the conversion of classes and byte arrays
    :return: JSON string
    """
    encoder = ForceJSONEcoder if forced else EnhancedJSONEncoder
    # Caller-supplied kwargs override the defaults
    options = {'ensure_ascii': False, 'cls': encoder, **kwargs}
    return json.dumps(obj, **options)
class SafeNamespace(SimpleNamespace):
    """SimpleNamespace variant that resolves missing attributes to None instead of raising."""

    def __getattr__(self, item):
        # Invoked only when normal lookup fails; delegate up the MRO first so
        # cooperative subclasses keep working, then fall back to None.
        try:
            return super().__getattr__(item)
        except AttributeError:
            return None
def jsn(s: str) -> SafeNamespace:
    """Parse a JSON string into nested SafeNamespace objects (missing keys read as None)."""
    def to_ns(d: dict) -> SafeNamespace:
        return SafeNamespace(**d)
    return json.loads(s, object_hook=to_ns)
def ensure_dir(path: Path | str) -> Path:
"""
Ensure that the directory exists (and create if not)
:returns The directory
"""
path = Path(path)
path.mkdir(parents=True, exist_ok=True)
return path
def ensure_parent(path: Path | str) -> Path:
    """
    Ensure that the parent directory of a path exists, creating it if needed.

    :param path: File path whose parent should exist
    :return: The original path as a Path object
    """
    p = Path(path)
    ensure_dir(p.parent)
    return p
def write(fp: Path | str, data: bytes | str):
    """
    Make sure the directory exists, and then write data, either in bytes or string.
    Also forces utf-8 encoding for strings.

    :param fp: Destination file path
    :param data: Content to write (str is written as utf-8 text, bytes as-is)
    :return: Number of bytes/characters written
    :raises TypeError: If data is neither str nor bytes (previously a silent no-op)
    """
    fp = ensure_parent(fp)
    if isinstance(data, str):
        return fp.write_text(data, 'utf-8')
    if isinstance(data, bytes):
        return fp.write_bytes(data)
    raise TypeError(f'write() expects str or bytes, got {type(data).__name__}')
def read(file: Path | str) -> str:
"""
Read file content, force utf-8
:param file: File path
:return: File content
"""
return Path(file).read_text('utf-8')
def write_json(fp: Path | str, data: Any, **kwargs):
    """Serialize `data` to JSON and write it to `fp` (parent dirs created as needed)."""
    payload = json_stringify(data, **kwargs)
    return write(fp, payload)
def parse_date_time(iso: str) -> datetime.datetime:
    """
    Parse date faster. Running 1,000,000 trials, this parse_date function is 4.03 times faster than
    python's built-in dateutil.parser.isoparse() function.

    Preconditions:
      - iso is the output of datetime.isoformat() (In a format like "2021-10-20T23:50:14")
      - iso is a valid date (this function does not check for the validity of the input)

    :param iso: Input date
    :return: Datetime object
    """
    # Fixed-width slicing avoids the overhead of a general-purpose parser
    year, month, day = int(iso[0:4]), int(iso[5:7]), int(iso[8:10])
    hour, minute, second = int(iso[11:13]), int(iso[14:16]), int(iso[17:19])
    return datetime.datetime(year, month, day, hour, minute, second)
def parse_date_only(iso: str) -> datetime.datetime:
    """
    Parse date faster (date portion only; time is zeroed).

    Preconditions:
      - iso starts with the format of "YYYY-MM-DD" (e.g. "2021-10-20" or "2021-10-20T10:04:14")
      - iso is a valid date (this function does not check for the validity of the input)

    :param iso: Input date
    :return: Datetime object at midnight of that date
    """
    year, month, day = int(iso[0:4]), int(iso[5:7]), int(iso[8:10])
    return datetime.datetime(year, month, day)
def md5(file: Path | str) -> str:
"""
Compute md5 of a file
:param file: File path
:return: md5 string
"""
file = Path(file)
hash_md5 = hashlib.md5()
with open(file, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
+40
View File
@@ -0,0 +1,40 @@
"""
Importing this file requires installing tqdm.
"""
from __future__ import annotations
import os
from functools import partial
from typing import Callable, Iterable
import tqdm
from tqdm.contrib.concurrent import process_map, thread_map
def smap(fn: Callable, lst: Iterable, *args, **kwargs) -> list:
    """Sequentially map `fn` over `lst`, showing a tqdm progress bar pinned to position 0."""
    progress = tqdm.tqdm(lst, position=0, leave=True, *args, **kwargs)
    return [fn(item) for item in progress]
def pmap(fn: Callable, lst: Iterable, *args, **kwargs) -> list:
    """Map `fn` over `lst` in a process pool with a progress bar; kwargs override the defaults."""
    options = {'position': 0, 'leave': True, 'chunksize': 1,
               'tqdm_class': tqdm.tqdm, 'max_workers': os.cpu_count(), **kwargs}
    return process_map(fn, lst, *args, **options)
def tmap(fn: Callable, lst: Iterable, *args, **kwargs) -> list:
    """Map `fn` over `lst` in a thread pool with a progress bar; kwargs override the defaults."""
    options = {'position': 0, 'leave': True, 'chunksize': 1,
               'tqdm_class': tqdm.tqdm, 'max_workers': os.cpu_count(), **kwargs}
    return thread_map(fn, lst, *args, **options)
def tq(it: Iterable, desc: str, *args, **kwargs) -> tqdm:
    """tqdm shorthand with a description, pinned to position 0; kwargs override the defaults."""
    merged = {'position': 0, 'leave': True, **kwargs}
    return tqdm.tqdm(it, desc, *args, **merged)
def patch_tqdm():
    """Build and return (tq, pmap, tmap) partials pre-configured with the module defaults."""
    defaults = dict(chunksize=1, position=0, leave=True,
                    tqdm_class=tqdm.tqdm, max_workers=os.cpu_count())
    bar: Callable[[Iterable], tqdm.tqdm] = partial(tqdm.tqdm, position=0, leave=True)
    proc_map = partial(process_map, **defaults)
    thr_map = partial(thread_map, **defaults)
    return bar, proc_map, thr_map
+51
View File
@@ -0,0 +1,51 @@
import pickle
from pathlib import Path
import zstandard as zstd
import orjson
from . import write
# Shared (de)compressor instances reused by all helpers in this module
zstd_d = zstd.ZstdDecompressor()
# level=5 balances ratio vs speed; write_checksum guards against corruption;
# threads=-1 lets zstd use all available cores for compression
zstd_c = zstd.ZstdCompressor(level=5, write_checksum=True, threads=-1)
def load_json_zst(file_path: str | Path) -> dict | list:
    """
    Load a .json.zst file and return its parsed content.

    Parameters:
        file_path (str | Path): The path to the .json.zst file.

    Returns:
        dict or list: The parsed JSON content.
    """
    with Path(file_path).open('rb') as f:
        # stream_reader decodes frames without needing the content size up front
        raw = zstd_d.stream_reader(f).read()
    return orjson.loads(raw)
def write_json_zst(file_path: str | Path, data: dict | list, **kwargs):
    """
    Dump data to a .json.zst file (parent directories are created as needed).

    Parameters:
        file_path (str | Path): The path to the .json.zst file.
        data (dict or list): The data to dump.
    """
    encoded = orjson.dumps(data, **kwargs)
    write(file_path, zstd_c.compress(encoded))
def load_pickle_zst(file_path: str | Path):
    """Load a zstd-compressed pickle file. SECURITY: pickle executes code; only load trusted files."""
    with Path(file_path).open('rb') as f:
        payload = zstd_d.stream_reader(f).read()
    return pickle.loads(payload)
def write_pickle_zst(file_path: str | Path, data):
    """Pickle `data` and write it zstd-compressed to `file_path` (parents created as needed)."""
    payload = pickle.dumps(data)
    write(file_path, zstd_c.compress(payload))
if __name__ == '__main__':
    # Round-trip smoke test. Use a temporary directory so no test artifacts
    # are left behind in the current working directory (the previous version
    # wrote test.pickle.zst / test.json.zst into CWD and never removed them).
    import tempfile
    with tempfile.TemporaryDirectory() as tmp:
        pickle_path = Path(tmp) / 'test.pickle.zst'
        write_pickle_zst(pickle_path, {'a': 1, 'b': 2})
        assert load_pickle_zst(pickle_path) == {'a': 1, 'b': 2}

        json_path = Path(tmp) / 'test.json.zst'
        write_json_zst(json_path, {'a': 1, 'b': 2})
        assert load_json_zst(json_path) == {'a': 1, 'b': 2}
+4 -9
View File
@@ -22,17 +22,12 @@ setup(
classifiers=[
"License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
],
packages=find_packages(exclude=("tests",)),
include_package_data=True,
install_requires=[],
entry_points={
"console_scripts": [
"hypy_utils=hypy_utils.__main__:main",
]
},
install_requires=[]
)