21 Commits

Author SHA1 Message Date
Azalea Gui 6291d178d4 [+] Git utils 2023-11-06 03:00:05 -05:00
Azalea 87a46fcf28 [F] ipconfig.me is blocking proxy blacklist 2023-10-05 21:31:47 -04:00
azalea df16f90a8f [U] Release 1.0.19 2023-07-28 20:52:27 -07:00
azalea 25aecabd34 [+] SafeNamespace 2023-07-28 20:49:51 -07:00
Azalea Gui 332a63479e [U] Release 1.0.18 2023-03-09 02:42:36 -05:00
Azalea Gui b748a217a0 [+] Logging utils 2023-03-09 02:42:03 -05:00
Azalea Gui afaef06f40 [+] Add setup_proxy 2023-01-29 23:54:53 -05:00
Azalea Gui 1948ff4a9c [O] Allow disabling progress bar during download 2023-01-13 05:48:43 -05:00
Hykilpikonna 80bf1da83d [U] Release 1.0.17 2022-12-19 01:42:28 -05:00
Hykilpikonna 6a60712d8c [+] deep_dict 2022-12-19 01:40:36 -05:00
Hykilpikonna 0530d41f42 [+] dict recursive remove 2022-12-19 01:38:52 -05:00
Hykilpikonna f6aa847368 [+] File name escaping 2022-12-19 01:12:08 -05:00
Hykilpikonna cb6aff290d [O] json support more types 2022-12-19 01:10:31 -05:00
Hykilpikonna 1325224fd8 [O] Support serializing Path, custom class, and byte arrays 2022-12-18 22:08:54 -05:00
Hykilpikonna 04f987cab8 [F] Fix download rate inverting 2022-12-18 21:59:50 -05:00
Hykilpikonna 156562f5a3 [U] Release 1.0.16 2022-10-29 19:07:13 -04:00
Hykilpikonna 26d756e628 [F] Fix error during download if content-length is not present 2022-10-29 19:06:56 -04:00
Azalea e0b2ef63b7 [U] Release 1.0.15 2022-10-29 17:23:50 -04:00
Azalea 374aedabb6 [F] Fix downloader 2022-10-29 17:08:23 -04:00
Azalea c28ca20edc [U] Release 1.0.14 2022-10-29 17:02:40 -04:00
Azalea 2480f4e690 [+] Downloader 2022-10-29 17:02:20 -04:00
11 changed files with 345 additions and 22 deletions
+2
View File
@@ -116,3 +116,5 @@ dmypy.json
# Custom
.idea
HyPyUtils.iml
.DS_Store
._*
+8
View File
@@ -1,2 +1,10 @@
# HyPyUtils
HyDEV Utils for Python
## Modules
| Module | Requirements |
|--------------------|--------------------------|
| `tqdm_utils` | tqdm |
| `downloader` | tqdm, requests |
| `scientific_utils` | numpy, numba, matplotlib |
+1 -1
View File
@@ -1,6 +1,6 @@
from __future__ import annotations
__version__ = "1.0.13"
__version__ = "1.0.19"
import time
from typing import Callable
+86
View File
@@ -0,0 +1,86 @@
from hypy_utils import infer
def is_non_empty(o):
return not hasattr(o, '__len__') or len(o) > 0
def remove_values(d: dict | list, vals: list, preserve_list: bool = False) -> dict | list:
"""
Recursively remove values from a dict
:param d: Dict
:param vals: Values to remove
:param preserve_list: Whether to ignore list elements
:return: Dict without specific values
"""
if isinstance(d, list):
d = [remove_values(i, vals, preserve_list) for i in d if preserve_list or i not in vals]
d = [i for i in d if is_non_empty(i)]
return d
if isinstance(d, dict):
d = {k: remove_values(v, vals, preserve_list) for k, v in d.items() if v not in vals}
d = {k: v for k, v in d.items() if is_non_empty(v)}
return d
return d
def remove_nones(d: dict | list, preserve_list: bool = False) -> dict:
"""
Recursively remove nones from a dict
>>> remove_nones({'a': {'b': None, 'c': 1}, 'b': [None, {'a': None}], 'c': {'a': None}, 'd': [None, 1]})
{'a': {'c': 1}, 'd': [1]}
:param d: Dict
:param preserve_list: Whether to ignore list elements
:return: Dict without nones
"""
return remove_values(d, [None], preserve_list=preserve_list)
def remove_keys(d: dict | list, keys: set) -> dict | list:
"""
Recursively remove keys
>>> remove_keys({'a': {'b': None, 'c': 1}, 'b': [None, {'a': None}], 'c': {'a': None}, 'd': [None, 1]}, {'b'})
{'a': {'c': 1}, 'c': {'a': None}, 'd': [None, 1]}
:param d: The dictionary that you want to remove keys from
:param keys: Set of keys you want to remove
:return: Dict without specific keys
"""
if isinstance(d, list):
d = [remove_keys(i, keys) for i in d]
d = [i for i in d if is_non_empty(i)]
return d
if isinstance(d, dict):
d = {k: remove_keys(v, keys) for k, v in d.items() if k not in keys}
d = {k: v for k, v in d.items() if is_non_empty(v)}
return d
return d
def deep_dict(o: object, exclude: set | None):
"""
Recursively convert an object into a dictionary
:param o: Object
:param exclude: Keys to exclude
:return: Deep dictionary of the object's variables
"""
exclude = exclude or {}
infer_result = infer(o)
if infer_result:
return infer_result
if hasattr(o, '__dict__'):
return deep_dict(dict(vars(o)), exclude)
if isinstance(o, dict):
return {k: deep_dict(v, exclude) for k, v in o.items() if k not in exclude}
if isinstance(o, list):
return [deep_dict(v, exclude) for v in o]
return o
+47
View File
@@ -0,0 +1,47 @@
from __future__ import annotations
import os
from pathlib import Path
import requests
import tqdm
def download_file(url: str, file: str | Path, progress: bool = True):
"""
Helper method handling downloading large files from `url` to `filename`.
Returns a pointer to `filename`.
https://stackoverflow.com/a/42071418/7346633
"""
file = Path(file)
if file.is_file():
return file
chunk_size = 1024
try:
term_len = os.get_terminal_size().columns
bar_len = int(term_len * 0.4)
except Exception:
term_len = 60
bar_len = 20
tqdm_args = dict()
r = requests.get(url, stream=True)
if 'content-length' in r.headers:
tqdm_args['total'] = int(r.headers['content-length']) / 1024 / 1024
with open(file, 'wb') as f:
pbar = None
if progress:
pbar = tqdm.tqdm(unit=" MB", ncols=term_len,
bar_format='{desc} {rate_noinv_fmt} {remaining} [{bar}] {percentage:.0f}%', ascii=' #',
desc=file.name[:bar_len].ljust(bar_len), **tqdm_args)
for chunk in r.iter_content(chunk_size=chunk_size):
if chunk:
if pbar:
pbar.update(len(chunk) / 1024 / 1024)
f.write(chunk)
return file
+37
View File
@@ -0,0 +1,37 @@
import base64
FILENAME_BLACKLIST = [
# Unix and Windows
"/",
# Windows only
"<", ">", ":", '"', "\\", "|", "?", "*", "\0",
"CON", "PRN", "AUX", "NUL",
"COM1", "COM2", "COM3", "COM4", "COM5", "COM6", "COM7", "COM8", "COM9",
"LPT1", "LPT2", "LPT3", "LPT4", "LPT5", "LPT6", "LPT7", "LPT8", "LPT9",
# Just for extra safety
"~"
]
FILENAME_REPLACE = {c: f"%{base64.b64encode(c.encode()).decode().replace('=', '')}" for c in FILENAME_BLACKLIST}
def escape_filename(fn: str) -> str:
fn = fn.replace("%", "[ PeRcEnT EsCaPe owo ]")
for c, r in FILENAME_REPLACE.items():
fn = fn.replace(c, r)
fn = fn.replace("[ PeRcEnT EsCaPe owo ]", "%%")
return fn
def unescape_filename(fn: str) -> str:
fn = fn.replace("%%", "[ PeRcEnT EsCaPe owo ]")
for c, r in FILENAME_REPLACE.items():
fn = fn.replace(r, c)
fn = fn.replace("[ PeRcEnT EsCaPe owo ]", "%")
return fn
+45
View File
@@ -0,0 +1,45 @@
import datetime
import shlex
from pathlib import Path
from subprocess import check_output
from typing import NamedTuple
import dateutil
class ExtractedCommit(NamedTuple):
sha: str
author: str
email: str
time: str
message: str
file_names: list[str]
def get_time(self) -> datetime:
return dateutil.parser.isoparse(self.time)
def git_log(path: Path, fail_silently: False) -> list[ExtractedCommit]:
"""
Call and parse git log. This function requires that git>=2.37.1 is installed on your system.
:param path: Path of git repository
:param fail_silently: If true, ignore errors. If false, raise exception when errors occur.
:return: List of commits
"""
# check_call(shlex.split('git config diff.renames 0'))
cmd = f"git -c 'diff.renamelimit=0' -c 'diff.renames=0' -C '{path.absolute()}' log --name-status --diff-filter=AMD --pretty=format:'START_COMMIT_QwQ %H%n%aN%n%aE%n%aI%n%s%n'"
log = check_output(shlex.split(cmd)).decode('utf-8', 'ignore')
def extract_commit(block: str) -> ExtractedCommit:
try:
lines = block.split('\n')
sha, author, email, date, message = lines + [""] if len(lines) == 4 else lines[:5]
files = [f.replace('\t', '/') for f in lines[6:]]
return ExtractedCommit(sha, author, email, date, message, files)
except Exception as e:
print(f'========== Commit Extract Error {e} ==========\n{block}\n==========')
if not fail_silently:
raise e
return [extract_commit(c.strip()) for c in log.split('START_COMMIT_QwQ') if c]
+24
View File
@@ -0,0 +1,24 @@
import logging
import os
def setup_logger(debug: bool = os.environ.get("DEBUG", False)):
# Try to use rich for pretty printing
try:
from rich.logging import RichHandler
handler = RichHandler(rich_tracebacks=True)
from rich.traceback import install
install(show_locals=True)
except ImportError:
handler = logging.StreamHandler()
# Initialize debug logger
logging.basicConfig(
level="NOTSET" if debug else "INFO",
format="%(message)s",
datefmt="[%X]",
handlers=[handler]
)
return logging.getLogger("a2")
+27
View File
@@ -0,0 +1,27 @@
import requests
def setup_proxy(session: requests.Session, addr: str = 'socks5://localhost:9050', verbose: bool = True):
url = 'https://ip.me'
# Setup proxy
ip = session.get(url).text.strip()
session.proxies = {
'http': addr,
'https': addr
}
proxy_ip = session.get(url).text.strip()
# Print ip
if verbose:
print(f'Raw ip: {ip}')
print(f'Proxy ip: {proxy_ip}')
# ips shouldn't match
assert ip != proxy_ip, 'Proxy did not start correctly.'
# Disable default requests behavior
def warn(*args, **kwargs):
raise ReferenceError('Use session.get instead of requests.get')
requests.get = warn
requests.post = warn
+67 -21
View File
@@ -1,11 +1,14 @@
from __future__ import annotations
import base64
import dataclasses
import datetime
import hashlib
import inspect
import io
import json
import pickle
from enum import Enum
from pathlib import Path
from types import SimpleNamespace
from typing import Any
@@ -36,36 +39,70 @@ def pickle_decode(by: bytes) -> Any:
return pickle.load(bio)
def infer(o: object) -> object | None:
# Support encoding dataclasses
# https://stackoverflow.com/a/51286749/7346633
if dataclasses.is_dataclass(o):
return dataclasses.asdict(o)
# Simple namespace
if isinstance(o, SimpleNamespace):
return o.__dict__
# Support encoding datetime
if isinstance(o, (datetime.datetime, datetime.date)):
return o.isoformat()
# Support for sets
# https://stackoverflow.com/a/8230505/7346633
if isinstance(o, set):
return list(o)
# Support for Path
if isinstance(o, Path):
return str(o)
# Support for byte arrays (encode as base64 string)
if isinstance(o, bytes):
return base64.b64encode(o).decode()
# Enums
if isinstance(o, Enum):
return o.name
return None
class EnhancedJSONEncoder(json.JSONEncoder):
"""
An improvement to the json.JSONEncoder class, which supports:
encoding for dataclasses, encoding for datetime, and sets
"""
def default(self, o: object) -> object:
return infer(o) or super().default(o)
# Support encoding dataclasses
# https://stackoverflow.com/a/51286749/7346633
if dataclasses.is_dataclass(o):
return dataclasses.asdict(o)
# Simple namespace
if isinstance(o, SimpleNamespace):
return o.__dict__
class ForceJSONEcoder(EnhancedJSONEncoder):
"""
A json encoder that can serialize almost everything (including custom classes, byte arrays)
"""
def default(self, o: object) -> object:
infer_result = infer(o)
if infer_result:
return infer_result
# Support encoding datetime
if isinstance(o, (datetime.datetime, datetime.date)):
return o.isoformat()
# # Support EnumType
# if isinstance(o, EnumType):
# return {i.name: i.value for i in o}
# Support for sets
# https://stackoverflow.com/a/8230505/7346633
if isinstance(o, set):
return list(o)
# Support for custom classes (get dict values)
if hasattr(o, '__dict__') and not inspect.isclass(o):
return dict(vars(o))
return super().default(o)
def json_stringify(obj: object, **kwargs) -> str:
def json_stringify(obj: object, forced: bool = True, **kwargs) -> str:
"""
Serialize json string with support for dataclasses and datetime and sets and with custom
configuration.
@@ -74,15 +111,24 @@ def json_stringify(obj: object, **kwargs) -> str:
- obj != None
:param obj: Objects
:param forced: Whether to force the conversion of classes and byte arrays
:return: Json strings
"""
args = dict(ensure_ascii=False, cls=EnhancedJSONEncoder)
args = dict(ensure_ascii=False, cls=ForceJSONEcoder if forced else EnhancedJSONEncoder)
args.update(kwargs)
return json.dumps(obj, **args)
def jsn(s: str) -> SimpleNamespace:
return json.loads(s, object_hook=lambda d: SimpleNamespace(**d))
class SafeNamespace(SimpleNamespace):
def __getattr__(self, attr):
try:
return super().__getattr__(attr)
except AttributeError:
return None
def jsn(s: str) -> SafeNamespace:
return json.loads(s, object_hook=lambda d: SafeNamespace(**d))
def ensure_dir(path: Path | str) -> Path:
@@ -131,8 +177,8 @@ def read(file: Path | str) -> str:
return Path(file).read_text('utf-8')
def write_json(fp: Path | str, data: Any):
write(fp, json_stringify(data))
def write_json(fp: Path | str, data: Any, **kwargs):
write(fp, json_stringify(data, **kwargs))
def parse_date_time(iso: str) -> datetime.datetime:
+1
View File
@@ -26,6 +26,7 @@ setup(
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
],
packages=find_packages(exclude=("tests",)),
include_package_data=True,