diff --git a/README.md b/README.md index 115bcdf..f785b3a 100644 --- a/README.md +++ b/README.md @@ -126,6 +126,14 @@ Retry only repositories that failed during the previous non-dry-run sync: git-sync sync --retry-failed ``` +Control repo-level parallelism: + +```sh +git-sync sync --jobs 8 +``` + +Repository logs are prefixed while jobs run concurrently, so interleaved output remains attributable. The default is 4 workers; use `--jobs 1` for serial sync. + `git-sync` stores a small ref cache in the work directory. On later runs it first checks each repository with `git ls-remote --heads --tags`; when all endpoints report the same refs as the last successful sync, it skips the full fetch/push pass for that repository. Use cron or another scheduler for automatic execution: diff --git a/src/git.rs b/src/git.rs index 21739b5..ce0f0cc 100644 --- a/src/git.rs +++ b/src/git.rs @@ -64,7 +64,7 @@ impl GitMirror { pub fn open(path: PathBuf, redactor: Redactor, dry_run: bool) -> Result { if !path.exists() { if dry_run { - println!( + crate::logln!( " {} git init --bare {}", style("dry-run").yellow().bold(), style(path.display()).dim() @@ -100,7 +100,7 @@ impl GitMirror { pub fn fetch_remote(&self, remote: &RemoteSpec) -> Result<()> { let branch_refspec = format!("+refs/heads/*:refs/remotes/{}/*", remote.name); let tag_refspec = format!("+refs/tags/*:refs/remote-tags/{}/*", remote.name); - println!( + crate::logln!( " {} {}", style("fetch").cyan().bold(), style(&remote.display).dim() @@ -248,7 +248,7 @@ impl GitMirror { } else { format!("{}:refs/heads/{}", branch.sha, branch.branch) }; - println!( + crate::logln!( " {} {} {} {}", style("push").green().bold(), style("branch").dim(), @@ -268,7 +268,7 @@ impl GitMirror { continue; } let refspec = format!("{}:refs/tags/{}", tag.sha, tag.tag); - println!( + crate::logln!( " {} {} {} {}", style("push").green().bold(), style("tag").dim(), @@ -553,7 +553,7 @@ where .map(|arg| arg.as_ref().to_string()) .collect::>(); if dry_run { - println!( + crate::logln!( " {} {} {}", style("dry-run").yellow().bold(), program, diff --git a/src/logging.rs b/src/logging.rs new file mode 100644 index 0000000..21ba5dd --- /dev/null +++ b/src/logging.rs @@ -0,0 +1,76 @@ +use std::cell::RefCell; +use std::fmt; +use std::sync::{Mutex, OnceLock}; + +use console::style; + +static OUTPUT_LOCK: OnceLock> = OnceLock::new(); + +thread_local! { + static PREFIX: RefCell> = const { RefCell::new(None) }; +} + +pub struct PrefixGuard { + previous: Option, +} + +impl Drop for PrefixGuard { + fn drop(&mut self) { + PREFIX.with(|prefix| { + *prefix.borrow_mut() = self.previous.take(); + }); + } +} + +pub fn set_prefix(prefix: String) -> PrefixGuard { + let previous = PREFIX.with(|current| current.borrow_mut().replace(prefix)); + PrefixGuard { previous } +} + +pub fn repo_prefix(repo_name: &str, width: usize) -> String { + let mut prefix = repo_name.chars().take(width).collect::(); + if repo_name.chars().count() > width && width > 0 { + prefix.pop(); + prefix.push('~'); + } + format!("{prefix:) { + let text = args.to_string(); + let prefix = PREFIX.with(|prefix| prefix.borrow().clone()); + let lock = OUTPUT_LOCK.get_or_init(|| Mutex::new(())); + let _guard = lock.lock().unwrap_or_else(|poisoned| poisoned.into_inner()); + + match prefix { + Some(prefix) if !text.is_empty() => { + for line in text.lines() { + println!("{} | {}", style(&prefix).cyan().bold(), line); + } + } + _ => { + println!("{text}"); + } + } +} + +#[macro_export] +macro_rules! logln { + () => { + $crate::logging::line(format_args!("")) + }; + ($($arg:tt)*) => { + $crate::logging::line(format_args!($($arg)*)) + }; +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn repo_prefix_pads_and_truncates_to_fixed_width() { + assert_eq!(repo_prefix("api", 6), "api "); + assert_eq!(repo_prefix("very-long-repo", 8), "very-lo~"); + } +} diff --git a/src/main.rs b/src/main.rs index 9f3f2a1..f27db43 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,7 @@ mod config; mod git; mod interactive; +mod logging; mod provider; mod sync; @@ -13,7 +14,7 @@ use crate::config::{ Config, EndpointConfig, NamespaceKind, ProviderKind, SiteConfig, TokenConfig, Visibility, default_config_path, }; -use crate::sync::{SyncOptions, sync_all}; +use crate::sync::{DEFAULT_JOBS, SyncOptions, sync_all}; #[derive(Parser, Debug)] #[command(name = "git-sync")] @@ -111,6 +112,8 @@ struct SyncCommand { retry_failed: bool, #[arg(long, value_name = "PATH")] work_dir: Option, + #[arg(long, default_value_t = DEFAULT_JOBS, value_name = "N")] + jobs: usize, } #[derive(Clone, Debug, ValueEnum)] @@ -154,6 +157,7 @@ fn main() -> Result<()> { repo_pattern: command.repo_pattern, retry_failed: command.retry_failed, work_dir: command.work_dir, + jobs: command.jobs, }, ) } @@ -383,6 +387,16 @@ mod tests { assert!(args.retry_failed); } + #[test] + fn cli_accepts_sync_jobs() { + let cli = Cli::try_parse_from(["git-sync", "sync", "--jobs", "8"]).unwrap(); + + let Command::Sync(args) = cli.command else { + panic!("parsed unexpected command"); + }; + assert_eq!(args.jobs, 8); + } + #[test] fn endpoint_parser_supports_aliases_and_rejects_bad_kinds() { let endpoint = parse_endpoint("github:organization:MewoLab").unwrap(); diff --git a/src/sync.rs b/src/sync.rs index 0cd635a..00dd143 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -1,6 +1,8 @@ -use std::collections::{BTreeMap, BTreeSet, HashMap}; +use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque}; use std::fs; use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex, mpsc}; +use std::thread; use anyhow::{Context, Result, bail}; use console::style; @@ -12,12 +14,14 @@ use crate::git::{ GitMirror, Redactor, RemoteRefSnapshot, RemoteSpec, is_disabled_repository_error, ls_remote_refs, safe_remote_name, }; +use crate::logging; use crate::provider::{EndpointRepo, ProviderClient, repos_by_name}; const FAILURE_STATE_FILE: &str = "failed-repos.toml"; const REF_STATE_FILE: &str = "ref-state.toml"; +pub const DEFAULT_JOBS: usize = 4; -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug)] pub struct SyncOptions { pub group: Option, pub dry_run: bool, @@ -26,10 +30,29 @@ pub struct SyncOptions { pub repo_pattern: Option, pub retry_failed: bool, pub work_dir: Option, + pub jobs: usize, +} + +impl Default for SyncOptions { + fn default() -> Self { + Self { + group: None, + dry_run: false, + create_missing_override: None, + force_override: None, + repo_pattern: None, + retry_failed: false, + work_dir: None, + jobs: DEFAULT_JOBS, + } + } } pub fn sync_all(config: &Config, options: SyncOptions) -> Result<()> { validate_config(config)?; + if options.jobs == 0 { + bail!("--jobs must be at least 1"); + } let work_dir = options.work_dir.clone().unwrap_or_else(default_work_dir); fs::create_dir_all(&work_dir) .with_context(|| format!("failed to create {}", work_dir.display()))?; @@ -256,7 +279,7 @@ fn ref_state_path(work_dir: &Path) -> PathBuf { } fn print_failure(scope: &str, error: &anyhow::Error) { - println!( + crate::logln!( " {} {} {}", style("fail").red().bold(), style(scope).cyan(), @@ -265,16 +288,16 @@ fn print_failure(scope: &str, error: &anyhow::Error) { } fn print_failure_summary(failures: &[SyncFailure]) { - println!(); - println!( + crate::logln!(); + crate::logln!( "{} {}", style("Failures").red().bold(), style(format!("({})", failures.len())).dim() ); for (index, failure) in failures.iter().enumerate() { - println!(" {}. {}", index + 1, style(&failure.scope).cyan().bold()); + crate::logln!(" {}. {}", index + 1, style(&failure.scope).cyan().bold()); for line in failure.error.lines() { - println!(" {line}"); + crate::logln!(" {line}"); } } } @@ -305,8 +328,8 @@ fn sync_group( context: &mut GroupSyncContext<'_>, mirror: &MirrorConfig, ) -> Result> { - println!(); - println!( + crate::logln!(); + crate::logln!( "{} {}", style("Mirror group").cyan().bold(), style(&mirror.name).bold() @@ -321,7 +344,7 @@ fn sync_group( for endpoint in &mirror.endpoints { let site = context.config.site(&endpoint.site).unwrap(); let client = ProviderClient::new(site)?; - println!( + crate::logln!( " {} {}", style("list").cyan().bold(), style(endpoint.label()).dim() @@ -354,25 +377,25 @@ fn sync_group( .collect::>(); if repo_names.is_empty() { if let Some(retry_repo_names) = retry_repo_names { - println!( + crate::logln!( " {} no previously failed repositories were found in this group ({} saved)", style("skip").yellow().bold(), retry_repo_names.len() ); } else if context.retry_failed_repos.is_some() { - println!( + crate::logln!( " {} no previous failures for this group", style("skip").yellow().bold() ); } else if let Some(pattern) = context.repo_pattern { - println!( + crate::logln!( " {} no repositories match {} ({} discovered)", style("skip").yellow().bold(), style(pattern.as_str()).cyan(), all_repo_count ); } else { - println!( + crate::logln!( " {} mirror group has no repositories", style("skip").yellow().bold() ); @@ -380,7 +403,7 @@ fn sync_group( return Ok(Vec::new()); } if let Some(pattern) = context.repo_pattern { - println!( + crate::logln!( " {} {} of {} repositories match {}", style("filter").cyan().bold(), repo_names.len(), @@ -389,7 +412,7 @@ fn sync_group( ); } if let Some(retry_repo_names) = retry_repo_names { - println!( + crate::logln!( " {} retrying {} of {} previously failed repositories", style("retry").cyan().bold(), repo_names.len(), @@ -397,35 +420,133 @@ fn sync_group( ); } - let mut failures = Vec::new(); - for repo_name in repo_names { - let mut existing = repos.remove(&repo_name).unwrap_or_default(); - let repo_context = RepoSyncContext { - config: context.config, - mirror, - work_dir: context.work_dir, - redactor: context.redactor.clone(), - dry_run: context.options.dry_run, - allow_force, - }; - if let Err(error) = sync_repo( - &repo_context, - &repo_name, - &mut existing, - create_missing, - &mut *context.ref_state, - ) - .with_context(|| format!("failed to sync repo {repo_name}")) - { - let scope = format!("{}/{}", mirror.name, repo_name); - print_failure(&scope, &error); - failures.push(SyncFailure::repo(mirror.name.clone(), repo_name, error)); - } + let repo_log_width = repo_log_width(&repo_names); + let repo_jobs = repo_names + .into_iter() + .map(|repo_name| { + let existing = repos.remove(&repo_name).unwrap_or_default(); + RepoSyncJob { + repo_name, + existing, + } + }) + .collect::>(); + let worker_count = context.options.jobs.min(repo_jobs.len()).max(1); + if worker_count > 1 { + crate::logln!( + " {} syncing repositories with {} workers", + style("jobs").cyan().bold(), + worker_count + ); } + let base_ref_state = context.ref_state.clone(); + let queue = Arc::new(Mutex::new(repo_jobs)); + let (sender, receiver) = mpsc::channel(); + let failures = thread::scope(|scope| { + for _ in 0..worker_count { + let queue = Arc::clone(&queue); + let sender = sender.clone(); + let redactor = context.redactor.clone(); + let config = context.config; + let work_dir = context.work_dir; + let dry_run = context.options.dry_run; + let ref_state = &base_ref_state; + + scope.spawn(move || { + while let Some(mut job) = pop_repo_job(&queue) { + let prefix = logging::repo_prefix(&job.repo_name, repo_log_width); + let _prefix_guard = logging::set_prefix(prefix); + let repo_context = RepoSyncContext { + config, + mirror, + work_dir, + redactor: redactor.clone(), + dry_run, + allow_force, + }; + let result = sync_repo( + &repo_context, + &job.repo_name, + &mut job.existing, + create_missing, + ref_state, + ) + .with_context(|| format!("failed to sync repo {}", job.repo_name)) + .map(|outcome| RepoWorkerSuccess { + repo_name: job.repo_name.clone(), + outcome, + }) + .map_err(|error| RepoWorkerFailure { + repo_name: job.repo_name, + error, + }); + if sender.send(result).is_err() { + break; + } + } + }); + } + drop(sender); + + let mut failures = Vec::new(); + for result in receiver { + match result { + Ok(success) => { + if let Some(refs) = success.outcome.ref_update { + context + .ref_state + .set_repo(&mirror.name, &success.repo_name, refs); + } + } + Err(failure) => { + let scope = format!("{}/{}", mirror.name, failure.repo_name); + print_failure(&scope, &failure.error); + failures.push(SyncFailure::repo( + mirror.name.clone(), + failure.repo_name, + failure.error, + )); + } + } + } + failures + }); + Ok(failures) } +fn pop_repo_job(queue: &Arc>>) -> Option { + queue + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()) + .pop_front() +} + +fn repo_log_width(repo_names: &BTreeSet) -> usize { + repo_names + .iter() + .map(|name| name.chars().count()) + .max() + .unwrap_or(4) + .clamp(4, 32) +} + +struct RepoSyncJob { + repo_name: String, + existing: Vec, +} + +struct RepoWorkerSuccess { + repo_name: String, + outcome: RepoSyncOutcome, +} + +struct RepoWorkerFailure { + repo_name: String, + error: anyhow::Error, +} + fn ensure_missing_repos( config: &Config, mirror: &MirrorConfig, @@ -445,7 +566,7 @@ fn ensure_missing_repos( continue; } if !create_missing { - println!( + crate::logln!( " {} {} missing on {} ({})", style("skip").yellow().bold(), style(repo_name).cyan(), @@ -455,7 +576,7 @@ fn ensure_missing_repos( continue; } - println!( + crate::logln!( " {} {} {}", style("create").green().bold(), style(repo_name).cyan(), @@ -478,7 +599,7 @@ fn ensure_missing_repos( ) .with_context(|| format!("failed to create {} on {}", repo_name, endpoint.label()))?; if created.private != matches!(mirror.visibility, crate::config::Visibility::Private) { - println!( + crate::logln!( " {} created {} on {}, but provider reported a different visibility than requested", style("warn").yellow().bold(), style(repo_name).cyan(), @@ -503,41 +624,46 @@ struct RepoSyncContext<'a> { allow_force: bool, } +#[derive(Default)] +struct RepoSyncOutcome { + ref_update: Option>, +} + fn sync_repo( context: &RepoSyncContext<'_>, repo_name: &str, repos: &mut Vec, create_missing: bool, - ref_state: &mut RefState, -) -> Result<()> { - println!(); - println!( + ref_state: &RefState, +) -> Result { + crate::logln!(); + crate::logln!( "{} {}", style("Repo").magenta().bold(), style(repo_name).bold() ); if repos.is_empty() { - println!( + crate::logln!( " {} {}", style("skip").yellow().bold(), style("repository not found on any endpoint").dim() ); - return Ok(()); + return Ok(RepoSyncOutcome::default()); } let initial_remotes = remote_specs(context, repos)?; let Some(initial_ref_state) = check_remote_refs(context, repo_name, &initial_remotes)? else { - return Ok(()); + return Ok(RepoSyncOutcome::default()); }; if !context.dry_run && all_configured_endpoints_present(context.mirror, repos) && ref_state.repo_matches(&context.mirror.name, repo_name, &initial_ref_state) { - println!( + crate::logln!( " {} refs unchanged since last successful sync", style("up-to-date").green().bold() ); - return Ok(()); + return Ok(RepoSyncOutcome::default()); } let path = context @@ -550,13 +676,13 @@ fn sync_repo( for remote in &initial_remotes { if let Err(error) = mirror_repo.fetch_remote(remote) { if is_disabled_repository_error(&error) { - println!( + crate::logln!( " {} {} {}", style("skip").yellow().bold(), style(repo_name).cyan(), style(format!("provider blocked access on {}", remote.display)).dim() ); - return Ok(()); + return Ok(RepoSyncOutcome::default()); } return Err(error).with_context(|| format!("failed to fetch {}", remote.display)); } @@ -572,13 +698,13 @@ fn sync_repo( )?; if repos.len() < 2 { - println!( + crate::logln!( " {} {} {}", style("skip").yellow().bold(), style(repo_name).cyan(), style("fewer than two endpoints have this repository").dim() ); - return Ok(()); + return Ok(RepoSyncOutcome::default()); } let remotes = remote_specs(context, repos)?; @@ -593,13 +719,13 @@ fn sync_repo( { if let Err(error) = mirror_repo.fetch_remote(remote) { if is_disabled_repository_error(&error) { - println!( + crate::logln!( " {} {} {}", style("skip").yellow().bold(), style(repo_name).cyan(), style(format!("provider blocked access on {}", remote.display)).dim() ); - return Ok(()); + return Ok(RepoSyncOutcome::default()); } return Err(error).with_context(|| format!("failed to fetch {}", remote.display)); } @@ -609,15 +735,17 @@ fn sync_repo( if !context.dry_run && !result.had_conflicts { let refs = if result.pushed { let Some(refs) = check_remote_refs(context, repo_name, &remotes)? else { - return Ok(()); + return Ok(RepoSyncOutcome::default()); }; refs } else { initial_ref_state }; - ref_state.set_repo(&context.mirror.name, repo_name, refs); + return Ok(RepoSyncOutcome { + ref_update: Some(refs), + }); } - Ok(()) + Ok(RepoSyncOutcome::default()) } fn all_configured_endpoints_present(mirror: &MirrorConfig, repos: &[EndpointRepo]) -> bool { @@ -638,7 +766,7 @@ fn check_remote_refs( ) -> Result>> { let mut refs = BTreeMap::new(); for remote in remotes { - println!( + crate::logln!( " {} {}", style("check refs").cyan().bold(), style(&remote.display).dim() @@ -646,7 +774,7 @@ fn check_remote_refs( let snapshot = match ls_remote_refs(remote, &context.redactor) { Ok(snapshot) => snapshot, Err(error) if is_disabled_repository_error(&error) => { - println!( + crate::logln!( " {} {} {}", style("skip").yellow().bold(), style(repo_name).cyan(), @@ -711,7 +839,7 @@ fn push_repo_refs( .map(|(remote, sha)| format!("{remote}@{}", short_sha(sha))) .collect::>() .join(", "); - println!( + crate::logln!( " {} branch {} diverged across {} ({})", style("conflict").yellow().bold(), style(conflict.branch).cyan(), @@ -733,7 +861,7 @@ fn push_repo_refs( .map(|(remote, sha)| format!("{remote}@{}", short_sha(sha))) .collect::>() .join(", "); - println!( + crate::logln!( " {} tag {} differs across {} ({})", style("conflict").yellow().bold(), style(conflict.tag).cyan(), @@ -743,7 +871,7 @@ fn push_repo_refs( } if branches_to_push.is_empty() && tags_to_push.is_empty() { - println!( + crate::logln!( " {} branches and tags already match all endpoints", style("up-to-date").green().bold() ); @@ -772,13 +900,13 @@ struct RepoRefSyncResult { } fn print_branch_decisions(branches: &[crate::git::BranchDecision]) { - println!( + crate::logln!( " {} {}", style("branches").cyan().bold(), style(format!("({})", branches.len())).dim() ); for branch in branches { - println!( + crate::logln!( " {} {} {}", style(&branch.branch).cyan(), style(format!("@{}", short_sha(&branch.sha))).dim(), @@ -793,13 +921,13 @@ fn print_branch_decisions(branches: &[crate::git::BranchDecision]) { } fn print_tag_decisions(tags: &[crate::git::TagDecision]) { - println!( + crate::logln!( " {} {}", style("tags").cyan().bold(), style(format!("({})", tags.len())).dim() ); for tag in tags { - println!( + crate::logln!( " {} {} {}", style(&tag.tag).cyan(), style(format!("@{}", short_sha(&tag.sha))).dim(),