[O] Parallel everything (#5)

This commit is contained in:
2026-05-09 21:28:38 -04:00
committed by GitHub
parent 915a63a955
commit b41f530d1e
8 changed files with 252 additions and 148 deletions
+1 -1
View File
@@ -124,7 +124,7 @@ Retry only repositories that failed during the previous non-dry-run sync:
refray sync --retry-failed refray sync --retry-failed
``` ```
Control parallelism for sync, serve, and webhook commands in config: Control parallelism for sync, serve, and webhook commands in config. The default is 10 workers:
```toml ```toml
jobs = 8 jobs = 8
+1 -1
View File
@@ -9,7 +9,7 @@ use regex::Regex;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
const APP_NAME: &str = "refray"; const APP_NAME: &str = "refray";
pub const DEFAULT_JOBS: usize = 4; pub const DEFAULT_JOBS: usize = 10;
#[derive(Clone, Debug, Deserialize, Serialize)] #[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Config { pub struct Config {
+1
View File
@@ -2,6 +2,7 @@ mod config;
mod git; mod git;
mod interactive; mod interactive;
mod logging; mod logging;
mod parallel;
mod provider; mod provider;
mod state; mod state;
mod sync; mod sync;
+73
View File
@@ -0,0 +1,73 @@
use std::collections::VecDeque;
use std::sync::{Arc, Mutex, mpsc};
use std::thread;
use anyhow::{Context, Result, bail};
/// Applies `f` to every element of `items` using up to `jobs` scoped worker
/// threads and returns the collected results.
///
/// Output order follows completion order, not input order — callers that need
/// stable ordering should thread an index through `f` and sort afterwards.
///
/// # Errors
/// Fails when `jobs` is zero, or when any task returns an error (the first
/// observed error is surfaced; see `collect_results` for multi-failure
/// annotation).
pub fn map<I, O, F>(items: Vec<I>, jobs: usize, f: F) -> Result<Vec<O>>
where
    I: Send,
    O: Send,
    F: Fn(I) -> Result<O> + Sync,
{
    if jobs == 0 {
        bail!("jobs must be at least 1");
    }
    if items.is_empty() {
        return Ok(Vec::new());
    }
    // Never spawn more workers than there are items to hand out.
    let workers = jobs.min(items.len());
    let pending = Arc::new(Mutex::new(VecDeque::from(items)));
    let (tx, rx) = mpsc::channel();
    thread::scope(|scope| {
        for _ in 0..workers {
            let pending = Arc::clone(&pending);
            let tx = tx.clone();
            let f = &f;
            scope.spawn(move || {
                // Drain the shared queue; bail out early if the receiving
                // side has gone away.
                while let Some(item) = pop_item(&pending) {
                    if tx.send(f(item)).is_err() {
                        break;
                    }
                }
            });
        }
        // Release the original sender so the receiver observes channel close
        // once every worker has finished and dropped its clone.
        drop(tx);
        collect_results(rx)
    })
}
/// Pops the next item off the shared work queue.
///
/// A poisoned mutex (a worker panicked while holding the lock) is treated as
/// recoverable: the queue data itself is still valid, so we take the inner
/// guard and continue handing out work.
fn pop_item<I>(queue: &Arc<Mutex<VecDeque<I>>>) -> Option<I> {
    let mut guard = match queue.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    };
    guard.pop_front()
}
/// Drains every task result from `receiver` (blocking until all senders are
/// dropped) and folds them into a single `Result`.
///
/// On success, returns all outputs in arrival order. On failure, returns the
/// first error received; when more than one task failed, the error is wrapped
/// with a context line stating the total failure count.
fn collect_results<O>(receiver: mpsc::Receiver<Result<O>>) -> Result<Vec<O>> {
    let mut outputs = Vec::new();
    let mut errors = Vec::new();
    for result in receiver {
        match result {
            Ok(output) => outputs.push(output),
            Err(error) => errors.push(error),
        }
    }
    let failure_count = errors.len();
    match errors.into_iter().next() {
        None => Ok(outputs),
        Some(error) if failure_count == 1 => Err(error),
        Some(error) => {
            Err(error).with_context(|| format!("{failure_count} parallel tasks failed"))
        }
    }
}
+53 -1
View File
@@ -1,13 +1,17 @@
use std::collections::HashMap; use std::collections::HashMap;
use anyhow::{Context, Result, anyhow, bail}; use anyhow::{Context, Result, anyhow, bail};
use console::style;
use reqwest::blocking::{Client, Response}; use reqwest::blocking::{Client, Response};
use reqwest::header::{ACCEPT, AUTHORIZATION, HeaderMap, HeaderValue, USER_AGENT}; use reqwest::header::{ACCEPT, AUTHORIZATION, HeaderMap, HeaderValue, USER_AGENT};
use serde::Deserialize; use serde::Deserialize;
use serde_json::json; use serde_json::json;
use url::Url; use url::Url;
use crate::config::{EndpointConfig, NamespaceKind, ProviderKind, SiteConfig, Visibility}; use crate::config::{
Config, EndpointConfig, MirrorConfig, NamespaceKind, ProviderKind, RepoNameFilter, SiteConfig,
Visibility,
};
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct RemoteRepo { pub struct RemoteRepo {
@@ -36,6 +40,54 @@ pub struct PullRequestInfo {
pub url: Option<String>, pub url: Option<String>,
} }
/// Lists repositories for every endpoint of `mirror`, fanning the provider
/// calls out over up to `jobs` parallel workers.
///
/// Results are filtered by the mirror's sync visibility and `repo_filter`,
/// and returned grouped in endpoint declaration order regardless of which
/// worker finished first.
///
/// # Errors
/// Propagates client construction and listing failures, annotated with the
/// endpoint label.
pub fn list_mirror_repos(
    config: &Config,
    mirror: &MirrorConfig,
    repo_filter: &RepoNameFilter,
    jobs: usize,
) -> Result<Vec<EndpointRepo>> {
    // Tag each endpoint with its position so the unordered parallel results
    // can be restored to endpoint order afterwards.
    let indexed_endpoints: Vec<_> = mirror.endpoints.iter().cloned().enumerate().collect();
    let worker_count = jobs.min(indexed_endpoints.len());
    if worker_count > 1 {
        crate::logln!(
            " {} listing repositories with {} workers",
            style("jobs").cyan().bold(),
            worker_count
        );
    }
    let mut per_endpoint = crate::parallel::map(indexed_endpoints, jobs, |(index, endpoint)| {
        // NOTE(review): unwrap panics on an unknown site — presumably config
        // validation guarantees the lookup succeeds; confirm upstream.
        let site = config.site(&endpoint.site).unwrap();
        let client = ProviderClient::new(site)?;
        crate::logln!(
            " {} {}",
            style("list").cyan().bold(),
            style(endpoint.label()).dim()
        );
        let repos = client
            .list_repos(&endpoint)
            .with_context(|| format!("failed to list repos for {}", endpoint.label()))?;
        let filtered: Vec<_> = repos
            .into_iter()
            .filter(|repo| mirror.sync_visibility.matches_private(repo.private))
            .filter(|repo| repo_filter.matches(&repo.name))
            .map(|repo| EndpointRepo {
                endpoint: endpoint.clone(),
                repo,
            })
            .collect();
        Ok((index, filtered))
    })?;
    per_endpoint.sort_by_key(|(index, _)| *index);
    Ok(per_endpoint
        .into_iter()
        .flat_map(|(_, repos)| repos)
        .collect())
}
pub struct ProviderClient<'a> { pub struct ProviderClient<'a> {
site: &'a SiteConfig, site: &'a SiteConfig,
token: String, token: String,
+100 -95
View File
@@ -17,7 +17,9 @@ use crate::git::{
is_disabled_repository_error, ls_remote_refs, safe_remote_name, is_disabled_repository_error, ls_remote_refs, safe_remote_name,
}; };
use crate::logging; use crate::logging;
use crate::provider::{EndpointRepo, ProviderClient, PullRequestRequest, repos_by_name}; use crate::provider::{
EndpointRepo, ProviderClient, PullRequestRequest, list_mirror_repos, repos_by_name,
};
use crate::webhook; use crate::webhook;
mod output; mod output;
@@ -163,7 +165,8 @@ fn sync_group(
.unwrap_or(mirror.create_missing); .unwrap_or(mirror.create_missing);
let repo_filter = mirror.repo_filter()?; let repo_filter = mirror.repo_filter()?;
let all_endpoint_repos = list_group_repos(context.config, mirror, &repo_filter)?; let all_endpoint_repos =
list_mirror_repos(context.config, mirror, &repo_filter, context.options.jobs)?;
if !context.options.dry_run { if !context.options.dry_run {
webhook::ensure_configured_webhooks( webhook::ensure_configured_webhooks(
context.config, context.config,
@@ -258,6 +261,7 @@ fn sync_group(
let queue = Arc::new(Mutex::new(repo_jobs)); let queue = Arc::new(Mutex::new(repo_jobs));
let (sender, receiver) = mpsc::channel(); let (sender, receiver) = mpsc::channel();
let use_status_area = worker_count > 1; let use_status_area = worker_count > 1;
let jobs = context.options.jobs;
let _status_guard = use_status_area.then(|| logging::start_status_area(worker_count)); let _status_guard = use_status_area.then(|| logging::start_status_area(worker_count));
let failures = thread::scope(|scope| { let failures = thread::scope(|scope| {
for worker_id in 0..worker_count { for worker_id in 0..worker_count {
@@ -280,6 +284,7 @@ fn sync_group(
work_dir, work_dir,
redactor: redactor.clone(), redactor: redactor.clone(),
dry_run, dry_run,
jobs,
}; };
let result = sync_repo( let result = sync_repo(
&repo_context, &repo_context,
@@ -340,50 +345,19 @@ fn sync_group(
}); });
if create_missing && !context.options.dry_run { if create_missing && !context.options.dry_run {
let repos = list_group_repos(context.config, mirror, &repo_filter)?; let repos = list_mirror_repos(context.config, mirror, &repo_filter, jobs)?;
webhook::ensure_configured_webhooks( webhook::ensure_configured_webhooks(
context.config, context.config,
mirror, mirror,
&repos, &repos,
context.work_dir, context.work_dir,
context.options.jobs, jobs,
)?; )?;
} }
Ok(failures) Ok(failures)
} }
fn list_group_repos(
config: &Config,
mirror: &MirrorConfig,
repo_filter: &RepoNameFilter,
) -> Result<Vec<EndpointRepo>> {
let mut all_endpoint_repos = Vec::new();
for endpoint in &mirror.endpoints {
let site = config.site(&endpoint.site).unwrap();
let client = ProviderClient::new(site)?;
crate::logln!(
" {} {}",
style("list").cyan().bold(),
style(endpoint.label()).dim()
);
let repos = client
.list_repos(endpoint)
.with_context(|| format!("failed to list repos for {}", endpoint.label()))?;
for repo in repos
.into_iter()
.filter(|repo| mirror.sync_visibility.matches_private(repo.private))
.filter(|repo| repo_filter.matches(&repo.name))
{
all_endpoint_repos.push(EndpointRepo {
endpoint: endpoint.clone(),
repo,
});
}
}
Ok(all_endpoint_repos)
}
fn sync_candidate_repo_names( fn sync_candidate_repo_names(
repos: &HashMap<String, Vec<EndpointRepo>>, repos: &HashMap<String, Vec<EndpointRepo>>,
ref_state: &RefState, ref_state: &RefState,
@@ -434,57 +408,71 @@ struct RepoWorkerFailure {
} }
fn ensure_missing_repos( fn ensure_missing_repos(
config: &Config, context: &RepoSyncContext<'_>,
mirror: &MirrorConfig,
repo_name: &str, repo_name: &str,
existing: &mut Vec<EndpointRepo>, existing: &mut Vec<EndpointRepo>,
create_missing: bool, create_missing: bool,
dry_run: bool,
) -> Result<()> { ) -> Result<()> {
let present = existing let present = existing
.iter() .iter()
.map(|repo| repo.endpoint.clone()) .map(|repo| repo.endpoint.clone())
.collect::<BTreeSet<_>>(); .collect::<BTreeSet<_>>();
let template = existing.first().map(|repo| repo.repo.clone()); let template = existing.first().map(|repo| repo.repo.clone());
let missing = context
.mirror
.endpoints
.iter()
.filter(|endpoint| !present.contains(*endpoint))
.cloned()
.collect::<Vec<_>>();
for endpoint in &mirror.endpoints { if !create_missing || context.dry_run {
if present.contains(endpoint) { for endpoint in &missing {
continue; if !create_missing {
} crate::logln!(
if !create_missing { " {} {} missing on {} ({})",
style("skip").yellow().bold(),
style(repo_name).cyan(),
style(endpoint.label()).dim(),
style("creation disabled").dim()
);
continue;
}
crate::logln!( crate::logln!(
" {} {} missing on {} ({})", " {} {} {}",
style("skip").yellow().bold(), style("create").green().bold(),
style(repo_name).cyan(), style(repo_name).cyan(),
style(endpoint.label()).dim(), style(format!("on {}", endpoint.label())).dim()
style("creation disabled").dim()
); );
continue;
} }
return Ok(());
}
let description = template.and_then(|repo| repo.description);
let expected_private = matches!(
&context.mirror.visibility,
crate::config::Visibility::Private
);
let create_jobs = missing.into_iter().enumerate().collect::<Vec<_>>();
let mut created = crate::parallel::map(create_jobs, context.jobs, |(index, endpoint)| {
crate::logln!( crate::logln!(
" {} {} {}", " {} {} {}",
style("create").green().bold(), style("create").green().bold(),
style(repo_name).cyan(), style(repo_name).cyan(),
style(format!("on {}", endpoint.label())).dim() style(format!("on {}", endpoint.label())).dim()
); );
if dry_run {
continue;
}
let site = config.site(&endpoint.site).unwrap(); let site = context.config.site(&endpoint.site).unwrap();
let client = ProviderClient::new(site)?; let client = ProviderClient::new(site)?;
let created = client let created = client
.create_repo( .create_repo(
endpoint, &endpoint,
repo_name, repo_name,
&mirror.visibility, &context.mirror.visibility,
template description.as_deref(),
.as_ref()
.and_then(|repo| repo.description.as_deref()),
) )
.with_context(|| format!("failed to create {} on {}", repo_name, endpoint.label()))?; .with_context(|| format!("failed to create {} on {}", repo_name, endpoint.label()))?;
if created.private != matches!(mirror.visibility, crate::config::Visibility::Private) { if created.private != expected_private {
crate::logln!( crate::logln!(
" {} created {} on {}, but provider reported a different visibility than requested", " {} created {} on {}, but provider reported a different visibility than requested",
style("warn").yellow().bold(), style("warn").yellow().bold(),
@@ -492,11 +480,16 @@ fn ensure_missing_repos(
style(endpoint.label()).dim() style(endpoint.label()).dim()
); );
} }
existing.push(EndpointRepo { Ok((
endpoint: endpoint.clone(), index,
repo: created, EndpointRepo {
}); endpoint,
} repo: created,
},
))
})?;
created.sort_by_key(|(index, _)| *index);
existing.extend(created.into_iter().map(|(_, repo)| repo));
Ok(()) Ok(())
} }
@@ -507,6 +500,7 @@ struct RepoSyncContext<'a> {
work_dir: &'a Path, work_dir: &'a Path,
redactor: Redactor, redactor: Redactor,
dry_run: bool, dry_run: bool,
jobs: usize,
} }
#[derive(Default)] #[derive(Default)]
@@ -592,14 +586,7 @@ fn sync_repo(
} }
} }
ensure_missing_repos( ensure_missing_repos(context, repo_name, repos, create_missing)?;
context.config,
context.mirror,
repo_name,
repos,
create_missing,
context.dry_run,
)?;
if repos.len() < 2 { if repos.len() < 2 {
crate::logln!( crate::logln!(
@@ -729,26 +716,30 @@ fn delete_repos(
repos: &[EndpointRepo], repos: &[EndpointRepo],
target_remotes: &[String], target_remotes: &[String],
) -> Result<()> { ) -> Result<()> {
for repo in repos { let delete_jobs = repos
let remote_name = remote_name_for_endpoint_repo(repo); .iter()
if !target_remotes.contains(&remote_name) { .filter(|repo| target_remotes.contains(&remote_name_for_endpoint_repo(repo)))
continue; .cloned()
.collect::<Vec<_>>();
if context.dry_run {
for repo in &delete_jobs {
crate::logln!(
" {} {} {}",
style("would delete").red().bold(),
style(repo_name).cyan(),
style(format!("from {}", repo.endpoint.label())).dim()
);
} }
return Ok(());
}
crate::parallel::map(delete_jobs, context.jobs, |repo| {
crate::logln!( crate::logln!(
" {} {} {}", " {} {} {}",
style(if context.dry_run { style("delete").red().bold(),
"would delete"
} else {
"delete"
})
.red()
.bold(),
style(repo_name).cyan(), style(repo_name).cyan(),
style(format!("from {}", repo.endpoint.label())).dim() style(format!("from {}", repo.endpoint.label())).dim()
); );
if context.dry_run {
continue;
}
let site = context.config.site(&repo.endpoint.site).unwrap(); let site = context.config.site(&repo.endpoint.site).unwrap();
let client = ProviderClient::new(site)?; let client = ProviderClient::new(site)?;
client client
@@ -760,7 +751,8 @@ fn delete_repos(
repo.endpoint.label() repo.endpoint.label()
) )
})?; })?;
} Ok(())
})?;
Ok(()) Ok(())
} }
@@ -803,15 +795,20 @@ fn check_remote_refs(
repo_name: &str, repo_name: &str,
remotes: &[RemoteSpec], remotes: &[RemoteSpec],
) -> Result<Option<BTreeMap<String, RemoteRefState>>> { ) -> Result<Option<BTreeMap<String, RemoteRefState>>> {
let mut refs = BTreeMap::new(); enum RemoteRefCheck {
for remote in remotes { Found(String, RemoteRefState),
Blocked,
}
let ref_jobs = remotes.to_vec();
let results = crate::parallel::map(ref_jobs, context.jobs, |remote| {
crate::logln!( crate::logln!(
" {} {}", " {} {}",
style("check refs").cyan().bold(), style("check refs").cyan().bold(),
style(&remote.display).dim() style(&remote.display).dim()
); );
let snapshot = match ls_remote_refs(remote, &context.redactor) { match ls_remote_refs(&remote, &context.redactor) {
Ok(snapshot) => snapshot, Ok(snapshot) => Ok(RemoteRefCheck::Found(remote.name, snapshot.into())),
Err(error) if is_disabled_repository_error(&error) => { Err(error) if is_disabled_repository_error(&error) => {
crate::logln!( crate::logln!(
" {} {} {}", " {} {} {}",
@@ -819,14 +816,22 @@ fn check_remote_refs(
style(repo_name).cyan(), style(repo_name).cyan(),
style(format!("provider blocked access on {}", remote.display)).dim() style(format!("provider blocked access on {}", remote.display)).dim()
); );
return Ok(None); Ok(RemoteRefCheck::Blocked)
} }
Err(error) => { Err(error) => {
return Err(error) Err(error).with_context(|| format!("failed to check refs for {}", remote.display))
.with_context(|| format!("failed to check refs for {}", remote.display));
} }
}; }
refs.insert(remote.name.clone(), snapshot.into()); })?;
let mut refs = BTreeMap::new();
for result in results {
match result {
RemoteRefCheck::Found(remote, refs_for_remote) => {
refs.insert(remote, refs_for_remote);
}
RemoteRefCheck::Blocked => return Ok(None),
}
} }
Ok(Some(refs)) Ok(Some(refs))
} }
+22 -50
View File
@@ -18,7 +18,7 @@ use crate::config::{
Config, EndpointConfig, MirrorConfig, ProviderKind, RepoNameFilter, default_work_dir, Config, EndpointConfig, MirrorConfig, ProviderKind, RepoNameFilter, default_work_dir,
validate_config, validate_config,
}; };
use crate::provider::{EndpointRepo, ProviderClient, RemoteRepo}; use crate::provider::{EndpointRepo, ProviderClient, RemoteRepo, list_mirror_repos};
use crate::state::{load_toml_or_default, save_toml}; use crate::state::{load_toml_or_default, save_toml};
use crate::sync::{SyncOptions, sync_all}; use crate::sync::{SyncOptions, sync_all};
@@ -189,31 +189,17 @@ pub fn install_webhooks(config: &Config, options: WebhookInstallOptions) -> Resu
); );
let repo_filter = mirror.repo_filter()?; let repo_filter = mirror.repo_filter()?;
let mut tasks = Vec::new(); let mut tasks = Vec::new();
for endpoint in &mirror.endpoints { for endpoint_repo in list_mirror_repos(config, mirror, &repo_filter, options.jobs)? {
let site = config.site(&endpoint.site).unwrap(); let site = config.site(&endpoint_repo.endpoint.site).unwrap();
let client = ProviderClient::new(site)?; tasks.push(WebhookInstallTask {
crate::logln!( site: site.clone(),
" {} {}", group: mirror.name.clone(),
style("list").cyan().bold(), endpoint: endpoint_repo.endpoint,
style(endpoint.label()).dim() repo: endpoint_repo.repo,
); url: options.url.clone(),
let repos = client secret: options.secret.clone(),
.list_repos(endpoint) dry_run: options.dry_run,
.with_context(|| format!("failed to list repos for {}", endpoint.label()))?; });
for repo in repos
.into_iter()
.filter(|repo| webhook_repo_matches(mirror, &repo_filter, repo))
{
tasks.push(WebhookInstallTask {
site: site.clone(),
group: mirror.name.clone(),
endpoint: endpoint.clone(),
repo,
url: options.url.clone(),
secret: options.secret.clone(),
dry_run: options.dry_run,
});
}
} }
run_install_tasks(tasks, options.jobs, Arc::clone(&state))?; run_install_tasks(tasks, options.jobs, Arc::clone(&state))?;
} }
@@ -242,30 +228,16 @@ pub fn uninstall_webhooks(config: &Config, options: WebhookUninstallOptions) ->
style(&mirror.name).bold() style(&mirror.name).bold()
); );
let repo_filter = mirror.repo_filter()?; let repo_filter = mirror.repo_filter()?;
for endpoint in &mirror.endpoints { for endpoint_repo in list_mirror_repos(config, mirror, &repo_filter, options.jobs)? {
let site = config.site(&endpoint.site).unwrap(); let site = config.site(&endpoint_repo.endpoint.site).unwrap();
let client = ProviderClient::new(site)?; tasks.push(WebhookUninstallTask {
crate::logln!( group: mirror.name.clone(),
" {} {}", site: site.clone(),
style("list").cyan().bold(), endpoint: endpoint_repo.endpoint,
style(endpoint.label()).dim() repo: endpoint_repo.repo,
); url: options.url.clone(),
let repos = client dry_run: options.dry_run,
.list_repos(endpoint) });
.with_context(|| format!("failed to list repos for {}", endpoint.label()))?;
for repo in repos
.into_iter()
.filter(|repo| webhook_repo_matches(mirror, &repo_filter, repo))
{
tasks.push(WebhookUninstallTask {
group: mirror.name.clone(),
site: site.clone(),
endpoint: endpoint.clone(),
repo,
url: options.url.clone(),
dry_run: options.dry_run,
});
}
} }
} }
let removed_keys = run_uninstall_tasks(tasks, options.jobs)?; let removed_keys = run_uninstall_tasks(tasks, options.jobs)?;
+1
View File
@@ -88,6 +88,7 @@ fn env_token_form_is_rejected() {
fn config_defaults_jobs() { fn config_defaults_jobs() {
let config: Config = toml::from_str("").unwrap(); let config: Config = toml::from_str("").unwrap();
assert_eq!(DEFAULT_JOBS, 10);
assert_eq!(config.jobs, DEFAULT_JOBS); assert_eq!(config.jobs, DEFAULT_JOBS);
} }