[+] Multithreading

This commit is contained in:
2026-05-06 20:05:59 +00:00
parent 61450e3a97
commit bb991d94f0
5 changed files with 302 additions and 76 deletions
+8
View File
@@ -126,6 +126,14 @@ Retry only repositories that failed during the previous non-dry-run sync:
git-sync sync --retry-failed
```
Control repo-level parallelism:
```sh
git-sync sync --jobs 8
```
Repository logs are prefixed while jobs run concurrently, so interleaved output remains attributable. The default is 4 workers; use `--jobs 1` for serial sync.
`git-sync` stores a small ref cache in the work directory. On later runs it first checks each repository with `git ls-remote --heads --tags`; when all endpoints report the same refs as the last successful sync, it skips the full fetch/push pass for that repository.
Use cron or another scheduler for automatic execution:
+5 -5
View File
@@ -64,7 +64,7 @@ impl GitMirror {
pub fn open(path: PathBuf, redactor: Redactor, dry_run: bool) -> Result<Self> {
if !path.exists() {
if dry_run {
println!(
crate::logln!(
" {} git init --bare {}",
style("dry-run").yellow().bold(),
style(path.display()).dim()
@@ -100,7 +100,7 @@ impl GitMirror {
pub fn fetch_remote(&self, remote: &RemoteSpec) -> Result<()> {
let branch_refspec = format!("+refs/heads/*:refs/remotes/{}/*", remote.name);
let tag_refspec = format!("+refs/tags/*:refs/remote-tags/{}/*", remote.name);
println!(
crate::logln!(
" {} {}",
style("fetch").cyan().bold(),
style(&remote.display).dim()
@@ -248,7 +248,7 @@ impl GitMirror {
} else {
format!("{}:refs/heads/{}", branch.sha, branch.branch)
};
println!(
crate::logln!(
" {} {} {} {}",
style("push").green().bold(),
style("branch").dim(),
@@ -268,7 +268,7 @@ impl GitMirror {
continue;
}
let refspec = format!("{}:refs/tags/{}", tag.sha, tag.tag);
println!(
crate::logln!(
" {} {} {} {}",
style("push").green().bold(),
style("tag").dim(),
@@ -553,7 +553,7 @@ where
.map(|arg| arg.as_ref().to_string())
.collect::<Vec<_>>();
if dry_run {
println!(
crate::logln!(
" {} {} {}",
style("dry-run").yellow().bold(),
program,
+76
View File
@@ -0,0 +1,76 @@
use std::cell::RefCell;
use std::fmt;
use std::sync::{Mutex, OnceLock};
use console::style;
/// Process-wide lock that `line` holds while it prints, created lazily on first use.
static OUTPUT_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
thread_local! {
    /// Log prefix for the current thread; `None` means lines are printed without a prefix.
    static PREFIX: RefCell<Option<String>> = const { RefCell::new(None) };
}

/// Guard returned by [`set_prefix`].
///
/// Holds the prefix that was active on the calling thread before `set_prefix`
/// replaced it. Dropping the guard writes that earlier value back, so nested
/// prefixes unwind correctly as their guards go out of scope.
#[must_use = "the previous prefix is restored as soon as the guard is dropped"]
pub struct PrefixGuard {
    previous: Option<String>,
}

impl Drop for PrefixGuard {
    /// Restores the prefix captured when the guard was created.
    fn drop(&mut self) {
        PREFIX.with(|prefix| {
            *prefix.borrow_mut() = self.previous.take();
        });
    }
}

/// Installs `prefix` as the current thread's log prefix.
///
/// Returns a [`PrefixGuard`] that must be kept alive for as long as the prefix
/// should apply; bind it to a named variable (`let _guard = ...`), because
/// discarding it reverts the prefix immediately.
pub fn set_prefix(prefix: String) -> PrefixGuard {
    let previous = PREFIX.with(|current| current.borrow_mut().replace(prefix));
    PrefixGuard { previous }
}
/// Builds a fixed-width log label from a repository name.
///
/// Names shorter than `width` characters are right-padded with spaces. Names
/// longer than `width` are cut to `width` characters with `~` as the final
/// character. Lengths are counted in `char`s. A `width` of zero yields an
/// empty string.
pub fn repo_prefix(repo_name: &str, width: usize) -> String {
    let overflows = width > 0 && repo_name.chars().count() > width;
    let label: String = if overflows {
        // Keep one slot free for the truncation marker.
        repo_name
            .chars()
            .take(width - 1)
            .chain(std::iter::once('~'))
            .collect()
    } else {
        repo_name.chars().take(width).collect()
    };
    format!("{label:<width$}")
}
/// Prints one formatted log message to stdout.
///
/// The message is rendered before the shared output lock is taken. When the
/// current thread has a prefix and the message is non-empty, each line of the
/// message is printed as `<prefix> | <line>`. Otherwise the message is printed
/// as-is, so an empty message gives a bare blank line.
pub fn line(args: fmt::Arguments<'_>) {
    let message = args.to_string();
    let active_prefix = PREFIX.with(|cell| cell.borrow().clone());
    // A poisoned lock only means another thread panicked while printing; keep logging.
    let _stdout_guard = OUTPUT_LOCK
        .get_or_init(|| Mutex::new(()))
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    if let Some(label) = active_prefix.filter(|_| !message.is_empty()) {
        for row in message.lines() {
            println!("{} | {}", style(&label).cyan().bold(), row);
        }
    } else {
        println!("{message}");
    }
}
/// `println!`-style logging macro that routes through `logging::line`.
///
/// With no arguments it logs an empty message; otherwise the arguments are
/// passed to `format_args!` unchanged.
#[macro_export]
macro_rules! logln {
    () => {
        $crate::logln!("")
    };
    ($($fmt:tt)+) => {
        $crate::logging::line(format_args!($($fmt)+))
    };
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks both halves of the fixed-width contract of `repo_prefix`.
    #[test]
    fn repo_prefix_pads_and_truncates_to_fixed_width() {
        // A 3-character name padded to width 6 carries three trailing spaces.
        assert_eq!(repo_prefix("api", 6), "api   ");
        // An over-long name is cut to the width, ending in the `~` marker.
        assert_eq!(repo_prefix("very-long-repo", 8), "very-lo~");
    }
}
+15 -1
View File
@@ -1,6 +1,7 @@
mod config;
mod git;
mod interactive;
mod logging;
mod provider;
mod sync;
@@ -13,7 +14,7 @@ use crate::config::{
Config, EndpointConfig, NamespaceKind, ProviderKind, SiteConfig, TokenConfig, Visibility,
default_config_path,
};
use crate::sync::{SyncOptions, sync_all};
use crate::sync::{DEFAULT_JOBS, SyncOptions, sync_all};
#[derive(Parser, Debug)]
#[command(name = "git-sync")]
@@ -111,6 +112,8 @@ struct SyncCommand {
retry_failed: bool,
#[arg(long, value_name = "PATH")]
work_dir: Option<PathBuf>,
#[arg(long, default_value_t = DEFAULT_JOBS, value_name = "N")]
jobs: usize,
}
#[derive(Clone, Debug, ValueEnum)]
@@ -154,6 +157,7 @@ fn main() -> Result<()> {
repo_pattern: command.repo_pattern,
retry_failed: command.retry_failed,
work_dir: command.work_dir,
jobs: command.jobs,
},
)
}
@@ -383,6 +387,16 @@ mod tests {
assert!(args.retry_failed);
}
/// `sync --jobs N` parses and the value lands in the `jobs` field.
#[test]
fn cli_accepts_sync_jobs() {
    let parsed = Cli::try_parse_from(["git-sync", "sync", "--jobs", "8"]).unwrap();
    match parsed.command {
        Command::Sync(args) => assert_eq!(args.jobs, 8),
        _ => panic!("parsed unexpected command"),
    }
}
#[test]
fn endpoint_parser_supports_aliases_and_rejects_bad_kinds() {
let endpoint = parse_endpoint("github:organization:MewoLab").unwrap();
+198 -70
View File
@@ -1,6 +1,8 @@
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex, mpsc};
use std::thread;
use anyhow::{Context, Result, bail};
use console::style;
@@ -12,12 +14,14 @@ use crate::git::{
GitMirror, Redactor, RemoteRefSnapshot, RemoteSpec, is_disabled_repository_error,
ls_remote_refs, safe_remote_name,
};
use crate::logging;
use crate::provider::{EndpointRepo, ProviderClient, repos_by_name};
const FAILURE_STATE_FILE: &str = "failed-repos.toml";
const REF_STATE_FILE: &str = "ref-state.toml";
pub const DEFAULT_JOBS: usize = 4;
#[derive(Clone, Debug, Default)]
#[derive(Clone, Debug)]
pub struct SyncOptions {
pub group: Option<String>,
pub dry_run: bool,
@@ -26,10 +30,29 @@ pub struct SyncOptions {
pub repo_pattern: Option<String>,
pub retry_failed: bool,
pub work_dir: Option<PathBuf>,
pub jobs: usize,
}
impl Default for SyncOptions {
    /// Every option is off or unset, except `jobs`, which starts at `DEFAULT_JOBS`.
    fn default() -> Self {
        let jobs = DEFAULT_JOBS;
        Self {
            jobs,
            // Boolean switches start disabled.
            dry_run: false,
            retry_failed: false,
            // Optional settings start unset.
            group: None,
            repo_pattern: None,
            work_dir: None,
            create_missing_override: None,
            force_override: None,
        }
    }
}
pub fn sync_all(config: &Config, options: SyncOptions) -> Result<()> {
validate_config(config)?;
if options.jobs == 0 {
bail!("--jobs must be at least 1");
}
let work_dir = options.work_dir.clone().unwrap_or_else(default_work_dir);
fs::create_dir_all(&work_dir)
.with_context(|| format!("failed to create {}", work_dir.display()))?;
@@ -256,7 +279,7 @@ fn ref_state_path(work_dir: &Path) -> PathBuf {
}
fn print_failure(scope: &str, error: &anyhow::Error) {
println!(
crate::logln!(
" {} {} {}",
style("fail").red().bold(),
style(scope).cyan(),
@@ -265,16 +288,16 @@ fn print_failure(scope: &str, error: &anyhow::Error) {
}
fn print_failure_summary(failures: &[SyncFailure]) {
println!();
println!(
crate::logln!();
crate::logln!(
"{} {}",
style("Failures").red().bold(),
style(format!("({})", failures.len())).dim()
);
for (index, failure) in failures.iter().enumerate() {
println!(" {}. {}", index + 1, style(&failure.scope).cyan().bold());
crate::logln!(" {}. {}", index + 1, style(&failure.scope).cyan().bold());
for line in failure.error.lines() {
println!(" {line}");
crate::logln!(" {line}");
}
}
}
@@ -305,8 +328,8 @@ fn sync_group(
context: &mut GroupSyncContext<'_>,
mirror: &MirrorConfig,
) -> Result<Vec<SyncFailure>> {
println!();
println!(
crate::logln!();
crate::logln!(
"{} {}",
style("Mirror group").cyan().bold(),
style(&mirror.name).bold()
@@ -321,7 +344,7 @@ fn sync_group(
for endpoint in &mirror.endpoints {
let site = context.config.site(&endpoint.site).unwrap();
let client = ProviderClient::new(site)?;
println!(
crate::logln!(
" {} {}",
style("list").cyan().bold(),
style(endpoint.label()).dim()
@@ -354,25 +377,25 @@ fn sync_group(
.collect::<BTreeSet<_>>();
if repo_names.is_empty() {
if let Some(retry_repo_names) = retry_repo_names {
println!(
crate::logln!(
" {} no previously failed repositories were found in this group ({} saved)",
style("skip").yellow().bold(),
retry_repo_names.len()
);
} else if context.retry_failed_repos.is_some() {
println!(
crate::logln!(
" {} no previous failures for this group",
style("skip").yellow().bold()
);
} else if let Some(pattern) = context.repo_pattern {
println!(
crate::logln!(
" {} no repositories match {} ({} discovered)",
style("skip").yellow().bold(),
style(pattern.as_str()).cyan(),
all_repo_count
);
} else {
println!(
crate::logln!(
" {} mirror group has no repositories",
style("skip").yellow().bold()
);
@@ -380,7 +403,7 @@ fn sync_group(
return Ok(Vec::new());
}
if let Some(pattern) = context.repo_pattern {
println!(
crate::logln!(
" {} {} of {} repositories match {}",
style("filter").cyan().bold(),
repo_names.len(),
@@ -389,7 +412,7 @@ fn sync_group(
);
}
if let Some(retry_repo_names) = retry_repo_names {
println!(
crate::logln!(
" {} retrying {} of {} previously failed repositories",
style("retry").cyan().bold(),
repo_names.len(),
@@ -397,35 +420,133 @@ fn sync_group(
);
}
let mut failures = Vec::new();
for repo_name in repo_names {
let mut existing = repos.remove(&repo_name).unwrap_or_default();
let repo_context = RepoSyncContext {
config: context.config,
mirror,
work_dir: context.work_dir,
redactor: context.redactor.clone(),
dry_run: context.options.dry_run,
allow_force,
};
if let Err(error) = sync_repo(
&repo_context,
&repo_name,
&mut existing,
create_missing,
&mut *context.ref_state,
)
.with_context(|| format!("failed to sync repo {repo_name}"))
{
let scope = format!("{}/{}", mirror.name, repo_name);
print_failure(&scope, &error);
failures.push(SyncFailure::repo(mirror.name.clone(), repo_name, error));
}
let repo_log_width = repo_log_width(&repo_names);
let repo_jobs = repo_names
.into_iter()
.map(|repo_name| {
let existing = repos.remove(&repo_name).unwrap_or_default();
RepoSyncJob {
repo_name,
existing,
}
})
.collect::<VecDeque<_>>();
let worker_count = context.options.jobs.min(repo_jobs.len()).max(1);
if worker_count > 1 {
crate::logln!(
" {} syncing repositories with {} workers",
style("jobs").cyan().bold(),
worker_count
);
}
let base_ref_state = context.ref_state.clone();
let queue = Arc::new(Mutex::new(repo_jobs));
let (sender, receiver) = mpsc::channel();
let failures = thread::scope(|scope| {
for _ in 0..worker_count {
let queue = Arc::clone(&queue);
let sender = sender.clone();
let redactor = context.redactor.clone();
let config = context.config;
let work_dir = context.work_dir;
let dry_run = context.options.dry_run;
let ref_state = &base_ref_state;
scope.spawn(move || {
while let Some(mut job) = pop_repo_job(&queue) {
let prefix = logging::repo_prefix(&job.repo_name, repo_log_width);
let _prefix_guard = logging::set_prefix(prefix);
let repo_context = RepoSyncContext {
config,
mirror,
work_dir,
redactor: redactor.clone(),
dry_run,
allow_force,
};
let result = sync_repo(
&repo_context,
&job.repo_name,
&mut job.existing,
create_missing,
ref_state,
)
.with_context(|| format!("failed to sync repo {}", job.repo_name))
.map(|outcome| RepoWorkerSuccess {
repo_name: job.repo_name.clone(),
outcome,
})
.map_err(|error| RepoWorkerFailure {
repo_name: job.repo_name,
error,
});
if sender.send(result).is_err() {
break;
}
}
});
}
drop(sender);
let mut failures = Vec::new();
for result in receiver {
match result {
Ok(success) => {
if let Some(refs) = success.outcome.ref_update {
context
.ref_state
.set_repo(&mirror.name, &success.repo_name, refs);
}
}
Err(failure) => {
let scope = format!("{}/{}", mirror.name, failure.repo_name);
print_failure(&scope, &failure.error);
failures.push(SyncFailure::repo(
mirror.name.clone(),
failure.repo_name,
failure.error,
));
}
}
}
failures
});
Ok(failures)
}
/// Takes the next job from the front of the shared queue, or `None` once it is empty.
///
/// A poisoned mutex is recovered instead of propagated, so the remaining jobs
/// can still be handed out.
fn pop_repo_job(queue: &Arc<Mutex<VecDeque<RepoSyncJob>>>) -> Option<RepoSyncJob> {
    let mut pending = match queue.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    };
    pending.pop_front()
}
/// Picks the log-prefix column width for a set of repository names.
///
/// The result is the longest name's length in `char`s, limited to the range
/// 4..=32. An empty set gives 4.
fn repo_log_width(repo_names: &BTreeSet<String>) -> usize {
    const MIN_WIDTH: usize = 4;
    const MAX_WIDTH: usize = 32;

    let mut widest = MIN_WIDTH;
    for name in repo_names {
        widest = widest.max(name.chars().count());
    }
    widest.min(MAX_WIDTH)
}
/// One unit of work on the repo sync queue: a repository name plus the
/// endpoint entries already discovered for it.
struct RepoSyncJob {
// Name of the repository to sync.
repo_name: String,
// Endpoint entries found for this repository; may be empty.
existing: Vec<EndpointRepo>,
}
/// Message a worker sends back when `sync_repo` returned `Ok` for a repository.
struct RepoWorkerSuccess {
// Name of the repository that was processed.
repo_name: String,
// Outcome returned by `sync_repo` for that repository.
outcome: RepoSyncOutcome,
}
/// Message a worker sends back when `sync_repo` returned an error for a repository.
struct RepoWorkerFailure {
// Name of the repository whose sync failed.
repo_name: String,
// The error, already wrapped with "failed to sync repo <name>" context.
error: anyhow::Error,
}
fn ensure_missing_repos(
config: &Config,
mirror: &MirrorConfig,
@@ -445,7 +566,7 @@ fn ensure_missing_repos(
continue;
}
if !create_missing {
println!(
crate::logln!(
" {} {} missing on {} ({})",
style("skip").yellow().bold(),
style(repo_name).cyan(),
@@ -455,7 +576,7 @@ fn ensure_missing_repos(
continue;
}
println!(
crate::logln!(
" {} {} {}",
style("create").green().bold(),
style(repo_name).cyan(),
@@ -478,7 +599,7 @@ fn ensure_missing_repos(
)
.with_context(|| format!("failed to create {} on {}", repo_name, endpoint.label()))?;
if created.private != matches!(mirror.visibility, crate::config::Visibility::Private) {
println!(
crate::logln!(
" {} created {} on {}, but provider reported a different visibility than requested",
style("warn").yellow().bold(),
style(repo_name).cyan(),
@@ -503,41 +624,46 @@ struct RepoSyncContext<'a> {
allow_force: bool,
}
/// Value returned by `sync_repo` on success. The default carries no ref update.
#[derive(Default)]
struct RepoSyncOutcome {
// Ref snapshot the caller should store for this repository via `set_repo`;
// `None` means the stored ref state is left as it is.
ref_update: Option<BTreeMap<String, RemoteRefState>>,
}
fn sync_repo(
context: &RepoSyncContext<'_>,
repo_name: &str,
repos: &mut Vec<EndpointRepo>,
create_missing: bool,
ref_state: &mut RefState,
) -> Result<()> {
println!();
println!(
ref_state: &RefState,
) -> Result<RepoSyncOutcome> {
crate::logln!();
crate::logln!(
"{} {}",
style("Repo").magenta().bold(),
style(repo_name).bold()
);
if repos.is_empty() {
println!(
crate::logln!(
" {} {}",
style("skip").yellow().bold(),
style("repository not found on any endpoint").dim()
);
return Ok(());
return Ok(RepoSyncOutcome::default());
}
let initial_remotes = remote_specs(context, repos)?;
let Some(initial_ref_state) = check_remote_refs(context, repo_name, &initial_remotes)? else {
return Ok(());
return Ok(RepoSyncOutcome::default());
};
if !context.dry_run
&& all_configured_endpoints_present(context.mirror, repos)
&& ref_state.repo_matches(&context.mirror.name, repo_name, &initial_ref_state)
{
println!(
crate::logln!(
" {} refs unchanged since last successful sync",
style("up-to-date").green().bold()
);
return Ok(());
return Ok(RepoSyncOutcome::default());
}
let path = context
@@ -550,13 +676,13 @@ fn sync_repo(
for remote in &initial_remotes {
if let Err(error) = mirror_repo.fetch_remote(remote) {
if is_disabled_repository_error(&error) {
println!(
crate::logln!(
" {} {} {}",
style("skip").yellow().bold(),
style(repo_name).cyan(),
style(format!("provider blocked access on {}", remote.display)).dim()
);
return Ok(());
return Ok(RepoSyncOutcome::default());
}
return Err(error).with_context(|| format!("failed to fetch {}", remote.display));
}
@@ -572,13 +698,13 @@ fn sync_repo(
)?;
if repos.len() < 2 {
println!(
crate::logln!(
" {} {} {}",
style("skip").yellow().bold(),
style(repo_name).cyan(),
style("fewer than two endpoints have this repository").dim()
);
return Ok(());
return Ok(RepoSyncOutcome::default());
}
let remotes = remote_specs(context, repos)?;
@@ -593,13 +719,13 @@ fn sync_repo(
{
if let Err(error) = mirror_repo.fetch_remote(remote) {
if is_disabled_repository_error(&error) {
println!(
crate::logln!(
" {} {} {}",
style("skip").yellow().bold(),
style(repo_name).cyan(),
style(format!("provider blocked access on {}", remote.display)).dim()
);
return Ok(());
return Ok(RepoSyncOutcome::default());
}
return Err(error).with_context(|| format!("failed to fetch {}", remote.display));
}
@@ -609,15 +735,17 @@ fn sync_repo(
if !context.dry_run && !result.had_conflicts {
let refs = if result.pushed {
let Some(refs) = check_remote_refs(context, repo_name, &remotes)? else {
return Ok(());
return Ok(RepoSyncOutcome::default());
};
refs
} else {
initial_ref_state
};
ref_state.set_repo(&context.mirror.name, repo_name, refs);
return Ok(RepoSyncOutcome {
ref_update: Some(refs),
});
}
Ok(())
Ok(RepoSyncOutcome::default())
}
fn all_configured_endpoints_present(mirror: &MirrorConfig, repos: &[EndpointRepo]) -> bool {
@@ -638,7 +766,7 @@ fn check_remote_refs(
) -> Result<Option<BTreeMap<String, RemoteRefState>>> {
let mut refs = BTreeMap::new();
for remote in remotes {
println!(
crate::logln!(
" {} {}",
style("check refs").cyan().bold(),
style(&remote.display).dim()
@@ -646,7 +774,7 @@ fn check_remote_refs(
let snapshot = match ls_remote_refs(remote, &context.redactor) {
Ok(snapshot) => snapshot,
Err(error) if is_disabled_repository_error(&error) => {
println!(
crate::logln!(
" {} {} {}",
style("skip").yellow().bold(),
style(repo_name).cyan(),
@@ -711,7 +839,7 @@ fn push_repo_refs(
.map(|(remote, sha)| format!("{remote}@{}", short_sha(sha)))
.collect::<Vec<_>>()
.join(", ");
println!(
crate::logln!(
" {} branch {} diverged across {} ({})",
style("conflict").yellow().bold(),
style(conflict.branch).cyan(),
@@ -733,7 +861,7 @@ fn push_repo_refs(
.map(|(remote, sha)| format!("{remote}@{}", short_sha(sha)))
.collect::<Vec<_>>()
.join(", ");
println!(
crate::logln!(
" {} tag {} differs across {} ({})",
style("conflict").yellow().bold(),
style(conflict.tag).cyan(),
@@ -743,7 +871,7 @@ fn push_repo_refs(
}
if branches_to_push.is_empty() && tags_to_push.is_empty() {
println!(
crate::logln!(
" {} branches and tags already match all endpoints",
style("up-to-date").green().bold()
);
@@ -772,13 +900,13 @@ struct RepoRefSyncResult {
}
fn print_branch_decisions(branches: &[crate::git::BranchDecision]) {
println!(
crate::logln!(
" {} {}",
style("branches").cyan().bold(),
style(format!("({})", branches.len())).dim()
);
for branch in branches {
println!(
crate::logln!(
" {} {} {}",
style(&branch.branch).cyan(),
style(format!("@{}", short_sha(&branch.sha))).dim(),
@@ -793,13 +921,13 @@ fn print_branch_decisions(branches: &[crate::git::BranchDecision]) {
}
fn print_tag_decisions(tags: &[crate::git::TagDecision]) {
println!(
crate::logln!(
" {} {}",
style("tags").cyan().bold(),
style(format!("({})", tags.len())).dim()
);
for tag in tags {
println!(
crate::logln!(
" {} {} {}",
style(&tag.tag).cyan(),
style(format!("@{}", short_sha(&tag.sha))).dim(),