From a6cef43f901ec7a0a7dd9575e6883d73c5003c5d Mon Sep 17 00:00:00 2001 From: Hykilpikonna Date: Sat, 3 Sep 2022 01:36:25 -0400 Subject: [PATCH] [+] Install feature --- README.md | 7 +- ocpm/main.py | 183 +++++++++++++++++++++++++++++-------------------- ocpm/models.py | 27 ++++---- 3 files changed, 126 insertions(+), 91 deletions(-) diff --git a/README.md b/README.md index 64d0253..6c73bdc 100644 --- a/README.md +++ b/README.md @@ -12,12 +12,15 @@ pip install ocpm ## Usage -First, cd to your EFI directory, and then run `ocpm update` to update all kexts. +First, cd to your EFI directory, and then run `ocpm -U` to update all kexts. + +Use `ocpm -I [kext names...]` to install kexts ## Features / To-do * [x] Update kexts to the latest version -* [ ] Install/uninstall kexts +* [x] Install kexts +* [ ] Uninstall kexts * [ ] Resolve dependencies * [ ] Install specific versions of a kext * [ ] Install OS-dependent kexts (like AirportItlwm) diff --git a/ocpm/main.py b/ocpm/main.py index 1a8a876..3300a57 100755 --- a/ocpm/main.py +++ b/ocpm/main.py @@ -13,7 +13,7 @@ from zipfile import ZipFile import requests import ruamel.yaml import tqdm as tqdm -from hypy_utils import printc +from hypy_utils import printc, ensure_dir from packaging import version from tqdm.contrib.concurrent import thread_map @@ -28,13 +28,13 @@ except Exception: bar_len = 20 -def get_latest_release(kext: Kext, repos: dict, pre: bool): +def get_latest_release(name: str, repos: dict, pre: bool): # Lowercase keys repos = {k.lower(): v for k, v in repos['Repos'].items()} - name = kext.name.lower() + name = name.lower() # Find repo - assert name in repos, f'Kext {kext.name} is not found in our repos. (If it\'s open source, feel free to add it in!)' + assert name in repos, f'Kext {name} is not found in our repos. (If it\'s open source, feel free to add it in!)' repo_info = repos[name] if isinstance(repo_info, str): @@ -43,7 +43,7 @@ def get_latest_release(kext: Kext, repos: dict, pre: bool): repo = repo_info['Repo'] artifact = repo_info.get('Artifact') - assert 'github.com/' in repo, f'For {kext.name}: {repo} is not a github repo, skipping...' + assert 'github.com/' in repo, f'For {name}: {repo} is not a github repo, skipping...' repo = repo.split('github.com/')[1] # Check latest version @@ -81,16 +81,40 @@ def download_file(url: str, file: str | Path): return file +def extract_kext(kext_name: str, zip_file: Path, tmp_dir: Path) -> Path | None: + if zip_file.suffix != '.zip': + print(f'Unable to process {zip_file.name}. Currently only zip files are supported.') + return None + + with ZipFile(zip_file, 'r') as zipf: + lower = kext_name.lower() + '.kext/' + + def find_name(): + for n in zipf.namelist(): + if n.lower().endswith(lower): + return n + + return None + + name = find_name() + if not name: + print(f'Unable to find {kext_name}.kext in {zip_file.name}, skipping.') + return None + + for to_extract in zipf.namelist(): + if to_extract.startswith(name): + zipf.extract(to_extract, tmp_dir) + + return tmp_dir / name + + def download_updates(efi: Path, updates: list[tuple[Kext, Release]]): # Create temporary directory with TemporaryDirectory() as tmp: start = time.time_ns() tmp = Path(tmp) - kexts = tmp / 'extract' - kexts.mkdir(parents=True, exist_ok=True) - backup = efi.parent / f'Backups/{datetime.now().strftime("%m-%d %H-%M")}' - backup.mkdir(parents=True, exist_ok=True) + kexts = ensure_dir(tmp / 'extract') print('Downloading zip files...') files = [(k, r, download_file(r.artifact.url, tmp / r.artifact.name)) for k, r in updates] @@ -98,38 +122,16 @@ def download_updates(efi: Path, updates: list[tuple[Kext, Release]]): print() print('Extracting kexts...') - def extract(k: Kext, f: Path): - if f.suffix != '.zip': - print(f'Unable to process {f.name}. Currently only zip files are supported.') - return None - - with ZipFile(f, 'r') as zipf: - lower = k.name.lower() + '.kext/' - - def find_name(): - for n in zipf.namelist(): - if n.lower().endswith(lower): - return n - - return None - - name = find_name() - if not find_name(): - print(f'Unable to find {k.name}.kext in {f.name}, skipping.') - return None - - for to_extract in zipf.namelist(): - if to_extract.startswith(name): - zipf.extract(to_extract, kexts) - - return kexts / name - - extracted = [(k, r, extract(k, f)) for k, r, f in files] + extracted = [(k, r, extract_kext(k.name, f, kexts)) for k, r, f in files] extracted = [e for e in extracted if e[2]] - print(f'Backing up original kexts to {backup}...') - for k, r, e in extracted: - shutil.move(k.path, backup / k.name) + # Backup if needed + existing = [t for t in extracted if t[0].path.is_dir()] + if len(existing) > 0: + backup = ensure_dir(efi.parent / f'Backups/{datetime.now().strftime("%m-%d %H-%M")}') + print(f'Backing up original kexts to {backup}...') + for k, r, e in existing: + shutil.move(k.path, backup / k.name) print(f'Installing new kexts...') for k, r, e in extracted: @@ -139,48 +141,19 @@ def download_updates(efi: Path, updates: list[tuple[Kext, Release]]): print(f'✨ All Done in {(time.time_ns() - start) / 1e6:,.0f}s!') -def run(): - parser = argparse.ArgumentParser(description='OpenCore Package Manager by HyDEV') - parser.add_argument('cmd', help='Command (update)') - parser.add_argument('--efi', help='EFI Directory Path', default='.') - parser.add_argument('--pre', action='store_true', help='Use pre-release') - parser.add_argument('-y', action='store_true', help='Say yes') - - printc('\n&fOpenCore Package Manager v1.0.0 by HyDEV\n') - args = parser.parse_args() - - if args.cmd.lower() != 'update': - print(f'Unknown Command: {args.cmd}') - return - - # Normalize EFI Path - efi = Path(args.efi) - if (efi / 'EFI').is_dir(): - efi = efi / 'EFI' - assert (efi / 'OC').is_dir(), 'Open Core directory (OC) not found.' - - # Find kexts - kexts_dir = efi / 'OC' / 'Kexts' - kexts = [str(f) for f in os.listdir(kexts_dir)] - kexts = [kexts_dir / f for f in kexts if f.lower().endswith('.kext')] - - kexts = [Kext(k) for k in kexts] - print(f'🔍 Found {len(kexts)} kexts in {kexts_dir}') - # print_kexts(kexts) - - # Read Repo - with open(Path(__file__).parent / 'data' / 'OCKextRepos.yml') as f: - repos = ruamel.yaml.safe_load(f) - - # Get latest repos with multithreading - def get_latest(k: Kext): +def get_latest_list(names: list[str], repos: dict, args) -> list[Release]: + def get_latest(s: str): try: - return get_latest_release(k, repos, args.pre) + return get_latest_release(s, repos, args.pre) except AssertionError as e: print(e) return None - latests = thread_map(get_latest, kexts, desc='Fetching Updates'.ljust(bar_len), max_workers=32, bar_format='{desc} {rate_fmt} {remaining} [{bar}] {percentage:.0f}%', ascii=' #', unit=' pkg') + return thread_map(get_latest, names, desc='Fetching Kexts'.ljust(bar_len), max_workers=32, bar_format='{desc} {rate_fmt} {remaining} [{bar}] {percentage:.0f}%', ascii=' #', unit=' pkg') + + +def update(args, repos: dict, kexts: list[Kext], efi: Path): + latests = get_latest_list([k.name for k in kexts], repos, args) # Compare versions updates: list[tuple[Kext, Release]] @@ -208,5 +181,63 @@ def run(): download_updates(efi, updates) +def install(args, repos: dict, kexts: list[Kext], efi: Path): + names = args.install + latests = get_latest_list(names, repos, args) + + # Compare versions + updates: list[tuple[Kext, Release]] + updates = [(Kext(efi / 'OC' / 'Kexts' / n, n, version=l.tag), l) for n, l in zip(names, latests) if l] + + # Download prompt + print() + print(f'Total download size: {sizeof_fmt(sum(l.artifact.size for k, l in updates))}') + proceed = 'y' if args.y else input(f'🚀 Ready to fly? [y/N] ') + + if proceed.lower().strip() != 'y': + print() + print('😕 Huh, okay') + exit(0) + + print() + download_updates(efi, updates) + + +def run(): + parser = argparse.ArgumentParser(description='OpenCore Package Manager by HyDEV') + parser.add_argument('-U', '--update', action='store_true', help='Update') + parser.add_argument('-S', '--install', nargs='+', help='Install packages') + parser.add_argument('--efi', help='EFI Directory Path', default='.') + parser.add_argument('--pre', action='store_true', help='Use pre-release') + parser.add_argument('-y', action='store_true', help='Say yes') + + printc('\n&fOpenCore Package Manager v1.0.0 by HyDEV\n') + args = parser.parse_args() + + # Normalize EFI Path + efi = Path(args.efi) + if (efi / 'EFI').is_dir(): + efi = efi / 'EFI' + assert (efi / 'OC').is_dir(), 'Open Core directory (OC) not found.' + + # Find kexts + kexts_dir = efi / 'OC' / 'Kexts' + kexts = [str(f) for f in os.listdir(kexts_dir)] + kexts = [kexts_dir / f for f in kexts if f.lower().endswith('.kext')] + + kexts = [Kext.from_path(k) for k in kexts] + print(f'🔍 Found {len(kexts)} kexts in {kexts_dir}') + + # Read Repo + with open(Path(__file__).parent / 'data' / 'OCKextRepos.yml') as f: + repos = ruamel.yaml.safe_load(f) + + if args.update: + return update(args, repos, kexts, efi) + + if args.install: + return install(args, repos, kexts, efi) + + if __name__ == '__main__': run() diff --git a/ocpm/models.py b/ocpm/models.py index 9c7dc99..c5e659e 100644 --- a/ocpm/models.py +++ b/ocpm/models.py @@ -11,14 +11,13 @@ class Kext: path: Path name: str - id: str version: str - sdk_os: str - min_os: str - - def __init__(self, path: Path): - self.path = path + id: str | None = None + sdk_os: str | None = None + min_os: str | None = None + @classmethod + def from_path(cls, path: Path) -> 'Kext': # Find plist file plist = path / 'Contents' / 'Info.plist' if not plist.is_file(): @@ -27,14 +26,16 @@ class Kext: # Load plist file plist = plistlib.loads(plist.read_bytes()) - self.name = plist['CFBundleName'] - self.id = plist['CFBundleIdentifier'] - self.version = plist['CFBundleVersion'] - self.sdk_os = plist.get('DTSDKName') - self.min_os = plist.get('LSMinimumSystemVersion') + name = plist['CFBundleName'] + id = plist['CFBundleIdentifier'] + version = plist['CFBundleVersion'] + sdk_os = plist.get('DTSDKName') + min_os = plist.get('LSMinimumSystemVersion') - if self.sdk_os: - self.sdk_os = self.sdk_os.replace('macosx', '') + if sdk_os: + sdk_os = sdk_os.replace('macosx', '') + + return cls(path, name, id, version, sdk_os, min_os) @dataclass()