36 Commits

Author SHA1 Message Date
Azalea Gui 46ea72641f [F] Fix import 2023-11-06 03:52:52 -05:00
Azalea Gui 62929dd48a [F] Fix typo 2023-11-06 03:32:31 -05:00
Azalea Gui 25e319d898 [U] Update readme 2023-11-06 03:00:33 -05:00
Azalea Gui 6291d178d4 [+] Git utils 2023-11-06 03:00:05 -05:00
Azalea 87a46fcf28 [F] ipconfig.me is blocking proxy blacklist 2023-10-05 21:31:47 -04:00
azalea df16f90a8f [U] Release 1.0.19 2023-07-28 20:52:27 -07:00
azalea 25aecabd34 [+] SafeNamespace 2023-07-28 20:49:51 -07:00
Azalea Gui 332a63479e [U] Release 1.0.18 2023-03-09 02:42:36 -05:00
Azalea Gui b748a217a0 [+] Logging utils 2023-03-09 02:42:03 -05:00
Azalea Gui afaef06f40 [+] Add setup_proxy 2023-01-29 23:54:53 -05:00
Azalea Gui 1948ff4a9c [O] Allow disabling progress bar during download 2023-01-13 05:48:43 -05:00
Hykilpikonna 80bf1da83d [U] Release 1.0.17 2022-12-19 01:42:28 -05:00
Hykilpikonna 6a60712d8c [+] deep_dict 2022-12-19 01:40:36 -05:00
Hykilpikonna 0530d41f42 [+] dict recursive remove 2022-12-19 01:38:52 -05:00
Hykilpikonna f6aa847368 [+] File name escaping 2022-12-19 01:12:08 -05:00
Hykilpikonna cb6aff290d [O] json support more types 2022-12-19 01:10:31 -05:00
Hykilpikonna 1325224fd8 [O] Support serializing Path, custom class, and byte arrays 2022-12-18 22:08:54 -05:00
Hykilpikonna 04f987cab8 [F] Fix download rate inverting 2022-12-18 21:59:50 -05:00
Hykilpikonna 156562f5a3 [U] Release 1.0.16 2022-10-29 19:07:13 -04:00
Hykilpikonna 26d756e628 [F] Fix error during download if content-length is not present 2022-10-29 19:06:56 -04:00
Azalea e0b2ef63b7 [U] Release 1.0.15 2022-10-29 17:23:50 -04:00
Azalea 374aedabb6 [F] Fix downloader 2022-10-29 17:08:23 -04:00
Azalea c28ca20edc [U] Release 1.0.14 2022-10-29 17:02:40 -04:00
Azalea 2480f4e690 [+] Downloader 2022-10-29 17:02:20 -04:00
Azalea (on HyDEV-Daisy) de3a30ef34 [U] 1.0.13 2022-08-26 00:35:06 -04:00
Azalea (on HyDEV-Daisy) 7d419b375b [U] Modular structure 2022-08-26 00:31:44 -04:00
Azalea (on HyDEV-Daisy) 376ecaa26e [U] Release 1.0.11 2022-08-25 23:21:31 -04:00
Azalea (on HyDEV-Daisy) f990731261 [+] Add stddev to scientific utils 2022-08-25 23:16:48 -04:00
Azalea (on HyDEV-Daisy) fb57ec06ae [F] Fix bugs in tqdm utils 2022-08-25 23:12:50 -04:00
Hykilpikonna b76a624b4f [+] Add simple namespace support 2022-08-15 20:05:38 -04:00
Hykilpikonna 7f56d94fe8 [F] Fix tqdm utils 2022-08-13 18:30:45 -04:00
Hykilpikonna 8d68f22eaa [O] Use doctests 2022-08-13 17:51:19 -04:00
Hykilpikonna e29d0f2c00 [+] substr_between function 2022-08-13 17:50:02 -04:00
Hykilpikonna 3b72889785 [+] Add nlp utils 2022-07-25 13:16:46 -04:00
Azalea (on HyDEV-Daisy) 6911cad00e [F] Fix imports 2022-07-03 02:24:31 -04:00
Azalea (on HyDEV-Daisy) 2ac83c5808 [+] tqdm and scientific utils 2022-07-03 02:17:34 -04:00
15 changed files with 724 additions and 183 deletions
+2
View File
@@ -116,3 +116,5 @@ dmypy.json
# Custom
.idea
HyPyUtils.iml
.DS_Store
._*
+9
View File
@@ -1,2 +1,11 @@
# HyPyUtils
HyDEV Utils for Python
## Modules
| Module | Requirements |
|--------------------|--------------------------|
| `tqdm_utils` | tqdm |
| `downloader` | tqdm, requests |
| `scientific_utils` | numpy, numba, matplotlib |
| `git_utils` | dateutil |
+17 -180
View File
@@ -1,188 +1,12 @@
from __future__ import annotations
__version__ = "1.0.7"
__version__ = "1.0.22"
import dataclasses
import hashlib
import json
import time
from datetime import datetime, date
from pathlib import Path
from typing import Union
from typing import Callable
def ansi_rgb(r: int, g: int, b: int, foreground: bool = True) -> str:
    """
    Build the 24-bit ANSI escape code for an RGB color

    :param r: Red component
    :param g: Green component
    :param b: Blue component
    :param foreground: True colors the foreground (code 38), False the background (code 48)
    :return: Escape code
    """
    if foreground:
        layer = '38'
    else:
        layer = '48'
    return f'\033[{layer};2;{r};{g};{b}m'
# Minecraft-style "&x" formatting codes paired with their ANSI escape sequences.
# Every entry is spelled "code/escape" and split into a (code, escape) tuple.
replacements = [
    (spec[:2], spec[3:])
    for spec in (
        "&0/\033[0;30m", "&1/\033[0;34m", "&2/\033[0;32m", "&3/\033[0;36m", "&4/\033[0;31m",
        "&5/\033[0;35m", "&6/\033[0;33m", "&7/\033[0;37m", "&8/\033[1;30m", "&9/\033[1;34m",
        "&a/\033[1;32m", "&b/\033[1;36m", "&c/\033[1;31m", "&d/\033[1;35m", "&e/\033[1;33m",
        "&f/\033[1;37m",
        "&r/\033[0m", "&l/\033[1m", "&o/\033[3m", "&n/\033[4m", "&-/\n",
    )
]
def color(msg: str) -> str:
    """
    Replace extended minecraft color codes in string

    Besides the two-character codes listed in ``replacements``, two RGB forms are
    understood: ``&gf(...)`` for the foreground and ``&gb(...)`` for the background.
    The parentheses hold either a hex color (``#ff8800``) or three decimal components
    separated by commas, semicolons and/or whitespace (``255, 136, 0``).

    :param msg: Message with minecraft color codes
    :return: Message with escape codes
    :raises ValueError: If an RGB code has no closing parenthesis or a component is not a number
    """
    for code, esc in replacements:
        msg = msg.replace(code, esc)

    while '&gf(' in msg or '&gb(' in msg:
        start = msg.index('&gf(') if '&gf(' in msg else msg.index('&gb(')
        end = msg.index(')', start)
        code = msg[start + 4:end]
        fore = msg[start + 2] == 'f'

        if code.startswith('#'):
            hex_digits = code.lstrip('#')
            rgb = tuple(int(hex_digits[pos:pos + 2], 16) for pos in (0, 2, 4))
        else:
            # split() without an argument collapses runs of whitespace, so both
            # "255,0,0" and "255, 0, 0" yield exactly three components
            rgb = tuple(int(c) for c in code.replace(',', ' ').replace(';', ' ').split())

        msg = msg[:start] + ansi_rgb(*rgb, foreground=fore) + msg[end + 1:]

    return msg
def printc(msg: str):
    """
    Print a message after translating its color codes, appending a style reset

    :param msg: Message with minecraft color codes
    """
    rendered = color(msg + '&r')
    print(rendered)
def parse_date_time(iso: str) -> datetime:
    """
    Parse an ISO date-time string by slicing fixed character positions.

    The original author measured this at about 4 times the speed of
    dateutil.parser.isoparse() over 1,000,000 trials.

    Preconditions:
    - iso is the output of datetime.isoformat() (In a format like "2021-10-20T23:50:14")
    - iso is a valid date (this function does not check for the validity of the input)

    Anything after the seconds field is ignored.

    :param iso: Input date
    :return: Datetime object
    """
    fields = (iso[:4], iso[5:7], iso[8:10], iso[11:13], iso[14:16], iso[17:19])
    return datetime(*map(int, fields))
def parse_date_only(iso: str) -> datetime:
    """
    Parse only the date part of an ISO string by slicing fixed character positions.

    Preconditions:
    - iso starts with the format of "YYYY-MM-DD" (e.g. "2021-10-20" or "2021-10-20T10:04:14")
    - iso is a valid date (this function does not check for the validity of the input)

    :param iso: Input date
    :return: Datetime object at midnight of that day
    """
    year, month, day = (int(part) for part in (iso[:4], iso[5:7], iso[8:10]))
    return datetime(year, month, day)
class EnhancedJSONEncoder(json.JSONEncoder):
    """
    A json.JSONEncoder that additionally encodes dataclasses, datetime/date objects, and sets
    """

    def default(self, o: object) -> object:
        """
        Convert an object the stock encoder cannot handle

        :param o: Object to encode
        :return: JSON-compatible replacement for o
        """
        if dataclasses.is_dataclass(o):
            # https://stackoverflow.com/a/51286749/7346633
            encoded = dataclasses.asdict(o)
        elif isinstance(o, (datetime, date)):
            encoded = o.isoformat()
        elif isinstance(o, set):
            # https://stackoverflow.com/a/8230505/7346633
            encoded = list(o)
        else:
            # Unknown type: let the base class raise its TypeError
            encoded = super().default(o)
        return encoded
def json_stringify(obj: object, indent: Union[int, None] = None) -> str:
    """
    Serialize an object to a json string using EnhancedJSONEncoder, keeping
    non-ASCII characters unescaped.

    Preconditions:
    - obj != None

    :param obj: Objects
    :param indent: Indent size or none
    :return: Json strings
    """
    options = {'indent': indent, 'cls': EnhancedJSONEncoder, 'ensure_ascii': False}
    return json.dumps(obj, **options)
def write(file: Union[str, Path], text: str) -> None:
    """
    Write text to a file as UTF-8, creating missing parent directories first

    Preconditions:
    - file != ''

    :param file: File path
    :param text: Text
    :return: None
    """
    target = Path(file)
    target.parent.mkdir(parents=True, exist_ok=True)
    with target.open(mode='w', encoding='utf-8') as handle:
        handle.write(text)
def read(file: Union[str, Path]) -> str:
    """
    Read the whole content of a text file, decoded as UTF-8

    Preconditions:
    - file != ''

    :param file: File path, either a string or a Path
    :return: File content
    """
    # Wrap in Path so plain-string paths work too; str has no read_text()
    return Path(file).read_text('utf-8')
def md5(file: Union[str, Path]) -> str:
    """
    Compute the md5 checksum of a file, reading it in 4096-byte chunks

    :param file: File path
    :return: md5 string (hexadecimal)
    """
    digest = hashlib.md5()
    with Path(file).open('rb') as stream:
        while True:
            block = stream.read(4096)
            if not block:
                break
            digest.update(block)
    return digest.hexdigest()
from .color_utils import *
from .serializer import *
class Timer:
@@ -202,3 +26,16 @@ class Timer:
def reset(self):
self.start = time.time_ns()
def mem(var: str):
    """
    Print the size reported by sys.getsizeof for a variable looked up by name in the caller's scope

    :param var: Python expression (normally a variable name) evaluated in the frame of
        whoever called mem(). It is passed to eval(), so never feed it untrusted input
    """
    import inspect
    import sys

    # Evaluate in the caller's namespace: evaluating here would only see this
    # module's globals, never the variable the caller is asking about
    caller = inspect.currentframe().f_back
    try:
        value = eval(var, caller.f_globals, caller.f_locals)
    finally:
        # Drop the frame reference promptly so no reference cycle is kept alive
        del caller
    print(f'Memory usage for {var}: {sys.getsizeof(value) / 1024:.1f}KB')
def run_time(func: Callable, *args, **kwargs):
    """
    Call a function repeatedly and print the total elapsed time in milliseconds

    :param func: Callable to time
    :param args: Positional arguments forwarded to func
    :param kwargs: Keyword arguments forwarded to func, except ``iter`` (default 10),
        which is consumed here as the number of calls to make
    """
    name = getattr(func, '__name__', 'function')
    start = time.time_ns()
    # Bound to a local called `repeats` so the iter() builtin is not shadowed
    repeats = kwargs.pop('iter', 10)
    # Plain loop: return values are discarded instead of being collected in a list
    for _ in range(repeats):
        func(*args, **kwargs)
    ms = (time.time_ns() - start) / 1e6
    print(f'RT {name:30} {ms:6.1f} ms')
+58
View File
@@ -0,0 +1,58 @@
def ansi_rgb(r: int, g: int, b: int, foreground: bool = True) -> str:
    """
    Build the 24-bit ANSI escape code for an RGB color

    :param r: Red component
    :param g: Green component
    :param b: Blue component
    :param foreground: True colors the foreground (code 38), False the background (code 48)
    :return: Escape code
    """
    if foreground:
        layer = '38'
    else:
        layer = '48'
    return f'\033[{layer};2;{r};{g};{b}m'
# Minecraft-style "&x" formatting codes paired with their ANSI escape sequences.
# Every entry is spelled "code/escape" and split into a (code, escape) tuple.
replacements = [
    (spec[:2], spec[3:])
    for spec in (
        "&0/\033[0;30m", "&1/\033[0;34m", "&2/\033[0;32m", "&3/\033[0;36m", "&4/\033[0;31m",
        "&5/\033[0;35m", "&6/\033[0;33m", "&7/\033[0;37m", "&8/\033[1;30m", "&9/\033[1;34m",
        "&a/\033[1;32m", "&b/\033[1;36m", "&c/\033[1;31m", "&d/\033[1;35m", "&e/\033[1;33m",
        "&f/\033[1;37m",
        "&r/\033[0m", "&l/\033[1m", "&o/\033[3m", "&n/\033[4m", "&-/\n",
    )
]
def color(msg: str) -> str:
    """
    Replace extended minecraft color codes in string

    Besides the two-character codes listed in ``replacements``, two RGB forms are
    understood: ``&gf(...)`` for the foreground and ``&gb(...)`` for the background.
    The parentheses hold either a hex color (``#ff8800``) or three decimal components
    separated by commas, semicolons and/or whitespace (``255, 136, 0``).

    :param msg: Message with minecraft color codes
    :return: Message with escape codes
    :raises ValueError: If an RGB code has no closing parenthesis or a component is not a number
    """
    for code, esc in replacements:
        msg = msg.replace(code, esc)

    while '&gf(' in msg or '&gb(' in msg:
        start = msg.index('&gf(') if '&gf(' in msg else msg.index('&gb(')
        end = msg.index(')', start)
        code = msg[start + 4:end]
        fore = msg[start + 2] == 'f'

        if code.startswith('#'):
            hex_digits = code.lstrip('#')
            rgb = tuple(int(hex_digits[pos:pos + 2], 16) for pos in (0, 2, 4))
        else:
            # split() without an argument collapses runs of whitespace, so both
            # "255,0,0" and "255, 0, 0" yield exactly three components
            rgb = tuple(int(c) for c in code.replace(',', ' ').replace(';', ' ').split())

        msg = msg[:start] + ansi_rgb(*rgb, foreground=fore) + msg[end + 1:]

    return msg
def printc(msg: str):
    """
    Print a message after translating its color codes, appending a style reset

    :param msg: Message with minecraft color codes
    """
    rendered = color(msg + '&r')
    print(rendered)
+86
View File
@@ -0,0 +1,86 @@
from hypy_utils import infer
def is_non_empty(o):
    """
    Tell whether a value counts as non-empty

    :param o: Any object
    :return: False only for objects that have a length of 0; objects without __len__ are non-empty
    """
    if hasattr(o, '__len__'):
        return len(o) > 0
    return True
def remove_values(d: dict | list, vals: list, preserve_list: bool = False) -> dict | list:
    """
    Recursively drop unwanted values from nested dicts and lists

    Containers that end up empty after pruning are dropped from their parent as well.

    :param d: Dict or list to clean (anything else is returned unchanged)
    :param vals: Values to remove
    :param preserve_list: If true, list elements equal to one of vals are kept
    :return: Cleaned copy without the specified values
    """
    if isinstance(d, list):
        kept = []
        for item in d:
            if not preserve_list and item in vals:
                continue
            kept.append(remove_values(item, vals, preserve_list))
        return [item for item in kept if is_non_empty(item)]

    if isinstance(d, dict):
        kept = {}
        for key, value in d.items():
            if value in vals:
                continue
            kept[key] = remove_values(value, vals, preserve_list)
        return {key: value for key, value in kept.items() if is_non_empty(value)}

    return d
def remove_nones(d: dict | list, preserve_list: bool = False) -> dict:
    """
    Recursively strip None values (and containers left empty by that) from a dict

    >>> remove_nones({'a': {'b': None, 'c': 1}, 'b': [None, {'a': None}], 'c': {'a': None}, 'd': [None, 1]})
    {'a': {'c': 1}, 'd': [1]}

    :param d: Dict
    :param preserve_list: If true, None elements inside lists are kept
    :return: Dict without nones
    """
    unwanted = [None]
    return remove_values(d, unwanted, preserve_list=preserve_list)
def remove_keys(d: dict | list, keys: set) -> dict | list:
    """
    Recursively drop the given keys from nested dicts

    Containers that end up empty after pruning are dropped from their parent as well.

    >>> remove_keys({'a': {'b': None, 'c': 1}, 'b': [None, {'a': None}], 'c': {'a': None}, 'd': [None, 1]}, {'b'})
    {'a': {'c': 1}, 'c': {'a': None}, 'd': [None, 1]}

    :param d: The dictionary that you want to remove keys from
    :param keys: Set of keys you want to remove
    :return: Dict without specific keys
    """
    if isinstance(d, list):
        cleaned = [remove_keys(item, keys) for item in d]
        return [item for item in cleaned if is_non_empty(item)]

    if isinstance(d, dict):
        cleaned = {}
        for key, value in d.items():
            if key in keys:
                continue
            cleaned[key] = remove_keys(value, keys)
        return {key: value for key, value in cleaned.items() if is_non_empty(value)}

    return d
def deep_dict(o: object, exclude: set | None = None):
    """
    Recursively convert an object into a dictionary

    :param o: Object
    :param exclude: Keys to exclude (optional; nothing is excluded by default)
    :return: Deep dictionary of the object's variables
    """
    exclude = exclude or set()

    infer_result = infer(o)
    # Compare with None rather than truthiness: an empty set infers to [] and an
    # empty namespace to {}, which are valid conversions and must not be skipped
    if infer_result is not None:
        # Recurse so values nested in the converted container are converted too
        # and excluded keys are dropped at every level
        return deep_dict(infer_result, exclude)
    if hasattr(o, '__dict__'):
        return deep_dict(dict(vars(o)), exclude)
    if isinstance(o, dict):
        return {k: deep_dict(v, exclude) for k, v in o.items() if k not in exclude}
    if isinstance(o, list):
        return [deep_dict(v, exclude) for v in o]
    return o
+47
View File
@@ -0,0 +1,47 @@
from __future__ import annotations
import os
from pathlib import Path
import requests
import tqdm
def download_file(url: str, file: str | Path, progress: bool = True):
    """
    Download `url` to `file` in streamed chunks, optionally showing a progress bar.

    If `file` already exists, the download is skipped and the path is returned as is.
    https://stackoverflow.com/a/42071418/7346633

    :param url: Address to download from
    :param file: Destination path
    :param progress: Whether to display a tqdm progress bar
    :return: Path of the downloaded (or already existing) file
    :raises requests.HTTPError: If the server answers with an error status
    """
    file = Path(file)
    if file.is_file():
        return file

    chunk_size = 1024
    try:
        term_len = os.get_terminal_size().columns
        bar_len = int(term_len * 0.4)
    except Exception:
        # No usable terminal size (e.g. output is piped): fall back to fixed widths
        term_len = 60
        bar_len = 20

    mb = 1024 * 1024
    # Context manager releases the streamed connection even if the download fails
    with requests.get(url, stream=True) as r:
        # Fail before touching the disk so an error page is never stored as the file
        r.raise_for_status()

        tqdm_args = dict()
        if 'content-length' in r.headers:
            tqdm_args['total'] = int(r.headers['content-length']) / mb

        pbar = None
        if progress:
            pbar = tqdm.tqdm(unit=" MB", ncols=term_len,
                             bar_format='{desc} {rate_noinv_fmt} {remaining} [{bar}] {percentage:.0f}%', ascii=' #',
                             desc=file.name[:bar_len].ljust(bar_len), **tqdm_args)
        try:
            with open(file, 'wb') as f:
                for chunk in r.iter_content(chunk_size=chunk_size):
                    if chunk:
                        # Compare with None: bool() of a tqdm bar without a total raises TypeError
                        if pbar is not None:
                            pbar.update(len(chunk) / mb)
                        f.write(chunk)
        except BaseException:
            # A truncated file would be mistaken for a finished download on the next call
            if file.exists():
                file.unlink()
            raise
        finally:
            if pbar is not None:
                pbar.close()

    return file
+37
View File
@@ -0,0 +1,37 @@
import base64
# Characters and reserved names that get escaped inside file names
FILENAME_BLACKLIST = [
    # Unix and Windows
    "/",
    # Windows only
    "<", ">", ":", '"', "\\", "|", "?", "*", "\0",
    "CON", "PRN", "AUX", "NUL",
    *(f"COM{n}" for n in range(1, 10)),
    *(f"LPT{n}" for n in range(1, 10)),
    # Just for extra safety
    "~",
]


def _escape_code(token: str) -> str:
    """Return "%" followed by the base64 of token with the "=" padding removed"""
    encoded = base64.b64encode(token.encode()).decode()
    return "%" + encoded.replace('=', '')


# Blacklisted token -> its escape code
FILENAME_REPLACE = {token: _escape_code(token) for token in FILENAME_BLACKLIST}
def escape_filename(fn: str) -> str:
    """
    Escape a file name by replacing every blacklisted token with its "%..." code

    Literal percent signs are parked behind a placeholder while the codes are
    inserted, then written back doubled ("%%").

    :param fn: Raw file name
    :return: Escaped file name
    """
    marker = "[ PeRcEnT EsCaPe owo ]"
    escaped = fn.replace("%", marker)
    for token, code in FILENAME_REPLACE.items():
        escaped = escaped.replace(token, code)
    return escaped.replace(marker, "%%")
def unescape_filename(fn: str) -> str:
    """
    Reverse escape_filename: turn "%..." codes back into the original tokens

    Doubled percent signs ("%%") are parked behind a placeholder while the codes
    are decoded, then written back as a single "%".

    :param fn: Escaped file name
    :return: Raw file name
    """
    marker = "[ PeRcEnT EsCaPe owo ]"
    restored = fn.replace("%%", marker)
    for token, code in FILENAME_REPLACE.items():
        restored = restored.replace(code, token)
    return restored.replace(marker, "%")
+45
View File
@@ -0,0 +1,45 @@
import datetime
import shlex
from pathlib import Path
from subprocess import check_output
from typing import NamedTuple
import dateutil.parser
class ExtractedCommit(NamedTuple):
    """
    One commit as parsed from `git log` output by git_log
    """
    sha: str  # Commit hash (%H)
    author: str  # Author name (%aN)
    email: str  # Author email (%aE)
    time: str  # Author date as an ISO 8601 string (%aI)
    message: str  # Subject line (%s)
    file_names: list[str]  # Name-status lines with the tab replaced by "/"

    def get_time(self) -> datetime.datetime:
        """
        :return: The `time` field parsed into a datetime object
        """
        return dateutil.parser.isoparse(self.time)
def git_log(path: Path, fail_silently: bool = False) -> list[ExtractedCommit]:
    """
    Call and parse git log. This function requires that git>=2.37.1 is installed on your system.

    :param path: Path of git repository
    :param fail_silently: If true, commits that fail to parse are reported and skipped.
        If false, raise exception when errors occur.
    :return: List of commits
    """
    marker = 'START_COMMIT_QwQ'
    # Pass the arguments as a list so a path containing quotes or spaces cannot
    # break (or inject into) the command line
    cmd = [
        'git', '-c', 'diff.renamelimit=0', '-c', 'diff.renames=0',
        '-C', str(path.absolute()),
        'log', '--name-status', '--diff-filter=AMD',
        f'--pretty=format:{marker} %H%n%aN%n%aE%n%aI%n%s%n',
    ]
    log = check_output(cmd).decode('utf-8', 'ignore')

    def extract_commit(block: str) -> 'ExtractedCommit | None':
        try:
            lines = block.split('\n')
            # A commit with an empty subject and no files strips down to 4 lines
            sha, author, email, date, message = (lines + [""]) if len(lines) == 4 else lines[:5]
            files = [f.replace('\t', '/') for f in lines[6:]]
            return ExtractedCommit(sha, author, email, date, message, files)
        except Exception as e:
            print(f'========== Commit Extract Error {e} ==========\n{block}\n==========')
            if not fail_silently:
                raise
            return None

    # Skip whitespace-only chunks (e.g. the text before the first marker)
    parsed = (extract_commit(c.strip()) for c in log.split(marker) if c.strip())
    # Commits that failed silently are dropped so the list only holds ExtractedCommit
    return [commit for commit in parsed if commit is not None]
+24
View File
@@ -0,0 +1,24 @@
import logging
import os
def setup_logger(debug: bool = os.environ.get("DEBUG", False)):
    """
    Configure logging through logging.basicConfig and return the "a2" logger

    Uses rich's handler and traceback hook when rich can be imported, otherwise a
    plain stream handler.

    :param debug: Log every level when truthy, INFO and above otherwise. The default is
        read from the DEBUG environment variable once, when this function is defined
    :return: Logger named "a2"
    """
    # Try to use rich for pretty printing
    try:
        from rich.logging import RichHandler
        handler = RichHandler(rich_tracebacks=True)
        from rich.traceback import install
        install(show_locals=True)
    except ImportError:
        handler = logging.StreamHandler()

    level = "INFO"
    if debug:
        level = "NOTSET"

    # Initialize debug logger
    config = {
        'level': level,
        'format': "%(message)s",
        'datefmt': "[%X]",
        'handlers': [handler],
    }
    logging.basicConfig(**config)
    return logging.getLogger("a2")
+46
View File
@@ -0,0 +1,46 @@
"""
Natural language processing utils
"""
from __future__ import annotations
def camel_split(camel: str) -> list[str]:
    """
    Split a camel case string into its words

    Credit: https://stackoverflow.com/a/58996565/7346633

    :param camel: E.g. HelloWorld or helloWorld
    :return: E.g. ['Hello', 'World']
    """
    # Ignore all caps or all lower
    if camel.isupper() or camel.islower() or camel.isnumeric():
        return [camel]

    flags = [ch.isupper() for ch in camel]

    # Collect the positions where the case changes
    cuts = [0]
    for pos in range(1, len(camel)):
        before, after = flags[pos - 1], flags[pos]
        if before and not after:  # "Ul": the upper-case letter starts the word
            cuts.append(pos - 1)
        elif after and not before:  # "lU": the word starts at the upper-case letter
            cuts.append(pos)
    cuts.append(len(camel))

    # In "lUl" the index of "U" is recorded twice; the a < b test skips the empty slice
    return [camel[a:b] for a, b in zip(cuts, cuts[1:]) if a < b]
def substr_between(s: str, start: str | None = None, end: str | None = None):
    """
    Get the substring after the first `start` and before the next `end`

    >>> substr_between('abc { meow } def', '{', '}')
    ' meow '

    :param s: String to cut
    :param start: Left delimiter; if empty or None, nothing is cut on the left
    :param end: Right delimiter; if empty or None, nothing is cut on the right
    :return: The substring between the delimiters
    :raises ValueError: If a given delimiter does not occur
    """
    if start:
        begin = s.index(start) + len(start)
        s = s[begin:]
    if end:
        stop = s.index(end)
        s = s[:stop]
    return s
+27
View File
@@ -0,0 +1,27 @@
import requests
def setup_proxy(session: requests.Session, addr: str = 'socks5://localhost:9050', verbose: bool = True):
    """
    Route a session through a proxy and verify that the visible IP address changed

    Side effect: once the proxy is verified, requests.get and requests.post are
    replaced process-wide with a function that raises, so code cannot bypass the
    session by accident.

    :param session: Session to configure
    :param addr: Proxy address used for both http and https
    :param verbose: Whether to print the raw and the proxied ip
    :raises AssertionError: If the ip seen through the proxy equals the raw ip
    """
    url = 'https://ip.me'

    # Setup proxy
    ip = session.get(url).text.strip()
    session.proxies = {
        'http': addr,
        'https': addr
    }
    proxy_ip = session.get(url).text.strip()

    # Print ip
    if verbose:
        print(f'Raw ip: {ip}')
        print(f'Proxy ip: {proxy_ip}')

    # ips shouldn't match. Raised explicitly because `assert` statements are
    # stripped under python -O; the exception type stays the same
    if ip == proxy_ip:
        raise AssertionError('Proxy did not start correctly.')

    # Disable default requests behavior
    def warn(*args, **kwargs):
        raise ReferenceError('Use session.get instead of requests.get')
    requests.get = warn
    requests.post = warn
+79
View File
@@ -0,0 +1,79 @@
"""
Importing this file requires numpy, matplotlib, and numba
"""
from __future__ import annotations
from dataclasses import dataclass
import numpy as np
from matplotlib import pyplot as plt
from numba import njit
@dataclass
class Statistics:
    """
    Summary statistics of one data column
    """
    mean: float
    median: float
    lower_quartile: float
    upper_quartile: float
    iqr: float
    minimum: float
    maximum: float
    count: int
    total: float
    stddev: float

    def get_metric_6(self) -> tuple[float, float, float, float, float, float]:
        """
        :return: (mean, median, minimum, maximum, lower quartile, upper quartile)
        """
        center = (self.mean, self.median)
        extremes = (self.minimum, self.maximum)
        quartiles = (self.lower_quartile, self.upper_quartile)
        return center + extremes + quartiles

    def print(self, dec: int = 2):
        """
        Print the statistics as a short report

        :param dec: Number of decimals to round to
        """
        rows = (
            (('Mean', self.mean), ('Median', self.median)),
            (('Min', self.minimum), ('Max', self.maximum)),
            (('Q1', self.lower_quartile), ('Q3', self.upper_quartile)),
            (('StdDev', self.stddev), ('IQR', self.iqr)),
        )
        for (left, left_value), (right, right_value) in rows:
            print(f'> {left}: {round(left_value, dec)}, {right}: {round(right_value, dec)}')
        print(f'> N: {self.count}')
@njit(cache=True)
def _calc_col_stats_helper(col: np.ndarray) -> tuple[float, float, float, float, float, float, float, int, float, float]:
    """
    Compute the raw statistics tuple for one column (compiled with numba's njit).

    The values are returned in the positional order of the Statistics fields:
    mean, median, lower quartile, upper quartile, IQR, minimum, maximum, count,
    total, standard deviation.

    :param col: Input column
    :return: Tuple of the ten statistics listed above
    """
    # Quartiles are computed once and reused for the IQR
    q1 = np.quantile(col, 0.25)
    q3 = np.quantile(col, 0.75)
    return (
        float(np.mean(col)),
        float(np.median(col)),
        float(q1),
        float(q3),
        float(q3 - q1),
        float(np.min(col)),
        float(np.max(col)),
        len(col),
        float(np.sum(col)),
        float(np.std(col))
    )
def calc_col_stats(col: np.ndarray | list) -> Statistics:
    """
    Compute statistics for a data column

    :param col: Input column (tested on 1D array); a list is converted to a numpy array first
    :return: Statistics
    """
    data = np.array(col) if isinstance(col, list) else col
    values = _calc_col_stats_helper(data)
    return Statistics(*values)
def plot(**kwargs) -> plt:
    """
    Pyplot configurator shorthand: each keyword names a pyplot function to call

    A dict value is expanded into keyword arguments; any other value is passed as
    the single positional argument.

    Example: plot(xlabel="X", ylabel="Y") is equivalent to plt.xlabel("X"); plt.ylabel("Y")

    :return: The pyplot module, for chaining
    """
    for func_name, value in kwargs.items():
        if isinstance(value, dict):
            getattr(plt, func_name)(**value)
            continue
        getattr(plt, func_name)(value)
    return plt
+206 -3
View File
@@ -1,12 +1,28 @@
from __future__ import annotations
import base64
import dataclasses
import datetime
import hashlib
import inspect
import io
import json
import pickle
from enum import Enum
from pathlib import Path
from types import SimpleNamespace
from typing import Any
def pickle_encode(obj: any, protocol=None, fix_imports=True) -> bytes:
def pickle_encode(obj: Any, protocol=None, fix_imports=True) -> bytes:
"""
Encode object to pickle bytes
>>> by = pickle_encode({'meow': 565656})
>>> by = pickle_encode({'function': pickle_encode})
>>> len(by)
57
>>> decoded = pickle_decode(by)
>>> by = decoded['function']({'meow': 565656})
>>> pickle_decode(by)
{'meow': 565656}
"""
@@ -15,9 +31,196 @@ def pickle_encode(obj: any, protocol=None, fix_imports=True) -> bytes:
return bio.getvalue()
def pickle_decode(by: bytes) -> any:
def pickle_decode(by: bytes) -> Any:
    """
    Decode pickle bytes to object

    NOTE(review): unpickling can execute arbitrary code; only pass bytes from a trusted source.

    :param by: Pickled bytes
    :return: The decoded object
    """
    stream = io.BytesIO(by)
    try:
        return pickle.load(stream)
    finally:
        stream.close()
def infer(o: object) -> object | None:
    """
    Convert common non-JSON types into a JSON-friendly equivalent

    A result of None means "type not supported". Other falsy results (an empty
    list, string or dict) are valid conversions, so callers must compare the
    result with None instead of testing its truthiness.

    :param o: Object to convert
    :return: Converted value, or None if the type is not supported
    """
    # Support encoding dataclasses (instances only: is_dataclass() is also true
    # for the dataclass type itself, which asdict() rejects with a TypeError)
    # https://stackoverflow.com/a/51286749/7346633
    if dataclasses.is_dataclass(o) and not isinstance(o, type):
        return dataclasses.asdict(o)

    # Simple namespace
    if isinstance(o, SimpleNamespace):
        return o.__dict__

    # Support encoding datetime
    if isinstance(o, (datetime.datetime, datetime.date)):
        return o.isoformat()

    # Support for sets
    # https://stackoverflow.com/a/8230505/7346633
    if isinstance(o, set):
        return list(o)

    # Support for Path
    if isinstance(o, Path):
        return str(o)

    # Support for byte arrays (encode as base64 string)
    if isinstance(o, (bytes, bytearray)):
        return base64.b64encode(o).decode()

    # Enums
    if isinstance(o, Enum):
        return o.name

    return None
class EnhancedJSONEncoder(json.JSONEncoder):
    """
    An improvement to the json.JSONEncoder class, which supports every type
    that infer() can convert
    """

    def default(self, o: object) -> object:
        """
        Convert an object the stock encoder cannot handle

        :param o: Object to encode
        :return: JSON-compatible replacement for o
        """
        infer_result = infer(o)
        # Compare with None rather than truthiness: an empty set or empty bytes
        # infer to [] / '', which are valid encodings and must not be rejected
        if infer_result is not None:
            return infer_result
        return super().default(o)
class ForceJSONEcoder(EnhancedJSONEncoder):
    """
    A json encoder that can serialize almost everything (including custom classes, byte arrays)
    """

    def default(self, o: object) -> object:
        """
        Convert an object the stock encoder cannot handle

        :param o: Object to encode
        :return: JSON-compatible replacement for o
        """
        infer_result = infer(o)
        # Compare with None rather than truthiness: an empty set or empty bytes
        # infer to [] / '', which are valid encodings and must not be rejected
        if infer_result is not None:
            return infer_result

        # Support for custom classes (get dict values); class objects themselves are excluded
        if hasattr(o, '__dict__') and not inspect.isclass(o):
            return dict(vars(o))

        return super().default(o)
def json_stringify(obj: object, forced: bool = True, **kwargs) -> str:
    """
    Serialize an object to a json string with the extended encoders of this module.

    Preconditions:
    - obj != None

    :param obj: Objects
    :param forced: Whether to force the conversion of classes and byte arrays
    :param kwargs: Extra arguments for json.dumps; they override the defaults
        (ensure_ascii=False and the encoder class)
    :return: Json strings
    """
    encoder = ForceJSONEcoder if forced else EnhancedJSONEncoder
    options = {'ensure_ascii': False, 'cls': encoder, **kwargs}
    return json.dumps(obj, **options)
class SafeNamespace(SimpleNamespace):
    """
    A SimpleNamespace whose missing attributes read as None instead of raising
    """

    def __getattr__(self, attr):
        # Python calls __getattr__ only after the normal attribute lookup failed
        try:
            value = super().__getattr__(attr)
        except AttributeError:
            value = None
        return value
def jsn(s: str) -> SafeNamespace:
    """
    Parse a json string, turning every json object into a SafeNamespace

    :param s: Json string
    :return: Parsed value with attribute access on objects
    """
    def to_namespace(d):
        return SafeNamespace(**d)

    return json.loads(s, object_hook=to_namespace)
def ensure_dir(path: Path | str) -> Path:
    """
    Ensure that the directory exists (and create it, including parents, if not)

    :param path: Directory path
    :return: The directory as a Path
    """
    directory = Path(path)
    directory.mkdir(parents=True, exist_ok=True)
    return directory
def ensure_parent(path: Path | str) -> Path:
    """
    Ensure that the parent directory of a path exists (and create it if not)

    :param path: Path whose parent directory is needed
    :return: The given path itself, as a Path
    """
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    return target
def write(fp: Path | str, data: bytes | str):
    """
    Make sure the directory exists, and then write data, either in bytes or string.
    Also forces utf-8 encoding for strings.

    :param fp: File path
    :param data: Text, or bytes-like data (bytes, bytearray, memoryview)
    :return: Number of characters or bytes written
    :raises TypeError: If data is neither text nor bytes-like
    """
    is_text = isinstance(data, str)
    # Reject unsupported data up front instead of silently writing nothing
    if not is_text and not isinstance(data, (bytes, bytearray, memoryview)):
        raise TypeError(f'data must be str or bytes-like, not {type(data).__name__}')

    fp = Path(fp)
    fp.parent.mkdir(parents=True, exist_ok=True)

    if is_text:
        return fp.write_text(data, 'utf-8')
    return fp.write_bytes(data)
def read(file: Path | str) -> str:
    """
    Read the whole content of a text file, decoded as utf-8

    :param file: File path
    :return: File content
    """
    source = Path(file)
    return source.read_text(encoding='utf-8')
def write_json(fp: Path | str, data: Any, **kwargs):
    """
    Serialize data with json_stringify and write the result to a file

    :param fp: File path
    :param data: Object to serialize
    :param kwargs: Extra arguments forwarded to json_stringify
    """
    text = json_stringify(data, **kwargs)
    write(fp, text)
def parse_date_time(iso: str) -> datetime.datetime:
    """
    Parse an ISO date-time string by slicing fixed character positions.

    The original author measured this at about 4 times the speed of
    dateutil.parser.isoparse() over 1,000,000 trials.

    Preconditions:
    - iso is the output of datetime.isoformat() (In a format like "2021-10-20T23:50:14")
    - iso is a valid date (this function does not check for the validity of the input)

    Anything after the seconds field is ignored.

    :param iso: Input date
    :return: Datetime object
    """
    fields = (iso[:4], iso[5:7], iso[8:10], iso[11:13], iso[14:16], iso[17:19])
    return datetime.datetime(*map(int, fields))
def parse_date_only(iso: str) -> datetime.datetime:
    """
    Parse only the date part of an ISO string by slicing fixed character positions.

    Preconditions:
    - iso starts with the format of "YYYY-MM-DD" (e.g. "2021-10-20" or "2021-10-20T10:04:14")
    - iso is a valid date (this function does not check for the validity of the input)

    :param iso: Input date
    :return: Datetime object at midnight of that day
    """
    year, month, day = (int(part) for part in (iso[:4], iso[5:7], iso[8:10]))
    return datetime.datetime(year, month, day)
def md5(file: Path | str) -> str:
    """
    Compute the md5 checksum of a file, reading it in 4096-byte chunks

    :param file: File path
    :return: md5 string (hexadecimal)
    """
    digest = hashlib.md5()
    with Path(file).open('rb') as stream:
        while True:
            block = stream.read(4096)
            if not block:
                break
            digest.update(block)
    return digest.hexdigest()
+40
View File
@@ -0,0 +1,40 @@
"""
Importing this file requires installing tqdm.
"""
from __future__ import annotations
import os
from functools import partial
from typing import Callable, Iterable
import tqdm
from tqdm.contrib.concurrent import process_map, thread_map
def smap(fn: Callable, lst: Iterable, *args, **kwargs) -> list:
    """
    Sequential map with a tqdm progress bar

    :param fn: Function applied to each element
    :param lst: Elements to process
    :param args: Extra positional arguments for tqdm
    :param kwargs: Extra keyword arguments for tqdm; they override the defaults
        position=0 and leave=True
    :return: List of results, in input order
    """
    # Merge instead of passing the defaults literally, so a caller-supplied
    # position/leave overrides them rather than raising "multiple values"
    tqdm_args = dict(position=0, leave=True)
    tqdm_args.update(kwargs)
    return [fn(i) for i in tqdm.tqdm(lst, *args, **tqdm_args)]
def pmap(fn: Callable, lst: Iterable, *args, **kwargs) -> list:
    """
    Map through tqdm's process_map with a progress bar

    :param fn: Function applied to each element
    :param lst: Elements to process
    :param args: Extra positional arguments for process_map
    :param kwargs: Keyword arguments for process_map; they override the defaults set here
    :return: List of results
    """
    defaults = {
        'position': 0,
        'leave': True,
        'chunksize': 1,
        'tqdm_class': tqdm.tqdm,
        'max_workers': os.cpu_count(),
    }
    return process_map(fn, lst, *args, **{**defaults, **kwargs})
def tmap(fn: Callable, lst: Iterable, *args, **kwargs) -> list:
    """
    Map through tqdm's thread_map with a progress bar

    :param fn: Function applied to each element
    :param lst: Elements to process
    :param args: Extra positional arguments for thread_map
    :param kwargs: Keyword arguments for thread_map; they override the defaults set here
    :return: List of results
    """
    defaults = {
        'position': 0,
        'leave': True,
        'chunksize': 1,
        'tqdm_class': tqdm.tqdm,
        'max_workers': os.cpu_count(),
    }
    return thread_map(fn, lst, *args, **{**defaults, **kwargs})
def tq(it: Iterable, desc: str, *args, **kwargs) -> tqdm.tqdm:
    """
    Wrap an iterable in a tqdm progress bar with a description

    :param it: Iterable to wrap
    :param desc: Description shown next to the bar
    :param args: Extra positional arguments for tqdm
    :param kwargs: Extra keyword arguments for tqdm; they override the defaults
        position=0 and leave=True
    :return: The tqdm progress bar object (the class, not the tqdm module)
    """
    tqdm_args = dict(position=0, leave=True)
    tqdm_args.update(kwargs)
    return tqdm.tqdm(it, desc, *args, **tqdm_args)
def patch_tqdm():
    """
    Build pre-configured variants of tqdm, process_map and thread_map

    Nothing is patched globally; the configured callables are only returned.

    :return: Tuple (tq, pmap, tmap) of functools.partial objects
    """
    shared = dict(chunksize=1, position=0, leave=True, tqdm_class=tqdm.tqdm, max_workers=os.cpu_count())
    bar: Callable[[Iterable], tqdm.tqdm] = partial(tqdm.tqdm, position=0, leave=True)
    process_mapper = partial(process_map, **shared)
    thread_mapper = partial(thread_map, **shared)
    return bar, process_mapper, thread_mapper
+1
View File
@@ -26,6 +26,7 @@ setup(
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
],
packages=find_packages(exclude=("tests",)),
include_package_data=True,