Compare commits
21 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| df16f90a8f | |||
| 25aecabd34 | |||
| 332a63479e | |||
| b748a217a0 | |||
| afaef06f40 | |||
| 1948ff4a9c | |||
| 80bf1da83d | |||
| 6a60712d8c | |||
| 0530d41f42 | |||
| f6aa847368 | |||
| cb6aff290d | |||
| 1325224fd8 | |||
| 04f987cab8 | |||
| 156562f5a3 | |||
| 26d756e628 | |||
| e0b2ef63b7 | |||
| 374aedabb6 | |||
| c28ca20edc | |||
| 2480f4e690 | |||
| de3a30ef34 | |||
| 7d419b375b |
@@ -116,3 +116,5 @@ dmypy.json
|
||||
# Custom
|
||||
.idea
|
||||
HyPyUtils.iml
|
||||
.DS_Store
|
||||
._*
|
||||
|
||||
@@ -1,2 +1,10 @@
|
||||
# HyPyUtils
|
||||
HyDEV Utils for Python
|
||||
|
||||
## Modules
|
||||
|
||||
| Module | Requirements |
|
||||
|--------------------|--------------------------|
|
||||
| `tqdm_utils` | tqdm |
|
||||
| `downloader` | tqdm, requests |
|
||||
| `scientific_utils` | numpy, numba, matplotlib |
|
||||
|
||||
+4
-189
@@ -1,197 +1,12 @@
|
||||
from __future__ import annotations
|
||||
|
||||
__version__ = "1.0.11"
|
||||
__version__ = "1.0.19"
|
||||
|
||||
import dataclasses
|
||||
import hashlib
|
||||
import json
|
||||
import time
|
||||
from datetime import datetime, date
|
||||
from pathlib import Path
|
||||
from typing import Union, Callable
|
||||
from types import SimpleNamespace
|
||||
from typing import Callable
|
||||
|
||||
|
||||
def ansi_rgb(r: int, g: int, b: int, foreground: bool = True) -> str:
|
||||
"""
|
||||
Convert rgb color into ANSI escape code format
|
||||
|
||||
:param r:
|
||||
:param g:
|
||||
:param b:
|
||||
:param foreground: Whether the color applies to forground
|
||||
:return: Escape code
|
||||
"""
|
||||
c = '38' if foreground else '48'
|
||||
return f'\033[{c};2;{r};{g};{b}m'
|
||||
|
||||
|
||||
replacements = ["&0/\033[0;30m", "&1/\033[0;34m", "&2/\033[0;32m", "&3/\033[0;36m", "&4/\033[0;31m",
|
||||
"&5/\033[0;35m", "&6/\033[0;33m", "&7/\033[0;37m", "&8/\033[1;30m", "&9/\033[1;34m",
|
||||
"&a/\033[1;32m", "&b/\033[1;36m", "&c/\033[1;31m", "&d/\033[1;35m", "&e/\033[1;33m",
|
||||
"&f/\033[1;37m",
|
||||
"&r/\033[0m", "&l/\033[1m", "&o/\033[3m", "&n/\033[4m", "&-/\n"]
|
||||
replacements = [(r[:2], r[3:]) for r in replacements]
|
||||
|
||||
|
||||
def color(msg: str) -> str:
|
||||
"""
|
||||
Replace extended minecraft color codes in string
|
||||
|
||||
:param msg: Message with minecraft color codes
|
||||
:return: Message with escape codes
|
||||
"""
|
||||
for code, esc in replacements:
|
||||
msg = msg.replace(code, esc)
|
||||
|
||||
while '&gf(' in msg or '&gb(' in msg:
|
||||
i = msg.index('&gf(') if '&gf(' in msg else msg.index('&gb(')
|
||||
end = msg.index(')', i)
|
||||
code = msg[i + 4:end]
|
||||
fore = msg[i + 2] == 'f'
|
||||
|
||||
if code.startswith('#'):
|
||||
rgb = tuple(int(code.lstrip('#')[i:i+2], 16) for i in (0, 2, 4))
|
||||
else:
|
||||
code = code.replace(',', ' ').replace(';', ' ').replace(' ', ' ')
|
||||
rgb = tuple(int(c) for c in code.split(' '))
|
||||
|
||||
msg = msg[:i] + ansi_rgb(*rgb, foreground=fore) + msg[end + 1:]
|
||||
|
||||
return msg
|
||||
|
||||
|
||||
def printc(msg: str):
|
||||
"""
|
||||
Print with color
|
||||
|
||||
:param msg: Message with minecraft color codes
|
||||
"""
|
||||
print(color(msg + '&r'))
|
||||
|
||||
|
||||
def parse_date_time(iso: str) -> datetime:
|
||||
"""
|
||||
Parse date faster. Running 1,000,000 trials, this parse_date function is 4.03 times faster than
|
||||
python's built-in dateutil.parser.isoparse() function.
|
||||
|
||||
Preconditions:
|
||||
- iso is the output of datetime.isoformat() (In a format like "2021-10-20T23:50:14")
|
||||
- iso is a valid date (this function does not check for the validity of the input)
|
||||
|
||||
:param iso: Input date
|
||||
:return: Datetime object
|
||||
"""
|
||||
return datetime(int(iso[:4]), int(iso[5:7]), int(iso[8:10]),
|
||||
int(iso[11:13]), int(iso[14:16]), int(iso[17:19]))
|
||||
|
||||
|
||||
def parse_date_only(iso: str) -> datetime:
|
||||
"""
|
||||
Parse date faster.
|
||||
|
||||
Preconditions:
|
||||
- iso starts with the format of "YYYY-MM-DD" (e.g. "2021-10-20" or "2021-10-20T10:04:14")
|
||||
- iso is a valid date (this function does not check for the validity of the input)
|
||||
|
||||
:param iso: Input date
|
||||
:return: Datetime object
|
||||
"""
|
||||
return datetime(int(iso[:4]), int(iso[5:7]), int(iso[8:10]))
|
||||
|
||||
|
||||
class EnhancedJSONEncoder(json.JSONEncoder):
|
||||
"""
|
||||
An improvement to the json.JSONEncoder class, which supports:
|
||||
encoding for dataclasses, encoding for datetime, and sets
|
||||
"""
|
||||
|
||||
def default(self, o: object) -> object:
|
||||
|
||||
# Support encoding dataclasses
|
||||
# https://stackoverflow.com/a/51286749/7346633
|
||||
if dataclasses.is_dataclass(o):
|
||||
return dataclasses.asdict(o)
|
||||
|
||||
# Simple namespace
|
||||
if isinstance(o, SimpleNamespace):
|
||||
return o.__dict__
|
||||
|
||||
# Support encoding datetime
|
||||
if isinstance(o, (datetime, date)):
|
||||
return o.isoformat()
|
||||
|
||||
# Support for sets
|
||||
# https://stackoverflow.com/a/8230505/7346633
|
||||
if isinstance(o, set):
|
||||
return list(o)
|
||||
|
||||
return super().default(o)
|
||||
|
||||
|
||||
def json_stringify(obj: object, indent: Union[int, None] = None) -> str:
|
||||
"""
|
||||
Serialize json string with support for dataclasses and datetime and sets and with custom
|
||||
configuration.
|
||||
|
||||
Preconditions:
|
||||
- obj != None
|
||||
|
||||
:param obj: Objects
|
||||
:param indent: Indent size or none
|
||||
:return: Json strings
|
||||
"""
|
||||
return json.dumps(obj, indent=indent, cls=EnhancedJSONEncoder, ensure_ascii=False)
|
||||
|
||||
|
||||
def jsn(s: str):
|
||||
return json.loads(s, object_hook=lambda d: SimpleNamespace(**d))
|
||||
|
||||
|
||||
def write(file: Union[str, Path], text: str) -> None:
|
||||
"""
|
||||
Write text to a file
|
||||
|
||||
Preconditions:
|
||||
- file != ''
|
||||
|
||||
:param file: File path (will be converted to lowercase)
|
||||
:param text: Text
|
||||
:return: None
|
||||
"""
|
||||
file = Path(file)
|
||||
file.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
with file.open('w', encoding='utf-8') as f:
|
||||
f.write(text)
|
||||
|
||||
|
||||
def read(file: Union[str, Path]) -> str:
|
||||
"""
|
||||
Read file content
|
||||
|
||||
Preconditions:
|
||||
- file != ''
|
||||
|
||||
:param file: File path (will be converted to lowercase)
|
||||
:return: None
|
||||
"""
|
||||
return file.read_text('utf-8')
|
||||
|
||||
|
||||
def md5(file: Union[str, Path]) -> str:
|
||||
"""
|
||||
Compute md5 of a file
|
||||
|
||||
:param file: File path
|
||||
:return: md5 string
|
||||
"""
|
||||
file = Path(file)
|
||||
hash_md5 = hashlib.md5()
|
||||
with open(file, "rb") as f:
|
||||
for chunk in iter(lambda: f.read(4096), b""):
|
||||
hash_md5.update(chunk)
|
||||
return hash_md5.hexdigest()
|
||||
from .color_utils import *
|
||||
from .serializer import *
|
||||
|
||||
|
||||
class Timer:
|
||||
|
||||
@@ -0,0 +1,58 @@
|
||||
|
||||
|
||||
def ansi_rgb(r: int, g: int, b: int, foreground: bool = True) -> str:
    """
    Build the ANSI 24-bit (true color) escape sequence for an RGB color.

    :param r: Red component (0-255)
    :param g: Green component (0-255)
    :param b: Blue component (0-255)
    :param foreground: Whether the color applies to the foreground
    :return: ANSI escape code string
    """
    layer = '38' if foreground else '48'
    return '\033[' + ';'.join([layer, '2', str(r), str(g), str(b)]) + 'm'
|
||||
|
||||
|
||||
replacements = ["&0/\033[0;30m", "&1/\033[0;34m", "&2/\033[0;32m", "&3/\033[0;36m", "&4/\033[0;31m",
|
||||
"&5/\033[0;35m", "&6/\033[0;33m", "&7/\033[0;37m", "&8/\033[1;30m", "&9/\033[1;34m",
|
||||
"&a/\033[1;32m", "&b/\033[1;36m", "&c/\033[1;31m", "&d/\033[1;35m", "&e/\033[1;33m",
|
||||
"&f/\033[1;37m",
|
||||
"&r/\033[0m", "&l/\033[1m", "&o/\033[3m", "&n/\033[4m", "&-/\n"]
|
||||
replacements = [(r[:2], r[3:]) for r in replacements]
|
||||
|
||||
|
||||
def color(msg: str) -> str:
    """
    Replace extended minecraft color codes in string.

    Besides the "&x" codes listed in `replacements`, supports 24-bit colors
    via "&gf(...)" (foreground) and "&gb(...)" (background), where "..." is
    either a hex code ("#rrggbb") or three decimal components separated by
    commas, semicolons, and/or spaces.

    :param msg: Message with minecraft color codes
    :return: Message with escape codes
    """
    for code, esc in replacements:
        msg = msg.replace(code, esc)

    while '&gf(' in msg or '&gb(' in msg:
        i = msg.index('&gf(') if '&gf(' in msg else msg.index('&gb(')
        end = msg.index(')', i)
        code = msg[i + 4:end]
        fore = msg[i + 2] == 'f'

        if code.startswith('#'):
            rgb = tuple(int(code.lstrip('#')[j:j + 2], 16) for j in (0, 2, 4))
        else:
            # Normalize separators, then split on any whitespace run.
            # Bug fix: the previous replace(' ', ' ') was a no-op, so inputs
            # like "1, 2, 3" yielded empty fields after split(' ') and made
            # int('') raise; split() with no argument handles repeated
            # spaces correctly.
            code = code.replace(',', ' ').replace(';', ' ')
            rgb = tuple(int(c) for c in code.split())

        msg = msg[:i] + ansi_rgb(*rgb, foreground=fore) + msg[end + 1:]

    return msg
|
||||
|
||||
|
||||
def printc(msg: str):
    """
    Print a message, translating minecraft color codes and appending a
    reset code so the terminal color does not leak past the line.

    :param msg: Message with minecraft color codes
    """
    print(color(f'{msg}&r'))
|
||||
@@ -0,0 +1,86 @@
|
||||
from hypy_utils import infer
|
||||
|
||||
|
||||
def is_non_empty(o):
    """Return True unless o is a sized object whose length is zero."""
    return len(o) > 0 if hasattr(o, '__len__') else True
|
||||
|
||||
|
||||
def remove_values(d: dict | list, vals: list, preserve_list: bool = False) -> dict | list:
    """
    Recursively remove values from a dict

    Containers that become empty after filtering are dropped as well
    (via is_non_empty). Matching uses equality (`in vals`), not identity.

    :param d: Dict
    :param vals: Values to remove
    :param preserve_list: Whether to ignore list elements
    :return: Dict without specific values
    """
    if isinstance(d, list):
        # Filter matching elements (unless preserve_list), then recurse
        d = [remove_values(i, vals, preserve_list) for i in d if preserve_list or i not in vals]
        # Drop elements that became empty after recursive filtering
        d = [i for i in d if is_non_empty(i)]
        return d

    if isinstance(d, dict):
        # Drop entries whose value matches, recurse into the rest
        d = {k: remove_values(v, vals, preserve_list) for k, v in d.items() if v not in vals}
        # Drop entries whose value became empty after recursive filtering
        d = {k: v for k, v in d.items() if is_non_empty(v)}
        return d

    # Scalars pass through unchanged
    return d
|
||||
|
||||
|
||||
def remove_nones(d: dict | list, preserve_list: bool = False) -> dict:
    """
    Recursively strip None values from a dict or list.

    >>> remove_nones({'a': {'b': None, 'c': 1}, 'b': [None, {'a': None}], 'c': {'a': None}, 'd': [None, 1]})
    {'a': {'c': 1}, 'd': [1]}

    :param d: Dict
    :param preserve_list: Whether to ignore list elements
    :return: Dict without nones
    """
    return remove_values(d, vals=[None], preserve_list=preserve_list)
|
||||
|
||||
|
||||
def remove_keys(d: dict | list, keys: set) -> dict | list:
    """
    Recursively remove keys

    Values that become empty after key removal are dropped as well
    (via is_non_empty).

    >>> remove_keys({'a': {'b': None, 'c': 1}, 'b': [None, {'a': None}], 'c': {'a': None}, 'd': [None, 1]}, {'b'})
    {'a': {'c': 1}, 'c': {'a': None}, 'd': [None, 1]}

    :param d: The dictionary that you want to remove keys from
    :param keys: Set of keys you want to remove
    :return: Dict without specific keys
    """
    if isinstance(d, list):
        # Recurse into list elements, then drop any that became empty
        d = [remove_keys(i, keys) for i in d]
        d = [i for i in d if is_non_empty(i)]
        return d

    if isinstance(d, dict):
        # Drop blacklisted keys, recurse into remaining values,
        # then drop values that became empty
        d = {k: remove_keys(v, keys) for k, v in d.items() if k not in keys}
        d = {k: v for k, v in d.items() if is_non_empty(v)}
        return d

    # Scalars pass through unchanged
    return d
|
||||
|
||||
|
||||
def deep_dict(o: object, exclude: set | None = None):
    """
    Recursively convert an object into a dictionary

    :param o: Object
    :param exclude: Keys to exclude (None means exclude nothing)
    :return: Deep dictionary of the object's variables
    """
    # Use an actual set (the old `exclude or {}` produced a dict literal);
    # an explicit None check keeps a deliberately-passed empty set intact.
    exclude = exclude if exclude is not None else set()
    infer_result = infer(o)
    # Compare against None rather than truthiness: falsy-but-valid
    # conversions (e.g. an empty set inferred to []) must still be
    # returned instead of falling through to the branches below.
    if infer_result is not None:
        return infer_result
    if hasattr(o, '__dict__'):
        return deep_dict(dict(vars(o)), exclude)
    if isinstance(o, dict):
        return {k: deep_dict(v, exclude) for k, v in o.items() if k not in exclude}
    if isinstance(o, list):
        return [deep_dict(v, exclude) for v in o]
    return o
|
||||
@@ -0,0 +1,47 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
import requests
|
||||
import tqdm
|
||||
|
||||
|
||||
def download_file(url: str, file: str | Path, progress: bool = True):
    """
    Helper method handling downloading large files from `url` to `filename`.
    Returns a pointer to `filename`.
    https://stackoverflow.com/a/42071418/7346633

    If the destination file already exists, the download is skipped.

    :param url: Source URL
    :param file: Destination path
    :param progress: Whether to display a tqdm progress bar
    :return: Path to the downloaded (or pre-existing) file
    """
    file = Path(file)
    # Skip download entirely if the file is already present
    if file.is_file():
        return file

    chunk_size = 1024

    # Size the progress bar from the terminal width; fall back to fixed
    # values when there is no terminal (e.g. piped/cron execution)
    try:
        term_len = os.get_terminal_size().columns
        bar_len = int(term_len * 0.4)
    except Exception:
        term_len = 60
        bar_len = 20

    tqdm_args = dict()
    r = requests.get(url, stream=True)
    # Total is tracked in MB to match the bar's unit below
    if 'content-length' in r.headers:
        tqdm_args['total'] = int(r.headers['content-length']) / 1024 / 1024

    with open(file, 'wb') as f:
        pbar = None
        if progress:
            pbar = tqdm.tqdm(unit=" MB", ncols=term_len,
                             bar_format='{desc} {rate_noinv_fmt} {remaining} [{bar}] {percentage:.0f}%', ascii=' #',
                             desc=file.name[:bar_len].ljust(bar_len), **tqdm_args)

        for chunk in r.iter_content(chunk_size=chunk_size):
            # filter out keep-alive chunks (empty)
            if chunk:
                if pbar:
                    pbar.update(len(chunk) / 1024 / 1024)
                f.write(chunk)

    return file
|
||||
@@ -0,0 +1,37 @@
|
||||
import base64
|
||||
|
||||
# Characters and reserved device names that are unsafe in file names.
FILENAME_BLACKLIST = [
    # Unix and Windows
    "/",

    # Windows only
    "<", ">", ":", '"', "\\", "|", "?", "*", "\0",
    "CON", "PRN", "AUX", "NUL",
    "COM1", "COM2", "COM3", "COM4", "COM5", "COM6", "COM7", "COM8", "COM9",
    "LPT1", "LPT2", "LPT3", "LPT4", "LPT5", "LPT6", "LPT7", "LPT8", "LPT9",

    # Just for extra safety
    "~"
]

# Each blacklisted token maps to "%<base64-of-token, '=' padding stripped>",
# used by escape_filename()/unescape_filename() below.
# NOTE(review): reserved names like "CON" are escaped wherever they appear as
# substrings, not only when they are the entire file name — confirm intended.
FILENAME_REPLACE = {c: f"%{base64.b64encode(c.encode()).decode().replace('=', '')}" for c in FILENAME_BLACKLIST}
|
||||
|
||||
|
||||
def escape_filename(fn: str) -> str:
    """
    Make a string safe to use as a file name by replacing blacklisted
    characters/names with their base64-derived placeholders.

    A temporary sentinel protects literal '%' characters, which end up
    as '%%' in the escaped output (reversed by unescape_filename).
    """
    sentinel = "[ PeRcEnT EsCaPe owo ]"
    fn = fn.replace("%", sentinel)
    for original, escaped in FILENAME_REPLACE.items():
        fn = fn.replace(original, escaped)
    return fn.replace(sentinel, "%%")
|
||||
|
||||
|
||||
def unescape_filename(fn: str) -> str:
    """
    Reverse escape_filename: restore blacklisted characters/names from
    their base64-derived placeholders, and '%%' back to a literal '%'.
    """
    sentinel = "[ PeRcEnT EsCaPe owo ]"
    fn = fn.replace("%%", sentinel)
    for original, escaped in FILENAME_REPLACE.items():
        fn = fn.replace(escaped, original)
    return fn.replace(sentinel, "%")
|
||||
@@ -0,0 +1,24 @@
|
||||
import logging
|
||||
import os
|
||||
|
||||
|
||||
def setup_logger(debug: bool = os.environ.get("DEBUG", False)):
    """
    Configure root logging, preferring rich's handler when available.

    :param debug: Enable NOTSET-level (everything) logging instead of INFO
    :return: A logger named "a2"
    """
    # NOTE(review): the default is evaluated once at import time, and
    # os.environ.get returns a *string* when DEBUG is set — so DEBUG=0
    # still enables debug mode (non-empty strings are truthy). Confirm
    # this is intended.
    # Try to use rich for pretty printing
    try:
        from rich.logging import RichHandler
        handler = RichHandler(rich_tracebacks=True)

        from rich.traceback import install
        install(show_locals=True)
    except ImportError:
        # rich not installed — plain stderr handler
        handler = logging.StreamHandler()

    # Initialize debug logger
    logging.basicConfig(
        level="NOTSET" if debug else "INFO",
        format="%(message)s",
        datefmt="[%X]",
        handlers=[handler]
    )

    # NOTE(review): logger name "a2" looks project-specific — confirm.
    return logging.getLogger("a2")
|
||||
@@ -0,0 +1,27 @@
|
||||
import requests
|
||||
|
||||
|
||||
def setup_proxy(session: requests.Session, addr: str = 'socks5://localhost:9050', verbose: bool = True):
    """
    Route a requests Session through a proxy and verify it took effect.

    Fetches the public IP before and after installing the proxy and asserts
    the two differ, then monkey-patches requests.get/post module-wide to
    fail loudly so the proxy cannot be bypassed by accident.

    :param session: Session to configure (mutated in place)
    :param addr: Proxy address (default assumes a local Tor SOCKS5 proxy)
    :param verbose: Whether to print both IPs
    :raises AssertionError: If the proxied IP equals the raw IP
    """
    url = 'https://ifconfig.me/ip'

    # Record the un-proxied IP first, then install the proxy on the session
    ip = session.get(url).text.strip()
    session.proxies = {
        'http': addr,
        'https': addr
    }
    proxy_ip = session.get(url).text.strip()

    # Print ip
    if verbose:
        print(f'Raw ip: {ip}')
        print(f'Proxy ip: {proxy_ip}')

    # ips shouldn't match
    assert ip != proxy_ip, 'Proxy did not start correctly.'

    # Disable default requests behavior: module-level requests.get/post
    # would silently bypass the proxy, so make them raise instead
    def warn(*args, **kwargs):
        raise ReferenceError('Use session.get instead of requests.get')
    requests.get = warn
    requests.post = warn
|
||||
+199
-2
@@ -1,10 +1,20 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import dataclasses
|
||||
import datetime
|
||||
import hashlib
|
||||
import inspect
|
||||
import io
|
||||
import json
|
||||
import pickle
|
||||
from enum import Enum
|
||||
from pathlib import Path
|
||||
from types import SimpleNamespace
|
||||
from typing import Any
|
||||
|
||||
|
||||
def pickle_encode(obj: any, protocol=None, fix_imports=True) -> bytes:
|
||||
def pickle_encode(obj: Any, protocol=None, fix_imports=True) -> bytes:
|
||||
"""
|
||||
Encode object to pickle bytes
|
||||
|
||||
@@ -21,9 +31,196 @@ def pickle_encode(obj: any, protocol=None, fix_imports=True) -> bytes:
|
||||
return bio.getvalue()
|
||||
|
||||
|
||||
def pickle_decode(by: bytes) -> Any:
    """
    Decode pickle bytes to object

    NOTE: pickle can execute arbitrary code — never call this on
    untrusted input.

    :param by: Pickled bytes
    :return: The decoded object
    """
    with io.BytesIO(by) as buffer:
        return pickle.load(buffer)
|
||||
|
||||
|
||||
def infer(o: object) -> object | None:
    """
    Map a commonly non-JSON-serializable object to a serializable form.

    Handled types, in priority order: dataclass instances (-> dict),
    SimpleNamespace (-> dict), datetime/date (-> ISO string),
    set (-> list), Path (-> str), bytes (-> base64 string),
    Enum (-> member name).

    :param o: Object to convert
    :return: Converted value, or None when no rule applies
    """
    # Dataclasses -> nested dict (https://stackoverflow.com/a/51286749/7346633)
    if dataclasses.is_dataclass(o):
        return dataclasses.asdict(o)

    # SimpleNamespace -> its attribute dict
    if isinstance(o, SimpleNamespace):
        return o.__dict__

    # Remaining conversions as an isinstance dispatch table
    rules = (
        ((datetime.datetime, datetime.date), lambda x: x.isoformat()),
        (set, list),  # https://stackoverflow.com/a/8230505/7346633
        (Path, str),
        (bytes, lambda x: base64.b64encode(x).decode()),
        (Enum, lambda x: x.name),
    )
    for types, convert in rules:
        if isinstance(o, types):
            return convert(o)

    return None
|
||||
|
||||
|
||||
class EnhancedJSONEncoder(json.JSONEncoder):
    """
    An improvement to the json.JSONEncoder class, which supports:
    encoding for dataclasses, encoding for datetime, and sets
    """
    def default(self, o: object) -> object:
        result = infer(o)
        # Bug fix: check against None instead of truthiness. Falsy-but-valid
        # conversions (e.g. set() -> [], b'' -> '') must be returned, not
        # passed to the base encoder (which would raise TypeError).
        if result is not None:
            return result
        return super().default(o)
|
||||
|
||||
|
||||
class ForceJSONEcoder(EnhancedJSONEncoder):
    """
    A json encoder that can serialize almost everything (including custom classes, byte arrays)
    """
    def default(self, o: object) -> object:
        # Bug fix: None means "no conversion applies"; falsy values such as
        # an empty list are valid results and must be returned.
        result = infer(o)
        if result is not None:
            return result

        # Support for custom classes (get dict values)
        if hasattr(o, '__dict__') and not inspect.isclass(o):
            return dict(vars(o))

        return super().default(o)
|
||||
|
||||
|
||||
def json_stringify(obj: object, forced: bool = True, **kwargs) -> str:
    """
    Serialize json string with support for dataclasses and datetime and sets and with custom
    configuration.

    Preconditions:
      - obj != None

    :param obj: Objects
    :param forced: Whether to force the conversion of classes and byte arrays
    :param kwargs: Passed through to json.dumps; may override the defaults
    :return: Json strings
    """
    # Defaults: keep non-ASCII characters, pick the encoder by `forced`;
    # caller-supplied kwargs (e.g. indent, cls) take precedence via update()
    args = dict(ensure_ascii=False, cls=ForceJSONEcoder if forced else EnhancedJSONEncoder)
    args.update(kwargs)
    return json.dumps(obj, **args)
|
||||
|
||||
|
||||
class SafeNamespace(SimpleNamespace):
    """
    A SimpleNamespace whose missing attributes resolve to None instead of
    raising AttributeError.
    """
    def __getattr__(self, attr):
        # __getattr__ is only invoked after normal attribute lookup fails,
        # so reaching this method already means the attribute is absent.
        # (The previous implementation called super().__getattr__, which
        # SimpleNamespace does not define — it only "worked" because that
        # lookup itself raised the AttributeError being caught.)
        return None
|
||||
|
||||
|
||||
def jsn(s: str) -> SafeNamespace:
    """Parse a JSON string into nested SafeNamespace objects."""
    def hook(d: dict) -> SafeNamespace:
        return SafeNamespace(**d)
    return json.loads(s, object_hook=hook)
|
||||
|
||||
|
||||
def ensure_dir(path: Path | str) -> Path:
    """
    Ensure that the directory exists (and create if not)

    :param path: Directory path
    :returns The directory
    """
    result = Path(path)
    result.mkdir(parents=True, exist_ok=True)
    return result
|
||||
|
||||
|
||||
def ensure_parent(path: Path | str) -> Path:
    """
    Ensure that the parent directory of a path exists (and create if not)

    :param path: File path whose parent should exist
    :return: The original path (not the parent)
    """
    target = Path(path)
    ensure_dir(target.parent)
    return target
|
||||
|
||||
|
||||
def write(fp: Path | str, data: bytes | str):
    """
    Make sure the parent directory exists, then write data.

    Strings are written as utf-8 text; bytes are written verbatim.

    :param fp: Destination path
    :param data: Text or binary content
    :return: Number of characters/bytes written
    :raises TypeError: If data is neither str nor bytes
    """
    fp = ensure_parent(fp)

    if isinstance(data, str):
        return fp.write_text(data, 'utf-8')
    if isinstance(data, bytes):
        return fp.write_bytes(data)
    # Previously unsupported types were silently ignored (returned None
    # and wrote nothing); fail loudly instead.
    raise TypeError(f'write() expects str or bytes, got {type(data).__name__}')
|
||||
|
||||
|
||||
def read(file: Path | str) -> str:
    """
    Read file content, force utf-8

    :param file: File path
    :return: File content
    """
    with open(file, encoding='utf-8') as f:
        return f.read()
|
||||
|
||||
|
||||
def write_json(fp: Path | str, data: Any, **kwargs):
    # Serialize data with json_stringify (forced encoder by default; kwargs
    # are forwarded to it) and write the result to fp, creating parent
    # directories as needed.
    write(fp, json_stringify(data, **kwargs))
|
||||
|
||||
|
||||
def parse_date_time(iso: str) -> datetime.datetime:
    """
    Parse an ISO-formatted timestamp by fixed slicing. Benchmarked over
    1,000,000 trials this is ~4x faster than dateutil.parser.isoparse().

    Preconditions:
      - iso is the output of datetime.isoformat() (e.g. "2021-10-20T23:50:14")
      - iso is a valid date (validity is not checked)

    :param iso: Input date
    :return: Datetime object
    """
    fields = (iso[0:4], iso[5:7], iso[8:10], iso[11:13], iso[14:16], iso[17:19])
    year, month, day, hour, minute, second = (int(f) for f in fields)
    return datetime.datetime(year, month, day, hour, minute, second)
|
||||
|
||||
|
||||
def parse_date_only(iso: str) -> datetime.datetime:
    """
    Parse only the date portion of an ISO string by fixed slicing.

    Preconditions:
      - iso starts with "YYYY-MM-DD" (e.g. "2021-10-20" or "2021-10-20T10:04:14")
      - iso is a valid date (validity is not checked)

    :param iso: Input date
    :return: Datetime object (time set to midnight)
    """
    year, month, day = int(iso[0:4]), int(iso[5:7]), int(iso[8:10])
    return datetime.datetime(year, month, day)
|
||||
|
||||
|
||||
def md5(file: Path | str) -> str:
    """
    Compute md5 of a file

    Reads in 4 KiB chunks so arbitrarily large files fit in memory.

    :param file: File path
    :return: md5 string
    """
    digest = hashlib.md5()
    with Path(file).open('rb') as f:
        while chunk := f.read(4096):
            digest.update(chunk)
    return digest.hexdigest()
|
||||
|
||||
Reference in New Issue
Block a user