[+] Install feature

This commit is contained in:
Hykilpikonna
2022-09-03 01:36:25 -04:00
parent f387de26dc
commit a6cef43f90
3 changed files with 126 additions and 91 deletions
+5 -2
View File
@@ -12,12 +12,15 @@ pip install ocpm
## Usage ## Usage
First, cd to your EFI directory, and then run `ocpm update` to update all kexts. First, cd to your EFI directory, and then run `ocpm -U` to update all kexts.
Use `ocpm -I [kext names...]` to install kexts
## Features / To-do ## Features / To-do
* [x] Update kexts to the latest version * [x] Update kexts to the latest version
* [ ] Install/uninstall kexts * [x] Install kexts
* [ ] Uninstall kexts
* [ ] Resolve dependencies * [ ] Resolve dependencies
* [ ] Install specific versions of a kext * [ ] Install specific versions of a kext
* [ ] Install OS-dependent kexts (like AirportItlwm) * [ ] Install OS-dependent kexts (like AirportItlwm)
+107 -76
View File
@@ -13,7 +13,7 @@ from zipfile import ZipFile
import requests import requests
import ruamel.yaml import ruamel.yaml
import tqdm as tqdm import tqdm as tqdm
from hypy_utils import printc from hypy_utils import printc, ensure_dir
from packaging import version from packaging import version
from tqdm.contrib.concurrent import thread_map from tqdm.contrib.concurrent import thread_map
@@ -28,13 +28,13 @@ except Exception:
bar_len = 20 bar_len = 20
def get_latest_release(kext: Kext, repos: dict, pre: bool): def get_latest_release(name: str, repos: dict, pre: bool):
# Lowercase keys # Lowercase keys
repos = {k.lower(): v for k, v in repos['Repos'].items()} repos = {k.lower(): v for k, v in repos['Repos'].items()}
name = kext.name.lower() name = name.lower()
# Find repo # Find repo
assert name in repos, f'Kext {kext.name} is not found in our repos. (If it\'s open source, feel free to add it in!)' assert name in repos, f'Kext {name} is not found in our repos. (If it\'s open source, feel free to add it in!)'
repo_info = repos[name] repo_info = repos[name]
if isinstance(repo_info, str): if isinstance(repo_info, str):
@@ -43,7 +43,7 @@ def get_latest_release(kext: Kext, repos: dict, pre: bool):
repo = repo_info['Repo'] repo = repo_info['Repo']
artifact = repo_info.get('Artifact') artifact = repo_info.get('Artifact')
assert 'github.com/' in repo, f'For {kext.name}: {repo} is not a github repo, skipping...' assert 'github.com/' in repo, f'For {name}: {repo} is not a github repo, skipping...'
repo = repo.split('github.com/')[1] repo = repo.split('github.com/')[1]
# Check latest version # Check latest version
@@ -81,16 +81,40 @@ def download_file(url: str, file: str | Path):
return file return file
def extract_kext(kext_name: str, zip_file: Path, tmp_dir: Path) -> Path | None:
if zip_file.suffix != '.zip':
print(f'Unable to process {zip_file.name}. Currently only zip files are supported.')
return None
with ZipFile(zip_file, 'r') as zipf:
lower = kext_name.lower() + '.kext/'
def find_name():
for n in zipf.namelist():
if n.lower().endswith(lower):
return n
return None
name = find_name()
if not name:
print(f'Unable to find {kext_name}.kext in {zip_file.name}, skipping.')
return None
for to_extract in zipf.namelist():
if to_extract.startswith(name):
zipf.extract(to_extract, tmp_dir)
return tmp_dir / name
def download_updates(efi: Path, updates: list[tuple[Kext, Release]]): def download_updates(efi: Path, updates: list[tuple[Kext, Release]]):
# Create temporary directory # Create temporary directory
with TemporaryDirectory() as tmp: with TemporaryDirectory() as tmp:
start = time.time_ns() start = time.time_ns()
tmp = Path(tmp) tmp = Path(tmp)
kexts = tmp / 'extract' kexts = ensure_dir(tmp / 'extract')
kexts.mkdir(parents=True, exist_ok=True)
backup = efi.parent / f'Backups/{datetime.now().strftime("%m-%d %H-%M")}'
backup.mkdir(parents=True, exist_ok=True)
print('Downloading zip files...') print('Downloading zip files...')
files = [(k, r, download_file(r.artifact.url, tmp / r.artifact.name)) for k, r in updates] files = [(k, r, download_file(r.artifact.url, tmp / r.artifact.name)) for k, r in updates]
@@ -98,38 +122,16 @@ def download_updates(efi: Path, updates: list[tuple[Kext, Release]]):
print() print()
print('Extracting kexts...') print('Extracting kexts...')
def extract(k: Kext, f: Path): extracted = [(k, r, extract_kext(k.name, f, kexts)) for k, r, f in files]
if f.suffix != '.zip':
print(f'Unable to process {f.name}. Currently only zip files are supported.')
return None
with ZipFile(f, 'r') as zipf:
lower = k.name.lower() + '.kext/'
def find_name():
for n in zipf.namelist():
if n.lower().endswith(lower):
return n
return None
name = find_name()
if not find_name():
print(f'Unable to find {k.name}.kext in {f.name}, skipping.')
return None
for to_extract in zipf.namelist():
if to_extract.startswith(name):
zipf.extract(to_extract, kexts)
return kexts / name
extracted = [(k, r, extract(k, f)) for k, r, f in files]
extracted = [e for e in extracted if e[2]] extracted = [e for e in extracted if e[2]]
print(f'Backing up original kexts to {backup}...') # Backup if needed
for k, r, e in extracted: existing = [t for t in extracted if t[0].path.is_dir()]
shutil.move(k.path, backup / k.name) if len(existing) > 0:
backup = ensure_dir(efi.parent / f'Backups/{datetime.now().strftime("%m-%d %H-%M")}')
print(f'Backing up original kexts to {backup}...')
for k, r, e in existing:
shutil.move(k.path, backup / k.name)
print(f'Installing new kexts...') print(f'Installing new kexts...')
for k, r, e in extracted: for k, r, e in extracted:
@@ -139,48 +141,19 @@ def download_updates(efi: Path, updates: list[tuple[Kext, Release]]):
print(f'✨ All Done in {(time.time_ns() - start) / 1e6:,.0f}s!') print(f'✨ All Done in {(time.time_ns() - start) / 1e6:,.0f}s!')
def run(): def get_latest_list(names: list[str], repos: dict, args) -> list[Release]:
parser = argparse.ArgumentParser(description='OpenCore Package Manager by HyDEV') def get_latest(s: str):
parser.add_argument('cmd', help='Command (update)')
parser.add_argument('--efi', help='EFI Directory Path', default='.')
parser.add_argument('--pre', action='store_true', help='Use pre-release')
parser.add_argument('-y', action='store_true', help='Say yes')
printc('\n&fOpenCore Package Manager v1.0.0 by HyDEV\n')
args = parser.parse_args()
if args.cmd.lower() != 'update':
print(f'Unknown Command: {args.cmd}')
return
# Normalize EFI Path
efi = Path(args.efi)
if (efi / 'EFI').is_dir():
efi = efi / 'EFI'
assert (efi / 'OC').is_dir(), 'Open Core directory (OC) not found.'
# Find kexts
kexts_dir = efi / 'OC' / 'Kexts'
kexts = [str(f) for f in os.listdir(kexts_dir)]
kexts = [kexts_dir / f for f in kexts if f.lower().endswith('.kext')]
kexts = [Kext(k) for k in kexts]
print(f'🔍 Found {len(kexts)} kexts in {kexts_dir}')
# print_kexts(kexts)
# Read Repo
with open(Path(__file__).parent / 'data' / 'OCKextRepos.yml') as f:
repos = ruamel.yaml.safe_load(f)
# Get latest repos with multithreading
def get_latest(k: Kext):
try: try:
return get_latest_release(k, repos, args.pre) return get_latest_release(s, repos, args.pre)
except AssertionError as e: except AssertionError as e:
print(e) print(e)
return None return None
latests = thread_map(get_latest, kexts, desc='Fetching Updates'.ljust(bar_len), max_workers=32, bar_format='{desc} {rate_fmt} {remaining} [{bar}] {percentage:.0f}%', ascii=' #', unit=' pkg') return thread_map(get_latest, names, desc='Fetching Kexts'.ljust(bar_len), max_workers=32, bar_format='{desc} {rate_fmt} {remaining} [{bar}] {percentage:.0f}%', ascii=' #', unit=' pkg')
def update(args, repos: dict, kexts: list[Kext], efi: Path):
latests = get_latest_list([k.name for k in kexts], repos, args)
# Compare versions # Compare versions
updates: list[tuple[Kext, Release]] updates: list[tuple[Kext, Release]]
@@ -208,5 +181,63 @@ def run():
download_updates(efi, updates) download_updates(efi, updates)
def install(args, repos: dict, kexts: list[Kext], efi: Path):
names = args.install
latests = get_latest_list(names, repos, args)
# Compare versions
updates: list[tuple[Kext, Release]]
updates = [(Kext(efi / 'OC' / 'Kexts' / n, n, version=l.tag), l) for n, l in zip(names, latests) if l]
# Download prompt
print()
print(f'Total download size: {sizeof_fmt(sum(l.artifact.size for k, l in updates))}')
proceed = 'y' if args.y else input(f'🚀 Ready to fly? [y/N] ')
if proceed.lower().strip() != 'y':
print()
print('😕 Huh, okay')
exit(0)
print()
download_updates(efi, updates)
def run():
parser = argparse.ArgumentParser(description='OpenCore Package Manager by HyDEV')
parser.add_argument('-U', '--update', action='store_true', help='Update')
parser.add_argument('-S', '--install', nargs='+', help='Install packages')
parser.add_argument('--efi', help='EFI Directory Path', default='.')
parser.add_argument('--pre', action='store_true', help='Use pre-release')
parser.add_argument('-y', action='store_true', help='Say yes')
printc('\n&fOpenCore Package Manager v1.0.0 by HyDEV\n')
args = parser.parse_args()
# Normalize EFI Path
efi = Path(args.efi)
if (efi / 'EFI').is_dir():
efi = efi / 'EFI'
assert (efi / 'OC').is_dir(), 'Open Core directory (OC) not found.'
# Find kexts
kexts_dir = efi / 'OC' / 'Kexts'
kexts = [str(f) for f in os.listdir(kexts_dir)]
kexts = [kexts_dir / f for f in kexts if f.lower().endswith('.kext')]
kexts = [Kext.from_path(k) for k in kexts]
print(f'🔍 Found {len(kexts)} kexts in {kexts_dir}')
# Read Repo
with open(Path(__file__).parent / 'data' / 'OCKextRepos.yml') as f:
repos = ruamel.yaml.safe_load(f)
if args.update:
return update(args, repos, kexts, efi)
if args.install:
return install(args, repos, kexts, efi)
if __name__ == '__main__': if __name__ == '__main__':
run() run()
+14 -13
View File
@@ -11,14 +11,13 @@ class Kext:
path: Path path: Path
name: str name: str
id: str
version: str version: str
sdk_os: str id: str | None = None
min_os: str sdk_os: str | None = None
min_os: str | None = None
def __init__(self, path: Path):
self.path = path
@classmethod
def from_path(cls, path: Path) -> 'Kext':
# Find plist file # Find plist file
plist = path / 'Contents' / 'Info.plist' plist = path / 'Contents' / 'Info.plist'
if not plist.is_file(): if not plist.is_file():
@@ -27,14 +26,16 @@ class Kext:
# Load plist file # Load plist file
plist = plistlib.loads(plist.read_bytes()) plist = plistlib.loads(plist.read_bytes())
self.name = plist['CFBundleName'] name = plist['CFBundleName']
self.id = plist['CFBundleIdentifier'] id = plist['CFBundleIdentifier']
self.version = plist['CFBundleVersion'] version = plist['CFBundleVersion']
self.sdk_os = plist.get('DTSDKName') sdk_os = plist.get('DTSDKName')
self.min_os = plist.get('LSMinimumSystemVersion') min_os = plist.get('LSMinimumSystemVersion')
if self.sdk_os: if sdk_os:
self.sdk_os = self.sdk_os.replace('macosx', '') sdk_os = sdk_os.replace('macosx', '')
return cls(path, name, id, version, sdk_os, min_os)
@dataclass() @dataclass()