Compare commits
10 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 376ecaa26e | |||
| f990731261 | |||
| fb57ec06ae | |||
| b76a624b4f | |||
| 7f56d94fe8 | |||
| 8d68f22eaa | |||
| e29d0f2c00 | |||
| 3b72889785 | |||
| 6911cad00e | |||
| 2ac83c5808 |
+24
-2
@@ -1,6 +1,6 @@
|
||||
from __future__ import annotations
|
||||
|
||||
__version__ = "1.0.7"
|
||||
__version__ = "1.0.11"
|
||||
|
||||
import dataclasses
|
||||
import hashlib
|
||||
@@ -8,7 +8,8 @@ import json
|
||||
import time
|
||||
from datetime import datetime, date
|
||||
from pathlib import Path
|
||||
from typing import Union
|
||||
from typing import Union, Callable
|
||||
from types import SimpleNamespace
|
||||
|
||||
|
||||
def ansi_rgb(r: int, g: int, b: int, foreground: bool = True) -> str:
|
||||
@@ -111,6 +112,10 @@ class EnhancedJSONEncoder(json.JSONEncoder):
|
||||
# https://stackoverflow.com/a/51286749/7346633
|
||||
if dataclasses.is_dataclass(o):
|
||||
return dataclasses.asdict(o)
|
||||
|
||||
# Simple namespace
|
||||
if isinstance(o, SimpleNamespace):
|
||||
return o.__dict__
|
||||
|
||||
# Support encoding datetime
|
||||
if isinstance(o, (datetime, date)):
|
||||
@@ -139,6 +144,10 @@ def json_stringify(obj: object, indent: Union[int, None] = None) -> str:
|
||||
return json.dumps(obj, indent=indent, cls=EnhancedJSONEncoder, ensure_ascii=False)
|
||||
|
||||
|
||||
def jsn(s: str):
    """
    Parse a JSON string into nested SimpleNamespace objects

    Every JSON object in the input is turned into a SimpleNamespace, so its
    fields can be read with attribute access instead of key lookup.

    >>> jsn('{"a": {"b": 1}}').a.b
    1

    :param s: JSON text
    :return: Decoded value; JSON objects are SimpleNamespace instances
    """
    def _to_namespace(fields: dict) -> SimpleNamespace:
        # Called by the json decoder for each decoded JSON object
        return SimpleNamespace(**fields)

    return json.loads(s, object_hook=_to_namespace)
|
||||
|
||||
|
||||
def write(file: Union[str, Path], text: str) -> None:
|
||||
"""
|
||||
Write text to a file
|
||||
@@ -202,3 +211,16 @@ class Timer:
|
||||
|
||||
def reset(self):
    """
    Restart the timer: store the current time (in nanoseconds) as the new start
    """
    now_ns = time.time_ns()
    self.start = now_ns
|
||||
|
||||
|
||||
def mem(var: str):
    """
    Print the memory usage (in KB) of a variable, given its name

    The name (or any expression) is evaluated in the scope of the caller, so
    local and global variables of the calling code can be inspected. The size
    comes from sys.getsizeof, which is shallow: it does not follow references
    into contained objects.

    :param var: Name of the variable (or an expression) to measure
    """
    # Local import keeps the function self-contained
    import sys

    # Evaluate in the caller's frame; evaluating here would only see this
    # function's own scope and never find the caller's variable.
    caller = sys._getframe(1)
    # NOTE: eval executes arbitrary code - only pass trusted expressions
    size = sys.getsizeof(eval(var, caller.f_globals, caller.f_locals))
    print(f'Memory usage for {var}: {size / 1024:.1f}KB')
|
||||
|
||||
|
||||
def run_time(func: Callable, *args, **kwargs):
    """
    Time repeated calls of a function and print the total elapsed milliseconds

    :param func: Callable to benchmark
    :param args: Positional arguments forwarded to func
    :param kwargs: Keyword arguments forwarded to func, except ``iter``
        (default 10), which is removed and used as the number of calls
    """
    # Take the repeat count out before timing so it is never forwarded to func
    repeats = kwargs.pop('iter', 10)
    name = getattr(func, '__name__', 'function')

    # perf_counter_ns is monotonic; the wall clock (time_ns) can jump when the
    # system time is adjusted and would distort the measurement.
    start = time.perf_counter_ns()
    for _ in range(repeats):
        func(*args, **kwargs)
    ms = (time.perf_counter_ns() - start) / 1e6

    print(f'RT {name:30} {ms:6.1f} ms')
|
||||
|
||||
@@ -0,0 +1,46 @@
|
||||
"""
|
||||
Natural language processing utils
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
|
||||
def camel_split(camel: str) -> list[str]:
    """
    Break a camel-case identifier into its words

    Credit: https://stackoverflow.com/a/58996565/7346633

    :param camel: E.g. HelloWorld or helloWorld
    :return: E.g. ['Hello', 'World']
    """
    # Nothing to split when there is no mix of cases
    if camel.isupper() or camel.islower() or camel.isnumeric():
        return [camel]

    upper_flags = [ch.isupper() for ch in camel]

    # Collect the positions where a new word starts
    cuts = [0]
    for pos, (cur, nxt) in enumerate(zip(upper_flags, upper_flags[1:])):
        if cur == nxt:
            continue
        # "Ul": the capital starts a word; "lU": the word starts after the lowercase
        cuts.append(pos if cur else pos + 1)
    cuts.append(len(camel))

    # In "lUl" the index of "U" is recorded twice, so empty slices are dropped
    pieces = []
    for begin, stop in zip(cuts, cuts[1:]):
        if begin < stop:
            pieces.append(camel[begin:stop])
    return pieces
|
||||
|
||||
|
||||
def substr_between(s: str, start: str | None = None, end: str | None = None):
    """
    Cut out the part of a string that lies between two markers

    >>> substr_between('abc { meow } def', '{', '}')
    ' meow '

    :param s: Text to search
    :param start: Marker after which the result begins; skipped when empty or None
    :param end: Marker before which the result stops (searched after start); skipped when empty or None
    :return: The text between the markers
    :raises ValueError: If a given marker does not occur in the text
    """
    remainder = s
    if start:
        begin = remainder.index(start) + len(start)
        remainder = remainder[begin:]
    if end:
        stop = remainder.index(end)
        remainder = remainder[:stop]
    return remainder
|
||||
@@ -0,0 +1,79 @@
|
||||
"""
|
||||
Importing this file requires numpy, matplotlib, and numba
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
|
||||
import numpy as np
|
||||
from matplotlib import pyplot as plt
|
||||
from numba import njit
|
||||
|
||||
|
||||
@dataclass
class Statistics:
    """
    Summary statistics of a data column
    """
    mean: float
    median: float
    lower_quartile: float
    upper_quartile: float
    iqr: float
    minimum: float
    maximum: float
    count: int
    total: float
    stddev: float

    def get_metric_6(self) -> tuple[float, float, float, float, float, float]:
        """
        Return six metrics as a tuple

        :return: (mean, median, minimum, maximum, lower quartile, upper quartile)
        """
        center = (self.mean, self.median)
        extremes = (self.minimum, self.maximum)
        quartiles = (self.lower_quartile, self.upper_quartile)
        return center + extremes + quartiles

    def print(self, dec: int = 2):
        """
        Print the statistics in a readable form

        :param dec: Number of decimal places the values are rounded to
        """
        def shown(value):
            return round(value, dec)

        print(f'> Mean: {shown(self.mean)}, Median: {shown(self.median)}')
        print(f'> Min: {shown(self.minimum)}, Max: {shown(self.maximum)}')
        print(f'> Q1: {shown(self.lower_quartile)}, Q3: {shown(self.upper_quartile)}')
        print(f'> StdDev: {shown(self.stddev)}, IQR: {shown(self.iqr)}')
        print(f'> N: {self.count}')
|
||||
|
||||
|
||||
@njit(cache=True)
def _calc_col_stats_helper(col: np.ndarray) -> tuple[float, float, float, float, float, float, float, int, float, float]:
    """
    Numba-compiled computation of the column statistics

    :param col: Input column
    :return: (mean, median, lower quartile, upper quartile, iqr, minimum, maximum, count, total, stddev)
    """
    lower = np.quantile(col, 0.25)
    upper = np.quantile(col, 0.75)

    mean = float(np.mean(col))
    median = float(np.median(col))
    spread = float(upper - lower)
    smallest = float(np.min(col))
    largest = float(np.max(col))
    count = len(col)
    total = float(np.sum(col))
    stddev = float(np.std(col))

    return mean, median, float(lower), float(upper), spread, smallest, largest, count, total, stddev
|
||||
|
||||
|
||||
def calc_col_stats(col: np.ndarray | list) -> Statistics:
    """
    Summarize a data column as a Statistics object

    :param col: Input column (tested on 1D array); a list is converted to a numpy array first
    :return: Statistics
    """
    data = np.array(col) if isinstance(col, list) else col
    values = _calc_col_stats_helper(data)
    return Statistics(*values)
|
||||
|
||||
|
||||
def plot(**kwargs) -> plt:
    """
    Shorthand for calling several pyplot functions at once

    Each keyword names a pyplot function. A dict value is expanded into keyword
    arguments of that function; any other value is passed as its single
    positional argument.

    Example: plot(xlabel="X", ylabel="Y") is equivalent to plt.xlabel("X"); plt.ylabel("Y")

    :return: The pyplot module
    """
    for func_name, value in kwargs.items():
        func = getattr(plt, func_name)
        if not isinstance(value, dict):
            func(value)
            continue
        func(**value)
    return plt
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import io
|
||||
import pickle
|
||||
|
||||
@@ -6,7 +8,11 @@ def pickle_encode(obj: any, protocol=None, fix_imports=True) -> bytes:
|
||||
"""
|
||||
Encode object to pickle bytes
|
||||
|
||||
>>> by = pickle_encode({'meow': 565656})
|
||||
>>> by = pickle_encode({'function': pickle_encode})
|
||||
>>> len(by)
|
||||
57
|
||||
>>> decoded = pickle_decode(by)
|
||||
>>> by = decoded['function']({'meow': 565656})
|
||||
>>> pickle_decode(by)
|
||||
{'meow': 565656}
|
||||
"""
|
||||
|
||||
@@ -0,0 +1,40 @@
|
||||
"""
|
||||
Importing this file requires installing tqdm.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from functools import partial
|
||||
from typing import Callable, Iterable
|
||||
|
||||
import tqdm
|
||||
from tqdm.contrib.concurrent import process_map, thread_map
|
||||
|
||||
|
||||
def smap(fn: Callable, lst: Iterable, *args, **kwargs) -> list:
    """
    Sequentially map a function over an iterable while showing a tqdm progress bar

    :param fn: Function applied to each item
    :param lst: Items to process
    :param args: Extra positional arguments forwarded to tqdm.tqdm
    :param kwargs: Extra keyword arguments forwarded to tqdm.tqdm; they override
        the defaults position=0 and leave=True
    :return: List of results, in input order
    """
    # Merge instead of hard-coding the keywords in the call: passing
    # position/leave in kwargs would otherwise raise a duplicate-keyword TypeError
    tqdm_args = dict(position=0, leave=True)
    tqdm_args.update(kwargs)
    return [fn(item) for item in tqdm.tqdm(lst, *args, **tqdm_args)]
|
||||
|
||||
|
||||
def pmap(fn: Callable, lst: Iterable, *args, **kwargs) -> list:
    """
    Map a function over an iterable with tqdm's process_map

    :param fn: Function applied to each item
    :param lst: Items to process
    :param args: Extra positional arguments forwarded to process_map
    :param kwargs: Keyword arguments forwarded to process_map; they override the defaults below
    :return: Result of process_map
    """
    defaults = {
        'position': 0,
        'leave': True,
        'chunksize': 1,
        'tqdm_class': tqdm.tqdm,
        'max_workers': os.cpu_count(),
    }
    return process_map(fn, lst, *args, **{**defaults, **kwargs})
|
||||
|
||||
|
||||
def tmap(fn: Callable, lst: Iterable, *args, **kwargs) -> list:
    """
    Map a function over an iterable with tqdm's thread_map

    :param fn: Function applied to each item
    :param lst: Items to process
    :param args: Extra positional arguments forwarded to thread_map
    :param kwargs: Keyword arguments forwarded to thread_map; they override the defaults below
    :return: Result of thread_map
    """
    options = {
        **dict(position=0, leave=True, chunksize=1, tqdm_class=tqdm.tqdm, max_workers=os.cpu_count()),
        **kwargs,
    }
    return thread_map(fn, lst, *args, **options)
|
||||
|
||||
|
||||
def tq(it: Iterable, desc: str, *args, **kwargs) -> tqdm.tqdm:
    """
    Wrap an iterable in a tqdm progress bar with a description

    :param it: Iterable to wrap
    :param desc: Description passed to the progress bar
    :param args: Extra positional arguments forwarded to tqdm.tqdm
    :param kwargs: Extra keyword arguments forwarded to tqdm.tqdm; they override
        the defaults position=0 and leave=True
    :return: The tqdm progress bar object (annotated as the tqdm.tqdm class, not the tqdm module)
    """
    options = dict(position=0, leave=True)
    options.update(kwargs)
    return tqdm.tqdm(it, desc, *args, **options)
|
||||
|
||||
|
||||
def patch_tqdm():
    """
    Build preconfigured tqdm helpers

    :return: Tuple (tq, pmap, tmap) of functools.partial objects wrapping
        tqdm.tqdm, process_map and thread_map with preset keyword arguments
    """
    bar_defaults = dict(position=0, leave=True)
    map_defaults = dict(chunksize=1, position=0, leave=True, tqdm_class=tqdm.tqdm, max_workers=os.cpu_count())

    progress: Callable[[Iterable], tqdm.tqdm] = partial(tqdm.tqdm, **bar_defaults)
    process_mapper = partial(process_map, **map_defaults)
    thread_mapper = partial(thread_map, **map_defaults)
    return progress, process_mapper, thread_mapper
|
||||
Reference in New Issue
Block a user