19 Commits

Author SHA1 Message Date
Azalea Gui 332a63479e [U] Release 1.0.18 2023-03-09 02:42:36 -05:00
Azalea Gui b748a217a0 [+] Logging utils 2023-03-09 02:42:03 -05:00
Azalea Gui afaef06f40 [+] Add setup_proxy 2023-01-29 23:54:53 -05:00
Azalea Gui 1948ff4a9c [O] Allow disabling progress bar during download 2023-01-13 05:48:43 -05:00
Hykilpikonna 80bf1da83d [U] Release 1.0.17 2022-12-19 01:42:28 -05:00
Hykilpikonna 6a60712d8c [+] deep_dict 2022-12-19 01:40:36 -05:00
Hykilpikonna 0530d41f42 [+] dict recursive remove 2022-12-19 01:38:52 -05:00
Hykilpikonna f6aa847368 [+] File name escaping 2022-12-19 01:12:08 -05:00
Hykilpikonna cb6aff290d [O] json support more types 2022-12-19 01:10:31 -05:00
Hykilpikonna 1325224fd8 [O] Support serializing Path, custom class, and byte arrays 2022-12-18 22:08:54 -05:00
Hykilpikonna 04f987cab8 [F] Fix download rate inverting 2022-12-18 21:59:50 -05:00
Hykilpikonna 156562f5a3 [U] Release 1.0.16 2022-10-29 19:07:13 -04:00
Hykilpikonna 26d756e628 [F] Fix error during download if content-length is not present 2022-10-29 19:06:56 -04:00
Azalea e0b2ef63b7 [U] Release 1.0.15 2022-10-29 17:23:50 -04:00
Azalea 374aedabb6 [F] Fix downloader 2022-10-29 17:08:23 -04:00
Azalea c28ca20edc [U] Release 1.0.14 2022-10-29 17:02:40 -04:00
Azalea 2480f4e690 [+] Downloader 2022-10-29 17:02:20 -04:00
Azalea (on HyDEV-Daisy) de3a30ef34 [U] 1.0.13 2022-08-26 00:35:06 -04:00
Azalea (on HyDEV-Daisy) 7d419b375b [U] Modular structure 2022-08-26 00:31:44 -04:00
10 changed files with 484 additions and 191 deletions
+2
View File
@@ -116,3 +116,5 @@ dmypy.json
# Custom
.idea
HyPyUtils.iml
.DS_Store
._*
+8
View File
@@ -1,2 +1,10 @@
# HyPyUtils
HyDEV Utils for Python
## Modules
| Module | Requirements |
|--------------------|--------------------------|
| `tqdm_utils` | tqdm |
| `downloader` | tqdm, requests |
| `scientific_utils` | numpy, numba, matplotlib |
+4 -189
View File
@@ -1,197 +1,12 @@
from __future__ import annotations
__version__ = "1.0.11"
__version__ = "1.0.18"
import dataclasses
import hashlib
import json
import time
from datetime import datetime, date
from pathlib import Path
from typing import Union, Callable
from types import SimpleNamespace
from typing import Callable
def ansi_rgb(r: int, g: int, b: int, foreground: bool = True) -> str:
    """
    Build the ANSI truecolor escape sequence for an RGB color.

    :param r: Red component (0-255)
    :param g: Green component (0-255)
    :param b: Blue component (0-255)
    :param foreground: True for a foreground color, False for background
    :return: Escape code string
    """
    layer = '38' if foreground else '48'
    return f'\033[{layer};2;{r};{g};{b}m'
replacements = ["&0/\033[0;30m", "&1/\033[0;34m", "&2/\033[0;32m", "&3/\033[0;36m", "&4/\033[0;31m",
"&5/\033[0;35m", "&6/\033[0;33m", "&7/\033[0;37m", "&8/\033[1;30m", "&9/\033[1;34m",
"&a/\033[1;32m", "&b/\033[1;36m", "&c/\033[1;31m", "&d/\033[1;35m", "&e/\033[1;33m",
"&f/\033[1;37m",
"&r/\033[0m", "&l/\033[1m", "&o/\033[3m", "&n/\033[4m", "&-/\n"]
replacements = [(r[:2], r[3:]) for r in replacements]
def color(msg: str) -> str:
    """
    Replace extended minecraft color codes in a string with ANSI escapes.

    Supports the classic &0-&f style codes plus custom RGB in the form
    &gf(...) (foreground) or &gb(...) (background), where ... is either a
    hex color like #ff8800 or three decimal components.

    :param msg: Message with minecraft color codes
    :return: Message with escape codes
    """
    for code, esc in replacements:
        msg = msg.replace(code, esc)

    while '&gf(' in msg or '&gb(' in msg:
        i = msg.index('&gf(') if '&gf(' in msg else msg.index('&gb(')
        end = msg.index(')', i)
        code = msg[i + 4:end]
        fore = msg[i + 2] == 'f'

        if code.startswith('#'):
            rgb = tuple(int(code.lstrip('#')[i:i + 2], 16) for i in (0, 2, 4))
        else:
            # Bug fix: the original collapsed runs of spaces with a single
            # chained .replace('  ', ' '), which left empty fields (and raised
            # ValueError) for inputs such as "1,   2, 3". split() with no
            # argument handles any whitespace run correctly.
            rgb = tuple(int(c) for c in code.replace(',', ' ').replace(';', ' ').split())

        msg = msg[:i] + ansi_rgb(*rgb, foreground=fore) + msg[end + 1:]
    return msg
def printc(msg: str):
    """
    Print a message with minecraft color codes translated, appending a reset
    code so formatting does not leak into subsequent output.

    :param msg: Message with minecraft color codes
    """
    print(color(f'{msg}&r'))
def parse_date_time(iso: str) -> datetime:
    """
    Fast ISO-format datetime parser. Over 1,000,000 trials this is about
    4x faster than dateutil.parser.isoparse().

    Preconditions:
      - iso is the output of datetime.isoformat() (e.g. "2021-10-20T23:50:14")
      - iso is a valid date (validity is not checked)

    :param iso: Input date string
    :return: Datetime object
    """
    slices = ((0, 4), (5, 7), (8, 10), (11, 13), (14, 16), (17, 19))
    return datetime(*(int(iso[a:b]) for a, b in slices))
def parse_date_only(iso: str) -> datetime:
    """
    Fast date parser for strings beginning with "YYYY-MM-DD".

    Preconditions:
      - iso starts with "YYYY-MM-DD" (e.g. "2021-10-20" or "2021-10-20T10:04:14")
      - iso is a valid date (validity is not checked)

    :param iso: Input date string
    :return: Datetime object (time portion zeroed)
    """
    year, month, day = int(iso[:4]), int(iso[5:7]), int(iso[8:10])
    return datetime(year, month, day)
class EnhancedJSONEncoder(json.JSONEncoder):
    """
    json.JSONEncoder extension that additionally serializes dataclasses,
    SimpleNamespace, datetime/date, and sets.
    """
    def default(self, o: object) -> object:
        # Dataclass instances serialize as their field dict
        # https://stackoverflow.com/a/51286749/7346633
        if dataclasses.is_dataclass(o):
            return dataclasses.asdict(o)
        # Namespaces expose their attributes as a plain dict
        if isinstance(o, SimpleNamespace):
            return vars(o)
        # Dates/datetimes serialize as ISO-8601 strings
        if isinstance(o, (datetime, date)):
            return o.isoformat()
        # Sets become JSON arrays
        # https://stackoverflow.com/a/8230505/7346633
        if isinstance(o, set):
            return list(o)
        return super().default(o)
def json_stringify(obj: object, indent: Union[int, None] = None) -> str:
    """
    Serialize an object to a JSON string, supporting dataclasses, datetime,
    and sets via EnhancedJSONEncoder. Non-ASCII characters are kept as-is.

    Preconditions:
      - obj != None

    :param obj: Object to serialize
    :param indent: Indent size, or None for compact output
    :return: JSON string
    """
    options = dict(indent=indent, cls=EnhancedJSONEncoder, ensure_ascii=False)
    return json.dumps(obj, **options)
def jsn(s: str):
    """Parse a JSON string into nested SimpleNamespace objects (attribute access)."""
    def to_namespace(mapping: dict) -> SimpleNamespace:
        return SimpleNamespace(**mapping)
    return json.loads(s, object_hook=to_namespace)
def write(file: Union[str, Path], text: str) -> None:
    """
    Write text to a file as utf-8, creating parent directories as needed.

    Preconditions:
      - file != ''

    :param file: File path
    :param text: Text content
    :return: None
    """
    path = Path(file)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(text, encoding='utf-8')
def read(file: Union[str, Path]) -> str:
    """
    Read file content as utf-8 text.

    Preconditions:
      - file != ''

    :param file: File path (str or Path)
    :return: File content
    """
    # Bug fix: the original called file.read_text() directly, which raised
    # AttributeError for str inputs despite the declared Union[str, Path]
    # signature. Convert to Path first.
    return Path(file).read_text('utf-8')
def md5(file: Union[str, Path]) -> str:
    """
    Compute the md5 hex digest of a file, streaming it in 4 KiB chunks so
    large files do not need to fit in memory.

    :param file: File path
    :return: md5 hex string
    """
    digest = hashlib.md5()
    with Path(file).open('rb') as f:
        while chunk := f.read(4096):
            digest.update(chunk)
    return digest.hexdigest()
from .color_utils import *
from .serializer import *
class Timer:
+58
View File
@@ -0,0 +1,58 @@
def ansi_rgb(r: int, g: int, b: int, foreground: bool = True) -> str:
    """
    Convert an RGB color into an ANSI truecolor escape sequence.

    :param r: Red component (0-255)
    :param g: Green component (0-255)
    :param b: Blue component (0-255)
    :param foreground: Whether the color applies to the foreground
    :return: Escape code
    """
    c = '38' if foreground else '48'
    return f'\033[{c};2;{r};{g};{b}m'


# Minecraft-style "&x" formatting codes mapped to ANSI escape sequences
replacements = ["&0/\033[0;30m", "&1/\033[0;34m", "&2/\033[0;32m", "&3/\033[0;36m", "&4/\033[0;31m",
                "&5/\033[0;35m", "&6/\033[0;33m", "&7/\033[0;37m", "&8/\033[1;30m", "&9/\033[1;34m",
                "&a/\033[1;32m", "&b/\033[1;36m", "&c/\033[1;31m", "&d/\033[1;35m", "&e/\033[1;33m",
                "&f/\033[1;37m",
                "&r/\033[0m", "&l/\033[1m", "&o/\033[3m", "&n/\033[4m", "&-/\n"]
replacements = [(r[:2], r[3:]) for r in replacements]


def color(msg: str) -> str:
    """
    Replace extended minecraft color codes in a string with ANSI escapes.

    Supports the classic &0-&f style codes plus custom RGB in the form
    &gf(...) (foreground) or &gb(...) (background), where ... is either a
    hex color like #ff8800 or three decimal components.

    :param msg: Message with minecraft color codes
    :return: Message with escape codes
    """
    for code, esc in replacements:
        msg = msg.replace(code, esc)

    while '&gf(' in msg or '&gb(' in msg:
        i = msg.index('&gf(') if '&gf(' in msg else msg.index('&gb(')
        end = msg.index(')', i)
        code = msg[i + 4:end]
        fore = msg[i + 2] == 'f'

        if code.startswith('#'):
            rgb = tuple(int(code.lstrip('#')[i:i + 2], 16) for i in (0, 2, 4))
        else:
            # Bug fix: the original collapsed runs of spaces with a single
            # chained .replace('  ', ' '), which left empty fields (and raised
            # ValueError) for inputs such as "1,   2, 3". split() with no
            # argument handles any whitespace run correctly.
            rgb = tuple(int(c) for c in code.replace(',', ' ').replace(';', ' ').split())

        msg = msg[:i] + ansi_rgb(*rgb, foreground=fore) + msg[end + 1:]
    return msg


def printc(msg: str):
    """
    Print a message with minecraft color codes translated, appending a reset
    code so formatting does not leak into subsequent output.

    :param msg: Message with minecraft color codes
    """
    print(color(msg + '&r'))
+86
View File
@@ -0,0 +1,86 @@
from hypy_utils import infer
def is_non_empty(o) -> bool:
    """True unless o is a sized container (has __len__) with zero length."""
    return len(o) > 0 if hasattr(o, '__len__') else True
def remove_values(d: dict | list, vals: list, preserve_list: bool = False) -> dict | list:
    """
    Recursively strip the given values (and any containers left empty by the
    stripping) out of a nested dict/list structure.

    :param d: Dict or list to clean
    :param vals: Values to remove
    :param preserve_list: When True, list elements are never filtered out
    :return: Cleaned structure (non-container inputs are returned unchanged)
    """
    if isinstance(d, list):
        kept = (remove_values(x, vals, preserve_list) for x in d if preserve_list or x not in vals)
        return [x for x in kept if is_non_empty(x)]
    if isinstance(d, dict):
        cleaned = {k: remove_values(v, vals, preserve_list) for k, v in d.items() if v not in vals}
        return {k: v for k, v in cleaned.items() if is_non_empty(v)}
    return d
def remove_nones(d: dict | list, preserve_list: bool = False) -> dict:
    """
    Recursively remove None values from a nested dict/list structure.

    >>> remove_nones({'a': {'b': None, 'c': 1}, 'b': [None, {'a': None}], 'c': {'a': None}, 'd': [None, 1]})
    {'a': {'c': 1}, 'd': [1]}

    :param d: Dict or list
    :param preserve_list: Whether to leave list elements untouched
    :return: Structure without None values
    """
    return remove_values(d, vals=[None], preserve_list=preserve_list)
def remove_keys(d: dict | list, keys: set) -> dict | list:
    """
    Recursively remove the given keys from a nested dict/list structure,
    dropping any containers left empty as a result.

    >>> remove_keys({'a': {'b': None, 'c': 1}, 'b': [None, {'a': None}], 'c': {'a': None}, 'd': [None, 1]}, {'b'})
    {'a': {'c': 1}, 'c': {'a': None}, 'd': [None, 1]}

    :param d: The dictionary (or list) you want to remove keys from
    :param keys: Set of keys you want to remove
    :return: Structure without the given keys
    """
    if isinstance(d, dict):
        pruned = {k: remove_keys(v, keys) for k, v in d.items() if k not in keys}
        return {k: v for k, v in pruned.items() if is_non_empty(v)}
    if isinstance(d, list):
        cleaned = [remove_keys(x, keys) for x in d]
        return [x for x in cleaned if is_non_empty(x)]
    return d
def deep_dict(o: object, exclude: set | None = None):
    """
    Recursively convert an object into plain dicts/lists.

    :param o: Object to convert
    :param exclude: Keys to exclude at every nesting level (default: none)
    :return: Deep dictionary of the object's variables
    """
    # Generalization: exclude is now optional (previously callers had to pass
    # it explicitly even when unused), and defaults to a real set instead of
    # the dict the `exclude or {}` idiom produced.
    exclude = set() if exclude is None else exclude
    # Bug fix: compare infer()'s result against None rather than truthiness —
    # falsy conversions (e.g. [] for an empty set, '' for empty bytes) are
    # valid results and must not be discarded.
    inferred = infer(o)
    if inferred is not None:
        return inferred
    if hasattr(o, '__dict__'):
        return deep_dict(dict(vars(o)), exclude)
    if isinstance(o, dict):
        return {k: deep_dict(v, exclude) for k, v in o.items() if k not in exclude}
    if isinstance(o, list):
        return [deep_dict(v, exclude) for v in o]
    return o
+47
View File
@@ -0,0 +1,47 @@
from __future__ import annotations
import os
from pathlib import Path
import requests
import tqdm
def download_file(url: str, file: str | Path, progress: bool = True):
    """
    Download `url` to `file` in 1 KiB chunks, skipping the download when the
    target file already exists. Returns the path to the downloaded file.

    https://stackoverflow.com/a/42071418/7346633

    :param url: Source URL
    :param file: Destination path
    :param progress: Whether to display a tqdm progress bar
    :return: Path of the downloaded file
    """
    file = Path(file)
    if file.is_file():
        return file

    chunk_size = 1024

    # Size the progress bar relative to the terminal; fall back to a fixed
    # width when no terminal is attached (e.g. piped output, CI).
    try:
        term_len = os.get_terminal_size().columns
        bar_len = int(term_len * 0.4)
    except Exception:
        term_len = 60
        bar_len = 20

    tqdm_args = dict()
    # Use the response as a context manager so the connection is released
    # deterministically even if the write loop raises.
    with requests.get(url, stream=True) as r:
        # content-length may be absent (e.g. chunked transfer encoding)
        if 'content-length' in r.headers:
            tqdm_args['total'] = int(r.headers['content-length']) / 1024 / 1024
        pbar = None
        try:
            with open(file, 'wb') as f:
                if progress:
                    pbar = tqdm.tqdm(unit=" MB", ncols=term_len,
                                     bar_format='{desc} {rate_noinv_fmt} {remaining} [{bar}] {percentage:.0f}%',
                                     ascii=' #', desc=file.name[:bar_len].ljust(bar_len), **tqdm_args)
                for chunk in r.iter_content(chunk_size=chunk_size):
                    if chunk:
                        if pbar:
                            pbar.update(len(chunk) / 1024 / 1024)
                        f.write(chunk)
        finally:
            # Bug fix: the original never closed the progress bar, leaving a
            # dangling tqdm instance and an un-finalized bar line.
            if pbar:
                pbar.close()
    return file
+37
View File
@@ -0,0 +1,37 @@
import base64

# Tokens that are unsafe in file names on Unix and/or Windows.
FILENAME_BLACKLIST = [
    # Unix and Windows
    "/",
    # Windows only
    "<", ">", ":", '"', "\\", "|", "?", "*", "\0",
    "CON", "PRN", "AUX", "NUL",
    "COM1", "COM2", "COM3", "COM4", "COM5", "COM6", "COM7", "COM8", "COM9",
    "LPT1", "LPT2", "LPT3", "LPT4", "LPT5", "LPT6", "LPT7", "LPT8", "LPT9",
    # Just for extra safety
    "~"
]


def _b64_tag(token: str) -> str:
    """Encode a blacklisted token as a %-prefixed, padding-free base64 tag."""
    return "%" + base64.b64encode(token.encode()).decode().replace('=', '')


# Blacklisted token -> replacement tag used by escape/unescape below.
FILENAME_REPLACE = {token: _b64_tag(token) for token in FILENAME_BLACKLIST}

# Temporary marker used so literal '%' characters survive the tag pass.
_PERCENT_SENTINEL = "[ PeRcEnT EsCaPe owo ]"


def escape_filename(fn: str) -> str:
    """
    Make a string safe to use as a file name: blacklisted tokens become
    %-prefixed base64 tags and literal '%' becomes '%%'.
    """
    fn = fn.replace("%", _PERCENT_SENTINEL)
    for token, tag in FILENAME_REPLACE.items():
        fn = fn.replace(token, tag)
    return fn.replace(_PERCENT_SENTINEL, "%%")


def unescape_filename(fn: str) -> str:
    """Inverse of escape_filename: restore tags and '%%' to the original text."""
    fn = fn.replace("%%", _PERCENT_SENTINEL)
    for token, tag in FILENAME_REPLACE.items():
        fn = fn.replace(tag, token)
    return fn.replace(_PERCENT_SENTINEL, "%")
+24
View File
@@ -0,0 +1,24 @@
import logging
import os
def setup_logger(debug: bool = os.environ.get("DEBUG", False)):
# Try to use rich for pretty printing
try:
from rich.logging import RichHandler
handler = RichHandler(rich_tracebacks=True)
from rich.traceback import install
install(show_locals=True)
except ImportError:
handler = logging.StreamHandler()
# Initialize debug logger
logging.basicConfig(
level="NOTSET" if debug else "INFO",
format="%(message)s",
datefmt="[%X]",
handlers=[handler]
)
return logging.getLogger("a2")
+27
View File
@@ -0,0 +1,27 @@
import requests
def setup_proxy(session: requests.Session, addr: str = 'socks5://localhost:9050', verbose: bool = True):
    """
    Route all traffic of `session` through a proxy and verify that it works.

    Fetches the public IP before and after installing the proxy and asserts
    that the two differ. Afterwards, requests.get/requests.post are replaced
    with a function that raises, forcing callers to use the proxied session.

    NOTE(review): performs network requests to https://ifconfig.me and
    mutates the global `requests` module as a side effect.

    :param session: The requests session to attach the proxy to
    :param addr: Proxy address (default presumably a local Tor SOCKS5 port)
    :param verbose: Whether to print both IP addresses
    :raises AssertionError: If the IP did not change (proxy not working)
    """
    url = 'https://ifconfig.me/ip'
    # Record the public IP before installing the proxy
    ip = session.get(url).text.strip()
    session.proxies = {
        'http': addr,
        'https': addr
    }
    # Public IP as seen through the proxy
    proxy_ip = session.get(url).text.strip()
    # Print both IPs for manual inspection
    if verbose:
        print(f'Raw ip: {ip}')
        print(f'Proxy ip: {proxy_ip}')
    # The IPs must differ, otherwise the proxy is not actually in use
    assert ip != proxy_ip, 'Proxy did not start correctly.'
    # Monkey-patch module-level requests.get/post so un-proxied calls fail
    # loudly instead of silently leaking the real IP
    def warn(*args, **kwargs):
        raise ReferenceError('Use session.get instead of requests.get')
    requests.get = warn
    requests.post = warn
+191 -2
View File
@@ -1,10 +1,20 @@
from __future__ import annotations
import base64
import dataclasses
import datetime
import hashlib
import inspect
import io
import json
import pickle
from enum import Enum
from pathlib import Path
from types import SimpleNamespace
from typing import Any
def pickle_encode(obj: any, protocol=None, fix_imports=True) -> bytes:
def pickle_encode(obj: Any, protocol=None, fix_imports=True) -> bytes:
"""
Encode object to pickle bytes
@@ -21,9 +31,188 @@ def pickle_encode(obj: any, protocol=None, fix_imports=True) -> bytes:
return bio.getvalue()
def pickle_decode(by: bytes) -> any:
def pickle_decode(by: bytes) -> Any:
"""
Decode pickle bytes to object
"""
with io.BytesIO(by) as bio:
return pickle.load(bio)
def infer(o: object) -> object | None:
# Support encoding dataclasses
# https://stackoverflow.com/a/51286749/7346633
if dataclasses.is_dataclass(o):
return dataclasses.asdict(o)
# Simple namespace
if isinstance(o, SimpleNamespace):
return o.__dict__
# Support encoding datetime
if isinstance(o, (datetime.datetime, datetime.date)):
return o.isoformat()
# Support for sets
# https://stackoverflow.com/a/8230505/7346633
if isinstance(o, set):
return list(o)
# Support for Path
if isinstance(o, Path):
return str(o)
# Support for byte arrays (encode as base64 string)
if isinstance(o, bytes):
return base64.b64encode(o).decode()
# Enums
if isinstance(o, Enum):
return o.name
return None
class EnhancedJSONEncoder(json.JSONEncoder):
    """
    json.JSONEncoder extension that also serializes dataclasses,
    SimpleNamespace, datetime/date, sets, Path, bytes, and Enum via infer().
    """
    def default(self, o: object) -> object:
        # Bug fix: the original used `infer(o) or super().default(o)`, which
        # discarded valid falsy conversions (e.g. [] for an empty set, '' for
        # empty bytes) and raised TypeError for them. Compare against None.
        converted = infer(o)
        return converted if converted is not None else super().default(o)
class ForceJSONEcoder(EnhancedJSONEncoder):
    """
    A json encoder that can serialize almost everything, including custom
    class instances (via their attribute dict) and byte arrays.
    """
    def default(self, o: object) -> object:
        # Bug fix: check infer()'s result against None instead of truthiness,
        # so valid falsy conversions (e.g. [] for an empty set) are kept.
        converted = infer(o)
        if converted is not None:
            return converted
        # Custom class instances serialize as their attribute dict; classes
        # themselves are excluded (vars(cls) is not serializable)
        if hasattr(o, '__dict__') and not inspect.isclass(o):
            return dict(vars(o))
        return super().default(o)
def json_stringify(obj: object, forced: bool = True, **kwargs) -> str:
    """
    Serialize an object to a JSON string with extended type support
    (dataclasses, datetime, sets, paths, bytes, enums, ...).

    Preconditions:
      - obj != None

    :param obj: Object to serialize
    :param forced: Whether to also force-convert custom classes and byte arrays
    :param kwargs: Extra arguments forwarded to json.dumps (may override defaults)
    :return: JSON string
    """
    encoder = ForceJSONEcoder if forced else EnhancedJSONEncoder
    options = {'ensure_ascii': False, 'cls': encoder, **kwargs}
    return json.dumps(obj, **options)
def jsn(s: str) -> SimpleNamespace:
    """
    Parse a JSON string into nested SimpleNamespace objects so fields can be
    accessed as attributes (result.foo.bar).

    :param s: JSON string
    :return: SimpleNamespace tree
    """
    return json.loads(s, object_hook=lambda mapping: SimpleNamespace(**mapping))
def ensure_dir(path: Path | str) -> Path:
"""
Ensure that the directory exists (and create if not)
:returns The directory
"""
path = Path(path)
path.mkdir(parents=True, exist_ok=True)
return path
def ensure_parent(path: Path | str) -> Path:
    """
    Ensure that a path's parent directory exists, creating it if needed.

    :param path: File path whose parent should exist
    :return: The original path as a Path
    """
    target = Path(path)
    ensure_dir(target.parent)
    return target
def write(fp: Path | str, data: bytes | str):
    """
    Write string (as utf-8) or bytes to a file, creating parent directories
    as needed.

    :param fp: Destination path
    :param data: Text or binary content
    :return: Number of characters/bytes written, or None for other types
    """
    fp = ensure_parent(fp)
    if isinstance(data, bytes):
        return fp.write_bytes(data)
    if isinstance(data, str):
        return fp.write_text(data, 'utf-8')
def read(file: Path | str) -> str:
"""
Read file content, force utf-8
:param file: File path
:return: File content
"""
return Path(file).read_text('utf-8')
def write_json(fp: Path | str, data: Any, **kwargs):
    """
    Serialize data with json_stringify and write it to fp as utf-8 text,
    creating parent directories as needed.

    :param fp: Destination path
    :param data: Object to serialize
    :param kwargs: Extra arguments forwarded to json_stringify
    """
    serialized = json_stringify(data, **kwargs)
    write(fp, serialized)
def parse_date_time(iso: str) -> datetime.datetime:
    """
    Fast ISO-format datetime parser. Over 1,000,000 trials this is about
    4x faster than dateutil.parser.isoparse().

    Preconditions:
      - iso is the output of datetime.isoformat() (e.g. "2021-10-20T23:50:14")
      - iso is a valid date (validity is not checked)

    :param iso: Input date string
    :return: Datetime object
    """
    fields = [int(iso[a:b]) for a, b in ((0, 4), (5, 7), (8, 10), (11, 13), (14, 16), (17, 19))]
    return datetime.datetime(*fields)
def parse_date_only(iso: str) -> datetime.datetime:
    """
    Fast date parser for strings beginning with "YYYY-MM-DD".

    Preconditions:
      - iso starts with "YYYY-MM-DD" (e.g. "2021-10-20" or "2021-10-20T10:04:14")
      - iso is a valid date (validity is not checked)

    :param iso: Input date string
    :return: Datetime object (time portion zeroed)
    """
    year, month, day = int(iso[:4]), int(iso[5:7]), int(iso[8:10])
    return datetime.datetime(year, month, day)
def md5(file: Path | str) -> str:
"""
Compute md5 of a file
:param file: File path
:return: md5 string
"""
file = Path(file)
hash_md5 = hashlib.md5()
with open(file, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()