12 Commits

Author SHA1 Message Date
Azalea (on HyDEV-Daisy) de3a30ef34 [U] 1.0.13 2022-08-26 00:35:06 -04:00
Azalea (on HyDEV-Daisy) 7d419b375b [U] Modular structure 2022-08-26 00:31:44 -04:00
Azalea (on HyDEV-Daisy) 376ecaa26e [U] Release 1.0.11 2022-08-25 23:21:31 -04:00
Azalea (on HyDEV-Daisy) f990731261 [+] Add stddev to scientific utils 2022-08-25 23:16:48 -04:00
Azalea (on HyDEV-Daisy) fb57ec06ae [F] Fix bugs in tqdm utils 2022-08-25 23:12:50 -04:00
Hykilpikonna b76a624b4f [+] Add simple namespace support 2022-08-15 20:05:38 -04:00
Hykilpikonna 7f56d94fe8 [F] Fix tqdm utils 2022-08-13 18:30:45 -04:00
Hykilpikonna 8d68f22eaa [O] Use doctests 2022-08-13 17:51:19 -04:00
Hykilpikonna e29d0f2c00 [+] substr_between function 2022-08-13 17:50:02 -04:00
Hykilpikonna 3b72889785 [+] Add nlp utils 2022-07-25 13:16:46 -04:00
Azalea (on HyDEV-Daisy) 6911cad00e [F] Fix imports 2022-07-03 02:24:31 -04:00
Azalea (on HyDEV-Daisy) 2ac83c5808 [+] tqdm and scientific utils 2022-07-03 02:17:34 -04:00
6 changed files with 400 additions and 183 deletions
+17 -180
View File
@@ -1,188 +1,12 @@
from __future__ import annotations
__version__ = "1.0.7"
__version__ = "1.0.13"
import dataclasses
import hashlib
import json
import time
from datetime import datetime, date
from pathlib import Path
from typing import Union
from typing import Callable
def ansi_rgb(r: int, g: int, b: int, foreground: bool = True) -> str:
    """
    Build a 24-bit ANSI color escape sequence from RGB components

    :param r: Red component, inserted into the sequence as given
    :param g: Green component, inserted into the sequence as given
    :param b: Blue component, inserted into the sequence as given
    :param foreground: Whether the color applies to the foreground (code 38) or,
        when false, to the background (code 48)
    :return: Escape code
    """
    if foreground:
        layer = '38'
    else:
        layer = '48'
    return f'\033[{layer};2;{r};{g};{b}m'
# (code, expansion) pairs: minecraft-style "&x" codes and the ANSI escape sequence each one
# is replaced with. "&-" is the odd one out and expands to a newline.
replacements = [
    ("&0", "\033[0;30m"), ("&1", "\033[0;34m"), ("&2", "\033[0;32m"), ("&3", "\033[0;36m"),
    ("&4", "\033[0;31m"), ("&5", "\033[0;35m"), ("&6", "\033[0;33m"), ("&7", "\033[0;37m"),
    ("&8", "\033[1;30m"), ("&9", "\033[1;34m"), ("&a", "\033[1;32m"), ("&b", "\033[1;36m"),
    ("&c", "\033[1;31m"), ("&d", "\033[1;35m"), ("&e", "\033[1;33m"), ("&f", "\033[1;37m"),
    ("&r", "\033[0m"), ("&l", "\033[1m"), ("&o", "\033[3m"), ("&n", "\033[4m"),
    ("&-", "\n"),
]
def color(msg: str) -> str:
    """
    Replace extended minecraft color codes in string

    The message is processed in two passes:

    1. Every two-character code listed in ``replacements`` is swapped for its expansion.
    2. Every ``&gf(...)`` (foreground) or ``&gb(...)`` (background) block is swapped for the
       escape sequence built by ``ansi_rgb``. The parentheses hold either a hex color
       (``#rrggbb``) or three decimal components separated by commas, semicolons and/or
       whitespace (``255,0,0``, ``255, 0, 0``, ``255;0;0``, ``255 0 0``).

    :param msg: Message with minecraft color codes
    :return: Message with escape codes
    :raises ValueError: If a ``&gf(``/``&gb(`` block has no closing parenthesis, or a color
        component cannot be parsed as a number
    :raises TypeError: If a decimal color does not have exactly three components
    """
    for code, esc in replacements:
        msg = msg.replace(code, esc)

    while '&gf(' in msg or '&gb(' in msg:
        # Foreground blocks are resolved before background blocks
        i = msg.index('&gf(') if '&gf(' in msg else msg.index('&gb(')
        end = msg.index(')', i)
        code = msg[i + 4:end]
        fore = msg[i + 2] == 'f'

        if code.startswith('#'):
            hex_digits = code.lstrip('#')
            rgb = tuple(int(hex_digits[j:j + 2], 16) for j in (0, 2, 4))
        else:
            # split() without an argument collapses runs of whitespace, so separators
            # followed by spaces ("255, 0, 0") no longer produce empty components
            rgb = tuple(int(c) for c in code.replace(',', ' ').replace(';', ' ').split())

        msg = msg[:i] + ansi_rgb(*rgb, foreground=fore) + msg[end + 1:]

    return msg
def printc(msg: str):
    """
    Print a message with its color codes expanded

    A reset code is appended before conversion so the formatting does not carry over
    into whatever is printed next.

    :param msg: Message with minecraft color codes
    """
    terminated = msg + '&r'
    print(color(terminated))
def parse_date_time(iso: str) -> datetime:
    """
    Parse an ISO date-time by slicing fixed character positions.

    The original author reports this to be about 4 times faster than
    dateutil.parser.isoparse() over 1,000,000 trials.

    Anything after the seconds field (fractions, timezone offset) is ignored.

    Preconditions:
    - iso is the output of datetime.isoformat() (In a format like "2021-10-20T23:50:14")
    - iso is a valid date (this function does not check for the validity of the input)

    :param iso: Input date
    :return: Datetime object
    """
    year, month, day = iso[:4], iso[5:7], iso[8:10]
    hour, minute, second = iso[11:13], iso[14:16], iso[17:19]
    return datetime(*map(int, (year, month, day, hour, minute, second)))
def parse_date_only(iso: str) -> datetime:
    """
    Parse only the date part of an ISO string by slicing fixed character positions.

    Everything after the tenth character is ignored, so the result is at midnight.

    Preconditions:
    - iso starts with the format of "YYYY-MM-DD" (e.g. "2021-10-20" or "2021-10-20T10:04:14")
    - iso is a valid date (this function does not check for the validity of the input)

    :param iso: Input date
    :return: Datetime object
    """
    year, month, day = (int(part) for part in (iso[:4], iso[5:7], iso[8:10]))
    return datetime(year, month, day)
class EnhancedJSONEncoder(json.JSONEncoder):
    """
    An improvement to the json.JSONEncoder class, which supports:
    encoding for dataclass instances, encoding for datetime/date, and sets/frozensets
    """

    def default(self, o: object) -> object:
        """
        Convert an object the base encoder cannot serialize into one it can

        :param o: Object to convert
        :return: dict for a dataclass instance, ISO string for datetime/date,
            list for set/frozenset
        :raises TypeError: Raised by the base class for every other unsupported type
        """
        # Support encoding dataclasses
        # https://stackoverflow.com/a/51286749/7346633
        # is_dataclass() is also true for the dataclass type itself, which asdict() rejects,
        # so types are left to the base class
        if dataclasses.is_dataclass(o) and not isinstance(o, type):
            return dataclasses.asdict(o)

        # Support encoding datetime
        if isinstance(o, (datetime, date)):
            return o.isoformat()

        # Support for sets (element order follows the set's iteration order)
        # https://stackoverflow.com/a/8230505/7346633
        if isinstance(o, (set, frozenset)):
            return list(o)

        return super().default(o)
def json_stringify(obj: object, indent: Union[int, None] = None) -> str:
    """
    Serialize an object to a json string using EnhancedJSONEncoder, keeping non-ASCII
    characters unescaped.

    :param obj: Object to serialize
    :param indent: Indent size, or None to let json.dumps use its default layout
    :return: Json string
    """
    options = {'indent': indent, 'cls': EnhancedJSONEncoder, 'ensure_ascii': False}
    return json.dumps(obj, **options)
def write(file: Union[str, Path], text: str) -> None:
    """
    Write text to a file as utf-8, creating missing parent directories first.
    An existing file is overwritten.

    Preconditions:
    - file != ''

    :param file: File path (used as given)
    :param text: Text
    :return: None
    """
    target = Path(file)
    folder = target.parent
    folder.mkdir(parents=True, exist_ok=True)

    with open(target, 'w', encoding='utf-8') as handle:
        handle.write(text)
def read(file: Union[str, Path]) -> str:
    """
    Read file content as utf-8 text

    Preconditions:
    - file != ''

    :param file: File path, either a string or a Path
    :return: File content
    """
    # Normalize first: a plain str has no read_text(), although the signature allows str
    return Path(file).read_text('utf-8')
def md5(file: Union[str, Path]) -> str:
    """
    Compute the md5 checksum of a file, reading it in 4096-byte chunks

    :param file: File path
    :return: md5 as a hexadecimal string
    """
    digest = hashlib.md5()
    with Path(file).open('rb') as stream:
        while True:
            block = stream.read(4096)
            if not block:
                break
            digest.update(block)
    return digest.hexdigest()
from .color_utils import *
from .serializer import *
class Timer:
@@ -202,3 +26,16 @@ class Timer:
def reset(self):
self.start = time.time_ns()
def mem(var: str):
    """
    Print the memory usage of a variable, looked up by name in the caller's scope

    The size comes from sys.getsizeof, which is shallow: objects referenced by a
    container are not included.

    :param var: Name of the variable (any expression works) to measure. It is passed to
        eval(), so it must never come from untrusted input.
    """
    import sys

    # Evaluate in the caller's frame: inside this function the caller's local variables
    # would not be visible by name
    caller = sys._getframe(1)
    size = sys.getsizeof(eval(var, caller.f_globals, caller.f_locals))
    print(f'Memory usage for {var}: {size / 1024:.1f}KB')
def run_time(func: Callable, *args, **kwargs):
    """
    Call a function repeatedly and print the total elapsed time in milliseconds

    :param func: Function to time
    :param args: Positional arguments forwarded to func
    :param kwargs: Keyword arguments forwarded to func, except ``iter`` (number of calls,
        default 10), which is consumed here
    """
    label = getattr(func, '__name__', 'function')
    began = time.time_ns()
    repeats = kwargs.pop('iter', 10)

    for _ in range(repeats):
        func(*args, **kwargs)

    elapsed_ms = (time.time_ns() - began) / 1e6
    print(f'RT {label:30} {elapsed_ms:6.1f} ms')
+58
View File
@@ -0,0 +1,58 @@
def ansi_rgb(r: int, g: int, b: int, foreground: bool = True) -> str:
    """
    Build a 24-bit ANSI color escape sequence from RGB components

    :param r: Red component, inserted into the sequence as given
    :param g: Green component, inserted into the sequence as given
    :param b: Blue component, inserted into the sequence as given
    :param foreground: Whether the color applies to the foreground (code 38) or,
        when false, to the background (code 48)
    :return: Escape code
    """
    if foreground:
        layer = '38'
    else:
        layer = '48'
    return f'\033[{layer};2;{r};{g};{b}m'
# (code, expansion) pairs: minecraft-style "&x" codes and the ANSI escape sequence each one
# is replaced with. "&-" is the odd one out and expands to a newline.
replacements = [
    ("&0", "\033[0;30m"), ("&1", "\033[0;34m"), ("&2", "\033[0;32m"), ("&3", "\033[0;36m"),
    ("&4", "\033[0;31m"), ("&5", "\033[0;35m"), ("&6", "\033[0;33m"), ("&7", "\033[0;37m"),
    ("&8", "\033[1;30m"), ("&9", "\033[1;34m"), ("&a", "\033[1;32m"), ("&b", "\033[1;36m"),
    ("&c", "\033[1;31m"), ("&d", "\033[1;35m"), ("&e", "\033[1;33m"), ("&f", "\033[1;37m"),
    ("&r", "\033[0m"), ("&l", "\033[1m"), ("&o", "\033[3m"), ("&n", "\033[4m"),
    ("&-", "\n"),
]
def color(msg: str) -> str:
    """
    Replace extended minecraft color codes in string

    The message is processed in two passes:

    1. Every two-character code listed in ``replacements`` is swapped for its expansion.
    2. Every ``&gf(...)`` (foreground) or ``&gb(...)`` (background) block is swapped for the
       escape sequence built by ``ansi_rgb``. The parentheses hold either a hex color
       (``#rrggbb``) or three decimal components separated by commas, semicolons and/or
       whitespace (``255,0,0``, ``255, 0, 0``, ``255;0;0``, ``255 0 0``).

    :param msg: Message with minecraft color codes
    :return: Message with escape codes
    :raises ValueError: If a ``&gf(``/``&gb(`` block has no closing parenthesis, or a color
        component cannot be parsed as a number
    :raises TypeError: If a decimal color does not have exactly three components
    """
    for code, esc in replacements:
        msg = msg.replace(code, esc)

    while '&gf(' in msg or '&gb(' in msg:
        # Foreground blocks are resolved before background blocks
        i = msg.index('&gf(') if '&gf(' in msg else msg.index('&gb(')
        end = msg.index(')', i)
        code = msg[i + 4:end]
        fore = msg[i + 2] == 'f'

        if code.startswith('#'):
            hex_digits = code.lstrip('#')
            rgb = tuple(int(hex_digits[j:j + 2], 16) for j in (0, 2, 4))
        else:
            # split() without an argument collapses runs of whitespace, so separators
            # followed by spaces ("255, 0, 0") no longer produce empty components
            rgb = tuple(int(c) for c in code.replace(',', ' ').replace(';', ' ').split())

        msg = msg[:i] + ansi_rgb(*rgb, foreground=fore) + msg[end + 1:]

    return msg
def printc(msg: str):
    """
    Print a message with its color codes expanded

    A reset code is appended before conversion so the formatting does not carry over
    into whatever is printed next.

    :param msg: Message with minecraft color codes
    """
    terminated = msg + '&r'
    print(color(terminated))
+46
View File
@@ -0,0 +1,46 @@
"""
Natural language processing utils
"""
from __future__ import annotations
def camel_split(camel: str) -> list[str]:
    """
    Split a camel case string into its words

    A run of capitals followed by lowercase letters keeps its last capital with the
    following word (e.g. 'HTTPServer' gives ['HTTP', 'Server']).

    Credit: https://stackoverflow.com/a/58996565/7346633

    :param camel: E.g. HelloWorld or helloWorld
    :return: E.g. ['Hello', 'World']
    """
    # Nothing to split when there is no mix of cases
    if camel.isupper() or camel.islower() or camel.isnumeric():
        return [camel]

    upper = [ch.isupper() for ch in camel]

    # Collect the cut positions: one at every change of case
    cuts = [0]
    for pos in range(len(camel) - 1):
        if upper[pos] == upper[pos + 1]:
            continue
        # "Ul": the capital starts the next word; "lU": the word starts at the capital
        cuts.append(pos if upper[pos] else pos + 1)
    cuts.append(len(camel))

    # In "lUl" the capital's position is recorded twice, which would give an empty slice
    pieces = []
    for begin, stop in zip(cuts, cuts[1:]):
        if begin < stop:
            pieces.append(camel[begin:stop])
    return pieces
def substr_between(s: str, start: str | None = None, end: str | None = None):
    """
    Get the substring between the first occurrence of start and the first occurrence of
    end after it. A missing (or empty) marker means "from the beginning" / "to the end".

    >>> substr_between('abc { meow } def', '{', '}')
    ' meow '

    :param s: String to cut
    :param start: Marker before the wanted text
    :param end: Marker after the wanted text
    :return: The text between the markers
    :raises ValueError: If a given marker is not found
    """
    begin = s.index(start) + len(start) if start else 0
    stop = s.index(end, begin) if end else len(s)
    return s[begin:stop]
+79
View File
@@ -0,0 +1,79 @@
"""
Importing this file requires numpy, matplotlib, and numba
"""
from __future__ import annotations
from dataclasses import dataclass
import numpy as np
from matplotlib import pyplot as plt
from numba import njit
@dataclass
class Statistics:
    """
    Summary statistics of one data column
    """
    # Field order matters: instances are also built positionally
    mean: float
    median: float
    lower_quartile: float
    upper_quartile: float
    iqr: float
    minimum: float
    maximum: float
    count: int
    total: float
    stddev: float

    def get_metric_6(self) -> tuple[float, float, float, float, float, float]:
        """
        Get six of the metrics as a tuple

        :return: (mean, median, minimum, maximum, lower quartile, upper quartile)
        """
        lowest, highest = self.minimum, self.maximum
        q1, q3 = self.lower_quartile, self.upper_quartile
        return self.mean, self.median, lowest, highest, q1, q3

    def print(self, dec: int = 2):
        """
        Print a five-line summary to stdout

        :param dec: Number of decimals the values are rounded to (count is printed as is)
        """
        print(f'> Mean: {round(self.mean, dec)}, Median: {round(self.median, dec)}')
        print(f'> Min: {round(self.minimum, dec)}, Max: {round(self.maximum, dec)}')
        print(f'> Q1: {round(self.lower_quartile, dec)}, Q3: {round(self.upper_quartile, dec)}')
        print(f'> StdDev: {round(self.stddev, dec)}, IQR: {round(self.iqr, dec)}')
        print(f'> N: {self.count}')
@njit(cache=True)
def _calc_col_stats_helper(col: np.ndarray) -> tuple[float, float, float, float, float, float, float, int, float, float]:
    """
    Compute the raw statistics of a column, compiled with numba

    :param col: Input column (the public wrapper documents it as tested on 1D arrays)
    :return: (mean, median, lower quartile, upper quartile, iqr, minimum, maximum, count,
        total, stddev) - the same order as the fields of Statistics, which is built from
        this tuple positionally
    """
    # The quartiles are computed once and reused for the interquartile range
    q1 = np.quantile(col, 0.25)
    q3 = np.quantile(col, 0.75)
    return (
        float(np.mean(col)),
        float(np.median(col)),
        float(q1),
        float(q3),
        float(q3 - q1),
        float(np.min(col)),
        float(np.max(col)),
        len(col),
        float(np.sum(col)),
        # np.std is called with its default arguments
        float(np.std(col))
    )
def calc_col_stats(col: np.ndarray | list) -> Statistics:
    """
    Compute statistics for a data column

    :param col: Input column (tested on 1D array); a list is converted to an array first
    :return: Statistics
    """
    data = np.array(col) if isinstance(col, list) else col
    fields = _calc_col_stats_helper(data)
    return Statistics(*fields)
def plot(**kwargs) -> plt:
    """
    Pyplot configurator shorthand: every keyword names a pyplot function to call

    A dict value is expanded into keyword arguments, any other value is passed as the
    single positional argument.

    Example: plot(xlabel="X", ylabel="Y") is equivalent to plt.xlabel("X"); plt.ylabel("Y")

    :param kwargs: pyplot function name -> argument (or dict of keyword arguments)
    :return: The pyplot module, so calls can be chained
    """
    for attr, value in kwargs.items():
        setter = getattr(plt, attr)
        if isinstance(value, dict):
            setter(**value)
        else:
            setter(value)
    return plt
+160 -3
View File
@@ -1,12 +1,25 @@
from __future__ import annotations
import dataclasses
import datetime
import hashlib
import io
import json
import pickle
from pathlib import Path
from types import SimpleNamespace
from typing import Any
def pickle_encode(obj: any, protocol=None, fix_imports=True) -> bytes:
def pickle_encode(obj: Any, protocol=None, fix_imports=True) -> bytes:
"""
Encode object to pickle bytes
>>> by = pickle_encode({'meow': 565656})
>>> by = pickle_encode({'function': pickle_encode})
>>> len(by)
57
>>> decoded = pickle_decode(by)
>>> by = decoded['function']({'meow': 565656})
>>> pickle_decode(by)
{'meow': 565656}
"""
@@ -15,9 +28,153 @@ def pickle_encode(obj: any, protocol=None, fix_imports=True) -> bytes:
return bio.getvalue()
def pickle_decode(by: bytes) -> any:
def pickle_decode(by: bytes) -> Any:
    """
    Decode pickle bytes to object

    NOTE: unpickling can execute arbitrary code, so only pass bytes from a trusted source.

    :param by: Pickled bytes
    :return: The decoded object
    """
    stream = io.BytesIO(by)
    try:
        return pickle.load(stream)
    finally:
        stream.close()
class EnhancedJSONEncoder(json.JSONEncoder):
    """
    An improvement to the json.JSONEncoder class, which supports:
    encoding for dataclass instances, simple namespaces, datetime/date, and sets/frozensets
    """

    def default(self, o: object) -> object:
        """
        Convert an object the base encoder cannot serialize into one it can

        :param o: Object to convert
        :return: dict for a dataclass instance or SimpleNamespace, ISO string for
            datetime/date, list for set/frozenset
        :raises TypeError: Raised by the base class for every other unsupported type
        """
        # Support encoding dataclasses
        # https://stackoverflow.com/a/51286749/7346633
        # is_dataclass() is also true for the dataclass type itself, which asdict() rejects,
        # so types are left to the base class
        if dataclasses.is_dataclass(o) and not isinstance(o, type):
            return dataclasses.asdict(o)

        # Simple namespace: its attributes become the json object
        if isinstance(o, SimpleNamespace):
            return o.__dict__

        # Support encoding datetime
        if isinstance(o, (datetime.datetime, datetime.date)):
            return o.isoformat()

        # Support for sets (element order follows the set's iteration order)
        # https://stackoverflow.com/a/8230505/7346633
        if isinstance(o, (set, frozenset)):
            return list(o)

        return super().default(o)
def json_stringify(obj: object, **kwargs) -> str:
    """
    Serialize an object to a json string. By default EnhancedJSONEncoder is used and
    non-ASCII characters are kept unescaped; both defaults can be overridden.

    :param obj: Object to serialize
    :param kwargs: Extra arguments for json.dumps, taking precedence over the defaults
    :return: Json string
    """
    options = {'ensure_ascii': False, 'cls': EnhancedJSONEncoder, **kwargs}
    return json.dumps(obj, **options)
def jsn(s: str) -> SimpleNamespace:
    """
    Parse a json string, turning every json object into a SimpleNamespace so that
    values can be read as attributes.

    Only json objects are converted: a top-level array or scalar is returned as the
    plain value json.loads produces.

    :param s: Json string
    :return: Parsed value
    """
    def to_namespace(mapping: dict) -> SimpleNamespace:
        return SimpleNamespace(**mapping)

    return json.loads(s, object_hook=to_namespace)
def ensure_dir(path: Path | str) -> Path:
    """
    Ensure that the directory exists (and create it, with any missing parents, if not)

    :param path: Directory path
    :return: The directory as a Path
    """
    directory = Path(path)
    directory.mkdir(exist_ok=True, parents=True)
    return directory
def ensure_parent(path: Path | str) -> Path:
    """
    Ensure that the parent directory of a path exists (and create if not)

    :param path: Path whose parent directory is needed
    :return: The given path itself (not its parent), as a Path
    """
    target = Path(path)
    ensure_dir(target.parent)
    return target
def write(fp: Path | str, data: bytes | str):
    """
    Make sure the directory exists, and then write data, either in bytes or string.
    Also forces utf-8 encoding for strings.

    :param fp: File path
    :param data: Text, or bytes-like data (bytes, bytearray, memoryview)
    :return: Whatever Path.write_text / Path.write_bytes returns for the write
    :raises TypeError: If data is of any other type. Nothing is created on disk in
        that case.
    """
    # The type is checked before touching the file system, so an unsupported value
    # fails loudly instead of creating the directory and then writing nothing
    if isinstance(data, str):
        return ensure_parent(fp).write_text(data, 'utf-8')

    if isinstance(data, (bytes, bytearray, memoryview)):
        return ensure_parent(fp).write_bytes(data)

    raise TypeError(f'data must be str or bytes-like, not {type(data).__name__}')
def read(file: Path | str) -> str:
    """
    Read the whole file as text, always decoding it as utf-8

    :param file: File path
    :return: File content
    """
    with Path(file).open('r', encoding='utf-8') as handle:
        return handle.read()
def write_json(fp: Path | str, data: Any):
    """
    Serialize data with json_stringify and write the result to a file

    :param fp: File path
    :param data: Object to serialize
    """
    serialized = json_stringify(data)
    write(fp, serialized)
def parse_date_time(iso: str) -> datetime.datetime:
    """
    Parse an ISO date-time by slicing fixed character positions.

    The original author reports this to be about 4 times faster than
    dateutil.parser.isoparse() over 1,000,000 trials.

    Anything after the seconds field (fractions, timezone offset) is ignored.

    Preconditions:
    - iso is the output of datetime.isoformat() (In a format like "2021-10-20T23:50:14")
    - iso is a valid date (this function does not check for the validity of the input)

    :param iso: Input date
    :return: Datetime object
    """
    year, month, day = iso[:4], iso[5:7], iso[8:10]
    hour, minute, second = iso[11:13], iso[14:16], iso[17:19]
    return datetime.datetime(*map(int, (year, month, day, hour, minute, second)))
def parse_date_only(iso: str) -> datetime.datetime:
    """
    Parse only the date part of an ISO string by slicing fixed character positions.

    Everything after the tenth character is ignored, so the result is at midnight.

    Preconditions:
    - iso starts with the format of "YYYY-MM-DD" (e.g. "2021-10-20" or "2021-10-20T10:04:14")
    - iso is a valid date (this function does not check for the validity of the input)

    :param iso: Input date
    :return: Datetime object
    """
    year, month, day = (int(part) for part in (iso[:4], iso[5:7], iso[8:10]))
    return datetime.datetime(year, month, day)
def md5(file: Path | str) -> str:
    """
    Compute the md5 checksum of a file, reading it in 4096-byte chunks

    :param file: File path
    :return: md5 as a hexadecimal string
    """
    digest = hashlib.md5()
    with Path(file).open('rb') as stream:
        while True:
            block = stream.read(4096)
            if not block:
                break
            digest.update(block)
    return digest.hexdigest()
+40
View File
@@ -0,0 +1,40 @@
"""
Importing this file requires installing tqdm.
"""
from __future__ import annotations
import os
from functools import partial
from typing import Callable, Iterable
import tqdm
from tqdm.contrib.concurrent import process_map, thread_map
def smap(fn: Callable, lst: Iterable, *args, **kwargs) -> list:
    """
    Sequential map with a progress bar

    :param fn: Function applied to every item
    :param lst: Items to process
    :param args: Extra positional arguments for tqdm.tqdm
    :param kwargs: Extra keyword arguments for tqdm.tqdm. They take precedence over the
        defaults (position=0, leave=True).
    :return: The results, in input order
    """
    # Merge instead of passing the defaults as literal keywords, so that a caller who
    # supplies position= or leave= overrides them rather than hitting a duplicate-keyword
    # TypeError
    tqdm_args = dict(position=0, leave=True)
    tqdm_args.update(kwargs)
    return [fn(i) for i in tqdm.tqdm(lst, *args, **tqdm_args)]
def pmap(fn: Callable, lst: Iterable, *args, **kwargs) -> list:
    """
    Map through tqdm's process_map with this module's defaults

    :param fn: Function applied to every item
    :param lst: Items to process
    :param args: Extra positional arguments, forwarded to process_map unchanged
    :param kwargs: Keyword arguments for process_map; they take precedence over the
        defaults set here
    :return: Whatever process_map returns
    """
    defaults = {
        'position': 0,
        'leave': True,
        'chunksize': 1,
        'tqdm_class': tqdm.tqdm,
        'max_workers': os.cpu_count(),
    }
    return process_map(fn, lst, *args, **{**defaults, **kwargs})
def tmap(fn: Callable, lst: Iterable, *args, **kwargs) -> list:
    """
    Map through tqdm's thread_map with this module's defaults

    :param fn: Function applied to every item
    :param lst: Items to process
    :param args: Extra positional arguments, forwarded to thread_map unchanged
    :param kwargs: Keyword arguments for thread_map; they take precedence over the
        defaults set here
    :return: Whatever thread_map returns
    """
    defaults = {
        'position': 0,
        'leave': True,
        'chunksize': 1,
        'tqdm_class': tqdm.tqdm,
        'max_workers': os.cpu_count(),
    }
    return thread_map(fn, lst, *args, **{**defaults, **kwargs})
def tq(it: Iterable, desc: str, *args, **kwargs) -> tqdm.tqdm:
    """
    Wrap an iterable in a tqdm progress bar with this module's defaults

    :param it: Iterable to wrap
    :param desc: Description, passed to tqdm.tqdm as its second positional argument
    :param args: Extra positional arguments for tqdm.tqdm
    :param kwargs: Extra keyword arguments for tqdm.tqdm. They take precedence over the
        defaults (position=0, leave=True).
    :return: The tqdm.tqdm instance (the return annotation previously named the module)
    """
    tqdm_args = {'position': 0, 'leave': True, **kwargs}
    return tqdm.tqdm(it, desc, *args, **tqdm_args)
def patch_tqdm():
    """
    Build pre-configured tqdm helpers

    Despite the name, nothing is modified in place: three functools.partial objects
    carrying this module's defaults are created and returned.

    :return: (tq, pmap, tmap) - partials of tqdm.tqdm, process_map and thread_map
    """
    bar_defaults = dict(position=0, leave=True)
    map_defaults = dict(chunksize=1, position=0, leave=True, tqdm_class=tqdm.tqdm,
                        max_workers=os.cpu_count())
    return (
        partial(tqdm.tqdm, **bar_defaults),
        partial(process_map, **map_defaults),
        partial(thread_map, **map_defaults),
    )