[O] Optimize sync with heuristic
This commit is contained in:
+67
@@ -15,6 +15,12 @@ pub struct RemoteSpec {
|
||||
pub display: String,
|
||||
}
|
||||
|
||||
/// Compact fingerprint of the refs a remote advertises through `git ls-remote`.
///
/// Two snapshots compare equal only when both the digest and the ref count
/// are identical.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct RemoteRefSnapshot {
    /// Hex digest computed over the sorted, non-empty `ls-remote` output lines.
    pub hash: String,
    /// Number of ref lines that were folded into `hash`.
    pub refs: usize,
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct BranchDecision {
|
||||
pub branch: String,
|
||||
@@ -415,6 +421,44 @@ impl GitMirror {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn ls_remote_refs(remote: &RemoteSpec, redactor: &Redactor) -> Result<RemoteRefSnapshot> {
|
||||
let output = Command::new("git")
|
||||
.args(["ls-remote", "--heads", "--tags", "--refs", &remote.url])
|
||||
.output()
|
||||
.with_context(|| "failed to run git ls-remote")?;
|
||||
if !output.status.success() {
|
||||
let stdout = redactor.redact(&String::from_utf8_lossy(&output.stdout));
|
||||
let stderr = redactor.redact(&String::from_utf8_lossy(&output.stderr));
|
||||
return Err(GitCommandError::new("git ls-remote", stdout, stderr).into());
|
||||
}
|
||||
|
||||
let mut refs = String::from_utf8_lossy(&output.stdout)
|
||||
.lines()
|
||||
.map(str::trim)
|
||||
.filter(|line| !line.is_empty())
|
||||
.map(ToOwned::to_owned)
|
||||
.collect::<Vec<_>>();
|
||||
refs.sort();
|
||||
|
||||
Ok(RemoteRefSnapshot {
|
||||
hash: stable_ref_hash(&refs),
|
||||
refs: refs.len(),
|
||||
})
|
||||
}
|
||||
|
||||
/// Folds the given ref lines into a 64-bit FNV-1a digest, rendered as exactly
/// 16 lowercase hexadecimal digits.
///
/// Every line is hashed followed by a `\n` terminator, so `["ab"]` and
/// `["a", "b"]` yield different digests. The digest depends on slice order;
/// callers that want an order-independent fingerprint must sort first.
fn stable_ref_hash(refs: &[String]) -> String {
    // FNV-1a is enough here: this is a deterministic change detector, not a
    // security boundary.
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

    let hash = refs
        .iter()
        .flat_map(|line| line.bytes().chain(std::iter::once(b'\n')))
        .fold(FNV_OFFSET_BASIS, |acc, byte| {
            (acc ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
        });
    format!("{hash:016x}")
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct GitCommandError {
|
||||
program: String,
|
||||
@@ -593,6 +637,29 @@ mod tests {
|
||||
assert!(!is_disabled_repository_error(&generic_forbidden));
|
||||
}
|
||||
|
||||
#[test]
fn ls_remote_snapshot_changes_when_remote_refs_change() {
    // Seed the first remote with one branch and one tag.
    let fixture = GitFixture::new();
    fixture.commit("base", "base", 1_700_000_000);
    fixture.tag("v1");
    fixture.push_head(&fixture.remote_a, "main");
    fixture.push_tag(&fixture.remote_a, "v1");
    let remote = fixture.remotes().remove(0);
    let redactor = Redactor::new(Vec::new());
    let take_snapshot = || ls_remote_refs(&remote, &redactor).unwrap();

    // Two listings of an untouched remote must be identical.
    let baseline = take_snapshot();
    let repeated = take_snapshot();
    assert_eq!(baseline, repeated);
    assert_eq!(baseline.refs, 2);

    // Publishing a new branch must alter both the digest and the ref count.
    fixture.commit("feature", "feature", 1_700_000_100);
    fixture.push_head(&fixture.remote_a, "feature");
    let after_push = take_snapshot();

    assert_ne!(baseline.hash, after_push.hash);
    assert_eq!(after_push.refs, 3);
}
|
||||
|
||||
#[test]
|
||||
fn branch_decisions_choose_fast_forward_tip() {
|
||||
let fixture = GitFixture::new();
|
||||
|
||||
+247
-36
@@ -8,10 +8,14 @@ use regex::Regex;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::config::{Config, EndpointConfig, MirrorConfig, default_work_dir, validate_config};
|
||||
use crate::git::{GitMirror, Redactor, RemoteSpec, is_disabled_repository_error, safe_remote_name};
|
||||
use crate::git::{
|
||||
GitMirror, Redactor, RemoteRefSnapshot, RemoteSpec, is_disabled_repository_error,
|
||||
ls_remote_refs, safe_remote_name,
|
||||
};
|
||||
use crate::provider::{EndpointRepo, ProviderClient, repos_by_name};
|
||||
|
||||
const FAILURE_STATE_FILE: &str = "failed-repos.toml";
|
||||
const REF_STATE_FILE: &str = "ref-state.toml";
|
||||
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub struct SyncOptions {
|
||||
@@ -61,18 +65,20 @@ pub fn sync_all(config: &Config, options: SyncOptions) -> Result<()> {
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let mut ref_state = load_ref_state(&work_dir)?;
|
||||
let mut failures = Vec::new();
|
||||
|
||||
for mirror in mirrors {
|
||||
match sync_group(
|
||||
let mut group_context = GroupSyncContext {
|
||||
config,
|
||||
mirror,
|
||||
&options,
|
||||
&work_dir,
|
||||
redactor.clone(),
|
||||
repo_pattern.as_ref(),
|
||||
retry_failed_repos.as_ref(),
|
||||
) {
|
||||
options: &options,
|
||||
work_dir: &work_dir,
|
||||
redactor: redactor.clone(),
|
||||
repo_pattern: repo_pattern.as_ref(),
|
||||
retry_failed_repos: retry_failed_repos.as_ref(),
|
||||
ref_state: &mut ref_state,
|
||||
};
|
||||
match sync_group(&mut group_context, mirror) {
|
||||
Ok(mut group_failures) => failures.append(&mut group_failures),
|
||||
Err(error) => {
|
||||
let scope = format!("mirror group {}", mirror.name);
|
||||
@@ -84,6 +90,7 @@ pub fn sync_all(config: &Config, options: SyncOptions) -> Result<()> {
|
||||
|
||||
if !options.dry_run {
|
||||
save_failure_state(&work_dir, &FailureState::from_failures(&failures))?;
|
||||
save_ref_state(&work_dir, &ref_state)?;
|
||||
}
|
||||
|
||||
if !failures.is_empty() {
|
||||
@@ -185,6 +192,69 @@ fn failure_state_path(work_dir: &Path) -> PathBuf {
|
||||
work_dir.join(FAILURE_STATE_FILE)
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
|
||||
struct RemoteRefState {
|
||||
hash: String,
|
||||
refs: usize,
|
||||
}
|
||||
|
||||
impl From<RemoteRefSnapshot> for RemoteRefState {
|
||||
fn from(value: RemoteRefSnapshot) -> Self {
|
||||
Self {
|
||||
hash: value.hash,
|
||||
refs: value.refs,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
|
||||
struct RefState {
|
||||
#[serde(default)]
|
||||
repos: BTreeMap<String, BTreeMap<String, BTreeMap<String, RemoteRefState>>>,
|
||||
}
|
||||
|
||||
impl RefState {
|
||||
fn repo_matches(
|
||||
&self,
|
||||
group: &str,
|
||||
repo: &str,
|
||||
refs: &BTreeMap<String, RemoteRefState>,
|
||||
) -> bool {
|
||||
self.repos.get(group).and_then(|repos| repos.get(repo)) == Some(refs)
|
||||
}
|
||||
|
||||
fn set_repo(&mut self, group: &str, repo: &str, refs: BTreeMap<String, RemoteRefState>) {
|
||||
self.repos
|
||||
.entry(group.to_string())
|
||||
.or_default()
|
||||
.insert(repo.to_string(), refs);
|
||||
}
|
||||
}
|
||||
|
||||
fn load_ref_state(work_dir: &Path) -> Result<RefState> {
|
||||
let path = ref_state_path(work_dir);
|
||||
if !path.exists() {
|
||||
return Ok(RefState::default());
|
||||
}
|
||||
let contents =
|
||||
fs::read_to_string(&path).with_context(|| format!("failed to read {}", path.display()))?;
|
||||
toml::from_str(&contents).with_context(|| format!("failed to parse {}", path.display()))
|
||||
}
|
||||
|
||||
fn save_ref_state(work_dir: &Path, state: &RefState) -> Result<()> {
|
||||
let path = ref_state_path(work_dir);
|
||||
if let Some(parent) = path.parent() {
|
||||
fs::create_dir_all(parent)
|
||||
.with_context(|| format!("failed to create {}", parent.display()))?;
|
||||
}
|
||||
let contents = toml::to_string_pretty(state)?;
|
||||
fs::write(&path, contents).with_context(|| format!("failed to write {}", path.display()))
|
||||
}
|
||||
|
||||
fn ref_state_path(work_dir: &Path) -> PathBuf {
|
||||
work_dir.join(REF_STATE_FILE)
|
||||
}
|
||||
|
||||
fn print_failure(scope: &str, error: &anyhow::Error) {
|
||||
println!(
|
||||
" {} {} {}",
|
||||
@@ -221,14 +291,19 @@ fn format_error(error: &anyhow::Error) -> String {
|
||||
format!("{error:#}")
|
||||
}
|
||||
|
||||
fn sync_group(
|
||||
config: &Config,
|
||||
mirror: &MirrorConfig,
|
||||
options: &SyncOptions,
|
||||
work_dir: &Path,
|
||||
struct GroupSyncContext<'a> {
|
||||
config: &'a Config,
|
||||
options: &'a SyncOptions,
|
||||
work_dir: &'a Path,
|
||||
redactor: Redactor,
|
||||
repo_pattern: Option<&Regex>,
|
||||
retry_failed_repos: Option<&BTreeMap<String, BTreeSet<String>>>,
|
||||
repo_pattern: Option<&'a Regex>,
|
||||
retry_failed_repos: Option<&'a BTreeMap<String, BTreeSet<String>>>,
|
||||
ref_state: &'a mut RefState,
|
||||
}
|
||||
|
||||
fn sync_group(
|
||||
context: &mut GroupSyncContext<'_>,
|
||||
mirror: &MirrorConfig,
|
||||
) -> Result<Vec<SyncFailure>> {
|
||||
println!();
|
||||
println!(
|
||||
@@ -236,14 +311,15 @@ fn sync_group(
|
||||
style("Mirror group").cyan().bold(),
|
||||
style(&mirror.name).bold()
|
||||
);
|
||||
let create_missing = options
|
||||
let create_missing = context
|
||||
.options
|
||||
.create_missing_override
|
||||
.unwrap_or(mirror.create_missing);
|
||||
let allow_force = options.force_override.unwrap_or(mirror.allow_force);
|
||||
let allow_force = context.options.force_override.unwrap_or(mirror.allow_force);
|
||||
|
||||
let mut all_endpoint_repos = Vec::new();
|
||||
for endpoint in &mirror.endpoints {
|
||||
let site = config.site(&endpoint.site).unwrap();
|
||||
let site = context.config.site(&endpoint.site).unwrap();
|
||||
let client = ProviderClient::new(site)?;
|
||||
println!(
|
||||
" {} {}",
|
||||
@@ -263,11 +339,15 @@ fn sync_group(
|
||||
|
||||
let mut repos = repos_by_name(all_endpoint_repos);
|
||||
let all_repo_count = repos.len();
|
||||
let retry_repo_names = retry_failed_repos.and_then(|repos| repos.get(&mirror.name));
|
||||
let retry_repo_names = context
|
||||
.retry_failed_repos
|
||||
.and_then(|repos| repos.get(&mirror.name));
|
||||
let repo_names = repos
|
||||
.keys()
|
||||
.filter(|name| {
|
||||
repo_pattern.is_none_or(|pattern| pattern.is_match(name))
|
||||
context
|
||||
.repo_pattern
|
||||
.is_none_or(|pattern| pattern.is_match(name))
|
||||
&& retry_repo_names.is_none_or(|repos| repos.contains(name.as_str()))
|
||||
})
|
||||
.cloned()
|
||||
@@ -279,12 +359,12 @@ fn sync_group(
|
||||
style("skip").yellow().bold(),
|
||||
retry_repo_names.len()
|
||||
);
|
||||
} else if retry_failed_repos.is_some() {
|
||||
} else if context.retry_failed_repos.is_some() {
|
||||
println!(
|
||||
" {} no previous failures for this group",
|
||||
style("skip").yellow().bold()
|
||||
);
|
||||
} else if let Some(pattern) = repo_pattern {
|
||||
} else if let Some(pattern) = context.repo_pattern {
|
||||
println!(
|
||||
" {} no repositories match {} ({} discovered)",
|
||||
style("skip").yellow().bold(),
|
||||
@@ -299,7 +379,7 @@ fn sync_group(
|
||||
}
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
if let Some(pattern) = repo_pattern {
|
||||
if let Some(pattern) = context.repo_pattern {
|
||||
println!(
|
||||
" {} {} of {} repositories match {}",
|
||||
style("filter").cyan().bold(),
|
||||
@@ -320,16 +400,22 @@ fn sync_group(
|
||||
let mut failures = Vec::new();
|
||||
for repo_name in repo_names {
|
||||
let mut existing = repos.remove(&repo_name).unwrap_or_default();
|
||||
let context = RepoSyncContext {
|
||||
config,
|
||||
let repo_context = RepoSyncContext {
|
||||
config: context.config,
|
||||
mirror,
|
||||
work_dir,
|
||||
redactor: redactor.clone(),
|
||||
dry_run: options.dry_run,
|
||||
work_dir: context.work_dir,
|
||||
redactor: context.redactor.clone(),
|
||||
dry_run: context.options.dry_run,
|
||||
allow_force,
|
||||
};
|
||||
if let Err(error) = sync_repo(&context, &repo_name, &mut existing, create_missing)
|
||||
.with_context(|| format!("failed to sync repo {repo_name}"))
|
||||
if let Err(error) = sync_repo(
|
||||
&repo_context,
|
||||
&repo_name,
|
||||
&mut existing,
|
||||
create_missing,
|
||||
&mut *context.ref_state,
|
||||
)
|
||||
.with_context(|| format!("failed to sync repo {repo_name}"))
|
||||
{
|
||||
let scope = format!("{}/{}", mirror.name, repo_name);
|
||||
print_failure(&scope, &error);
|
||||
@@ -422,6 +508,7 @@ fn sync_repo(
|
||||
repo_name: &str,
|
||||
repos: &mut Vec<EndpointRepo>,
|
||||
create_missing: bool,
|
||||
ref_state: &mut RefState,
|
||||
) -> Result<()> {
|
||||
println!();
|
||||
println!(
|
||||
@@ -438,13 +525,27 @@ fn sync_repo(
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let initial_remotes = remote_specs(context, repos)?;
|
||||
let Some(initial_ref_state) = check_remote_refs(context, repo_name, &initial_remotes)? else {
|
||||
return Ok(());
|
||||
};
|
||||
if !context.dry_run
|
||||
&& all_configured_endpoints_present(context.mirror, repos)
|
||||
&& ref_state.repo_matches(&context.mirror.name, repo_name, &initial_ref_state)
|
||||
{
|
||||
println!(
|
||||
" {} refs unchanged since last successful sync",
|
||||
style("up-to-date").green().bold()
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let path = context
|
||||
.work_dir
|
||||
.join(safe_remote_name(&context.mirror.name))
|
||||
.join(format!("{}.git", safe_remote_name(repo_name)));
|
||||
let mirror_repo = GitMirror::open(path, context.redactor.clone(), context.dry_run)?;
|
||||
|
||||
let initial_remotes = remote_specs(context, repos)?;
|
||||
mirror_repo.configure_remotes(&initial_remotes)?;
|
||||
for remote in &initial_remotes {
|
||||
if let Err(error) = mirror_repo.fetch_remote(remote) {
|
||||
@@ -504,7 +605,63 @@ fn sync_repo(
|
||||
}
|
||||
}
|
||||
|
||||
push_repo_refs(context, &mirror_repo, &remotes)
|
||||
let result = push_repo_refs(context, &mirror_repo, &remotes)?;
|
||||
if !context.dry_run && !result.had_conflicts {
|
||||
let refs = if result.pushed {
|
||||
let Some(refs) = check_remote_refs(context, repo_name, &remotes)? else {
|
||||
return Ok(());
|
||||
};
|
||||
refs
|
||||
} else {
|
||||
initial_ref_state
|
||||
};
|
||||
ref_state.set_repo(&context.mirror.name, repo_name, refs);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn all_configured_endpoints_present(mirror: &MirrorConfig, repos: &[EndpointRepo]) -> bool {
|
||||
let present = repos
|
||||
.iter()
|
||||
.map(|repo| repo.endpoint.clone())
|
||||
.collect::<BTreeSet<_>>();
|
||||
mirror
|
||||
.endpoints
|
||||
.iter()
|
||||
.all(|endpoint| present.contains(endpoint))
|
||||
}
|
||||
|
||||
fn check_remote_refs(
|
||||
context: &RepoSyncContext<'_>,
|
||||
repo_name: &str,
|
||||
remotes: &[RemoteSpec],
|
||||
) -> Result<Option<BTreeMap<String, RemoteRefState>>> {
|
||||
let mut refs = BTreeMap::new();
|
||||
for remote in remotes {
|
||||
println!(
|
||||
" {} {}",
|
||||
style("check refs").cyan().bold(),
|
||||
style(&remote.display).dim()
|
||||
);
|
||||
let snapshot = match ls_remote_refs(remote, &context.redactor) {
|
||||
Ok(snapshot) => snapshot,
|
||||
Err(error) if is_disabled_repository_error(&error) => {
|
||||
println!(
|
||||
" {} {} {}",
|
||||
style("skip").yellow().bold(),
|
||||
style(repo_name).cyan(),
|
||||
style(format!("provider blocked access on {}", remote.display)).dim()
|
||||
);
|
||||
return Ok(None);
|
||||
}
|
||||
Err(error) => {
|
||||
return Err(error)
|
||||
.with_context(|| format!("failed to check refs for {}", remote.display));
|
||||
}
|
||||
};
|
||||
refs.insert(remote.name.clone(), snapshot.into());
|
||||
}
|
||||
Ok(Some(refs))
|
||||
}
|
||||
|
||||
fn remote_specs(context: &RepoSyncContext<'_>, repos: &[EndpointRepo]) -> Result<Vec<RemoteSpec>> {
|
||||
@@ -540,8 +697,9 @@ fn push_repo_refs(
|
||||
context: &RepoSyncContext<'_>,
|
||||
mirror_repo: &GitMirror,
|
||||
remotes: &[RemoteSpec],
|
||||
) -> Result<()> {
|
||||
) -> Result<RepoRefSyncResult> {
|
||||
let (branches, conflicts) = mirror_repo.branch_decisions(remotes, context.allow_force)?;
|
||||
let had_branch_conflicts = !conflicts.is_empty();
|
||||
let branches_to_push = branches
|
||||
.into_iter()
|
||||
.filter(|branch| !branch.target_remotes.is_empty())
|
||||
@@ -563,6 +721,7 @@ fn push_repo_refs(
|
||||
}
|
||||
|
||||
let (tags, tag_conflicts) = mirror_repo.tag_decisions(remotes)?;
|
||||
let had_tag_conflicts = !tag_conflicts.is_empty();
|
||||
let tags_to_push = tags
|
||||
.into_iter()
|
||||
.filter(|tag| !tag.target_remotes.is_empty())
|
||||
@@ -588,7 +747,10 @@ fn push_repo_refs(
|
||||
" {} branches and tags already match all endpoints",
|
||||
style("up-to-date").green().bold()
|
||||
);
|
||||
return Ok(());
|
||||
return Ok(RepoRefSyncResult {
|
||||
pushed: false,
|
||||
had_conflicts: had_branch_conflicts || had_tag_conflicts,
|
||||
});
|
||||
}
|
||||
if !branches_to_push.is_empty() {
|
||||
print_branch_decisions(&branches_to_push);
|
||||
@@ -598,7 +760,15 @@ fn push_repo_refs(
|
||||
print_tag_decisions(&tags_to_push);
|
||||
mirror_repo.push_tags(remotes, &tags_to_push)?;
|
||||
}
|
||||
Ok(())
|
||||
Ok(RepoRefSyncResult {
|
||||
pushed: true,
|
||||
had_conflicts: had_branch_conflicts || had_tag_conflicts,
|
||||
})
|
||||
}
|
||||
|
||||
/// Outcome of reconciling one repository's branches and tags across its
/// remotes.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct RepoRefSyncResult {
    /// `true` when the push phase was reached; `false` when branches and tags
    /// already matched on all endpoints.
    pushed: bool,
    /// `true` when branch or tag conflicts were detected.
    had_conflicts: bool,
}
|
||||
|
||||
fn print_branch_decisions(branches: &[crate::git::BranchDecision]) {
|
||||
@@ -704,4 +874,45 @@ mod tests {
|
||||
|
||||
assert!(!failure_state_path(temp.path()).exists());
|
||||
}
|
||||
|
||||
#[test]
fn ref_state_persists_and_requires_exact_remote_ref_match() {
    // Every remote entry in this test advertises two refs; only the hash varies.
    let remote_state = |hash: &str| RemoteRefState {
        hash: hash.to_string(),
        refs: 2,
    };

    let temp = tempfile::TempDir::new().unwrap();
    let refs = BTreeMap::from([
        ("github_alice".to_string(), remote_state("abc")),
        ("gitea_alice".to_string(), remote_state("def")),
    ]);
    let mut state = RefState::default();
    state.set_repo("sync-1", "repo-a", refs.clone());

    // Round-trip through the on-disk representation.
    save_ref_state(temp.path(), &state).unwrap();
    let loaded = load_ref_state(temp.path()).unwrap();

    assert!(loaded.repo_matches("sync-1", "repo-a", &refs));

    // A different digest for one remote must not match.
    let mut changed_hash = refs.clone();
    changed_hash.insert("github_alice".to_string(), remote_state("changed"));
    assert!(!loaded.repo_matches("sync-1", "repo-a", &changed_hash));

    // Neither must a map that lacks one of the recorded remotes.
    let mut missing_remote = refs;
    missing_remote.remove("gitea_alice");
    assert!(!loaded.repo_matches("sync-1", "repo-a", &missing_remote));
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user