diff --git a/src/main.rs b/src/main.rs index a961b3e..d4d3ff4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -102,6 +102,8 @@ struct WebhookInstallCommand { dry_run: bool, #[arg(long, value_name = "PATH")] work_dir: Option, + #[arg(long, default_value_t = DEFAULT_JOBS, value_name = "N")] + jobs: usize, } #[derive(Args, Debug)] @@ -112,6 +114,8 @@ struct WebhookUninstallCommand { dry_run: bool, #[arg(long, value_name = "PATH")] work_dir: Option, + #[arg(long, default_value_t = DEFAULT_JOBS, value_name = "N")] + jobs: usize, } fn main() -> Result<()> { @@ -176,6 +180,7 @@ fn main() -> Result<()> { repo_pattern: command.repo_pattern, dry_run: command.dry_run, work_dir: command.work_dir, + jobs: command.jobs, }, ) } @@ -187,6 +192,7 @@ fn main() -> Result<()> { group: command.group, dry_run: command.dry_run, work_dir: command.work_dir, + jobs: command.jobs, }, ) } diff --git a/src/sync.rs b/src/sync.rs index dcbff64..01f75b8 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -169,6 +169,7 @@ fn sync_group( mirror, &all_endpoint_repos, context.work_dir, + context.options.jobs, )?; } @@ -331,7 +332,13 @@ fn sync_group( if create_missing && !context.options.dry_run { let repos = list_group_repos(context.config, mirror)?; - webhook::ensure_configured_webhooks(context.config, mirror, &repos, context.work_dir)?; + webhook::ensure_configured_webhooks( + context.config, + mirror, + &repos, + context.work_dir, + context.options.jobs, + )?; } Ok(failures) diff --git a/src/webhook.rs b/src/webhook.rs index 571de63..dc9ab1a 100644 --- a/src/webhook.rs +++ b/src/webhook.rs @@ -43,6 +43,7 @@ pub struct WebhookInstallOptions { pub repo_pattern: Option, pub dry_run: bool, pub work_dir: Option, + pub jobs: usize, } #[derive(Clone, Debug)] @@ -50,6 +51,7 @@ pub struct WebhookUninstallOptions { pub group: Option, pub dry_run: bool, pub work_dir: Option, + pub jobs: usize, } #[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd)] @@ -163,8 +165,11 @@ fn reachability_timer_loop(url: String, minutes: u64) { pub fn install_webhooks(config: &Config, options: WebhookInstallOptions) -> Result<()> { validate_config(config)?; + if options.jobs == 0 { + bail!("--jobs must be at least 1"); + } let work_dir = options.work_dir.clone().unwrap_or_else(default_work_dir); - let mut state = load_webhook_state(&work_dir)?; + let state = Arc::new(Mutex::new(load_webhook_state(&work_dir)?)); let repo_pattern = options .repo_pattern .as_deref() @@ -186,6 +191,7 @@ pub fn install_webhooks(config: &Config, options: WebhookInstallOptions) -> Resu style("Webhook group").cyan().bold(), style(&mirror.name).bold() ); + let mut tasks = Vec::new(); for endpoint in &mirror.endpoints { let site = config.site(&endpoint.site).unwrap(); let client = ProviderClient::new(site)?; @@ -204,22 +210,23 @@ pub fn install_webhooks(config: &Config, options: WebhookInstallOptions) -> Resu { continue; } - install_repo_webhook( - &WebhookInstallRequest { - client: &client, - group: &mirror.name, - endpoint, - repo: &repo, - url: &options.url, - secret: &options.secret, - dry_run: options.dry_run, - }, - &mut state, - )?; + tasks.push(WebhookInstallTask { + site: site.clone(), + group: mirror.name.clone(), + endpoint: endpoint.clone(), + repo, + url: options.url.clone(), + secret: options.secret.clone(), + dry_run: options.dry_run, + }); } } + run_install_tasks(tasks, options.jobs, Arc::clone(&state))?; } if !options.dry_run { + let state = state + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); save_webhook_state(&work_dir, &state)?; } Ok(()) @@ -227,6 +234,9 @@ pub fn install_webhooks(config: &Config, options: WebhookInstallOptions) -> Resu pub fn uninstall_webhooks(config: &Config, options: WebhookUninstallOptions) -> Result<()> { validate_config(config)?; + if options.jobs == 0 { + bail!("--jobs must be at least 1"); + } let work_dir = options.work_dir.clone().unwrap_or_else(default_work_dir); let mut state = load_webhook_state(&work_dir)?; if state.installations.is_empty() { @@ -237,7 +247,7 @@ pub fn uninstall_webhooks(config: &Config, options: WebhookUninstallOptions) -> return Ok(()); } - let mut removed_keys = Vec::new(); + let mut tasks = Vec::new(); for (key, installation) in &state.installations { if options .group @@ -246,46 +256,14 @@ pub fn uninstall_webhooks(config: &Config, options: WebhookUninstallOptions) -> { continue; } - crate::logln!( - " {} {} {}", - style(if options.dry_run { - "would uninstall" - } else { - "uninstall" - }) - .red() - .bold(), - style(&installation.repo).cyan(), - style(format!("from {}", installation.endpoint.label())).dim() - ); - if options.dry_run { - continue; - } - let Some(site) = config.site(&installation.endpoint.site) else { - crate::logln!( - " {} {} {}", - style("skip").yellow().bold(), - style(&installation.repo).cyan(), - style(format!("unknown site {}", installation.endpoint.site)).dim() - ); - continue; - }; - let client = ProviderClient::new(site)?; - client - .uninstall_webhook( - &installation.endpoint, - &installation.repo, - &installation.url, - ) - .with_context(|| { - format!( - "failed to uninstall webhook for {} from {}", - installation.repo, - installation.endpoint.label() - ) - })?; - removed_keys.push(key.clone()); + tasks.push(WebhookUninstallTask { + key: key.clone(), + site: config.site(&installation.endpoint.site).cloned(), + installation: installation.clone(), + dry_run: options.dry_run, + }); } + let removed_keys = run_uninstall_tasks(tasks, options.jobs)?; if !options.dry_run { for key in removed_keys { @@ -301,6 +279,7 @@ pub fn ensure_configured_webhooks( mirror: &MirrorConfig, repos: &[EndpointRepo], work_dir: &Path, + jobs: usize, ) -> Result<()> { let Some(webhook) = &config.webhook else { return Ok(()); @@ -308,26 +287,30 @@ pub fn ensure_configured_webhooks( if !webhook.install { return Ok(()); } + if jobs == 0 { + bail!("--jobs must be at least 1"); + } let secret = webhook.secret()?; - let mut state = load_webhook_state(work_dir)?; + let state = Arc::new(Mutex::new(load_webhook_state(work_dir)?)); + let mut tasks = Vec::new(); for endpoint_repo in repos { let Some(site) = config.site(&endpoint_repo.endpoint.site) else { continue; }; - let client = ProviderClient::new(site)?; - install_repo_webhook( - &WebhookInstallRequest { - client: &client, - group: &mirror.name, - endpoint: &endpoint_repo.endpoint, - repo: &endpoint_repo.repo, - url: &webhook.url, - secret: &secret, - dry_run: false, - }, - &mut state, - )?; + tasks.push(WebhookInstallTask { + site: site.clone(), + group: mirror.name.clone(), + endpoint: endpoint_repo.endpoint.clone(), + repo: endpoint_repo.repo.clone(), + url: webhook.url.clone(), + secret: secret.clone(), + dry_run: false, + }); } + run_install_tasks(tasks, jobs, Arc::clone(&state))?; + let state = state + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); save_webhook_state(work_dir, &state) } @@ -405,6 +388,8 @@ fn worker_loop( struct WebhookState { #[serde(default)] installations: BTreeMap, + #[serde(default)] + skipped: BTreeMap, } #[derive(Clone, Debug, Deserialize, Serialize)] @@ -415,65 +400,306 @@ struct WebhookInstallation { url: String, } -struct WebhookInstallRequest<'a> { - client: &'a ProviderClient<'a>, - group: &'a str, - endpoint: &'a EndpointConfig, - repo: &'a RemoteRepo, - url: &'a str, - secret: &'a str, +#[derive(Clone, Debug, Deserialize, Serialize)] +struct SkippedWebhookInstallation { + group: String, + endpoint: EndpointConfig, + repo: String, + url: String, + reason: String, +} + +#[derive(Clone, Debug)] +struct WebhookInstallTask { + site: crate::config::SiteConfig, + group: String, + endpoint: EndpointConfig, + repo: RemoteRepo, + url: String, + secret: String, dry_run: bool, } -fn install_repo_webhook( - request: &WebhookInstallRequest<'_>, - state: &mut WebhookState, +#[derive(Clone, Debug)] +struct WebhookUninstallTask { + key: String, + site: Option, + installation: WebhookInstallation, + dry_run: bool, +} + +fn run_install_tasks( + tasks: Vec, + jobs: usize, + state: Arc>, ) -> Result<()> { - let key = webhook_installation_key(request.group, request.endpoint, &request.repo.name); - if state - .installations - .get(&key) - .is_some_and(|installation| installation.url == request.url) - { + if tasks.is_empty() { return Ok(()); } + let worker_count = jobs.min(tasks.len()); + let (task_sender, task_receiver) = mpsc::channel(); + let (result_sender, result_receiver) = mpsc::channel(); + for task in tasks { + task_sender.send(task)?; + } + drop(task_sender); + let task_receiver = Arc::new(Mutex::new(task_receiver)); + let mut handles = Vec::new(); + for _ in 0..worker_count { + let task_receiver = Arc::clone(&task_receiver); + let result_sender = result_sender.clone(); + let state = Arc::clone(&state); + handles.push(thread::spawn(move || { + loop { + let task = { + let task_receiver = task_receiver + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + task_receiver.recv() + }; + let Ok(task) = task else { + break; + }; + if result_sender + .send(install_webhook_task(task, &state)) + .is_err() + { + break; + } + } + })); + } + drop(result_sender); + + let mut failures = Vec::new(); + for result in result_receiver { + if let Err(error) = result { + crate::logln!(" {} {error:#}", style("fail").red().bold()); + failures.push(error); + } + } + for handle in handles { + let _ = handle.join(); + } + if !failures.is_empty() { + bail!( + "webhook installation completed with {} failure(s)", + failures.len() + ); + } + Ok(()) +} + +fn run_uninstall_tasks(tasks: Vec, jobs: usize) -> Result> { + if tasks.is_empty() { + return Ok(Vec::new()); + } + let worker_count = jobs.min(tasks.len()); + let (task_sender, task_receiver) = mpsc::channel(); + let (result_sender, result_receiver) = mpsc::channel(); + for task in tasks { + task_sender.send(task)?; + } + drop(task_sender); + let task_receiver = Arc::new(Mutex::new(task_receiver)); + let mut handles = Vec::new(); + for _ in 0..worker_count { + let task_receiver = Arc::clone(&task_receiver); + let result_sender = result_sender.clone(); + handles.push(thread::spawn(move || { + loop { + let task = { + let task_receiver = task_receiver + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + task_receiver.recv() + }; + let Ok(task) = task else { + break; + }; + if result_sender.send(uninstall_webhook_task(task)).is_err() { + break; + } + } + })); + } + drop(result_sender); + + let mut removed_keys = Vec::new(); + let mut failures = Vec::new(); + for result in result_receiver { + match result { + Ok(Some(key)) => removed_keys.push(key), + Ok(None) => {} + Err(error) => { + crate::logln!(" {} {error:#}", style("fail").red().bold()); + failures.push(error); + } + } + } + for handle in handles { + let _ = handle.join(); + } + if !failures.is_empty() { + bail!( + "webhook uninstall completed with {} failure(s)", + failures.len() + ); + } + Ok(removed_keys) +} + +fn install_webhook_task(task: WebhookInstallTask, state: &Arc>) -> Result<()> { + let key = webhook_installation_key(&task.group, &task.endpoint, &task.repo.name); + { + let state = state + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + if state + .installations + .get(&key) + .is_some_and(|installation| installation.url == task.url) + { + return Ok(()); + } + if state + .skipped + .get(&key) + .is_some_and(|skipped| skipped.url == task.url) + { + return Ok(()); + } + } crate::logln!( " {} {} {}", - style(if request.dry_run { + style(if task.dry_run { "would install" } else { "install" }) .green() .bold(), - style(&request.repo.name).cyan(), - style(format!("webhook on {}", request.endpoint.label())).dim() + style(&task.repo.name).cyan(), + style(format!("webhook on {}", task.endpoint.label())).dim() ); - if request.dry_run { + if task.dry_run { return Ok(()); } - request - .client - .install_webhook(request.endpoint, request.repo, request.url, request.secret) - .with_context(|| { + let client = ProviderClient::new(&task.site)?; + if let Err(error) = client.install_webhook(&task.endpoint, &task.repo, &task.url, &task.secret) + { + if let Some(reason) = non_actionable_webhook_failure_reason(&error) { + crate::logln!( + " {} {} {}", + style("skip").yellow().bold(), + style(&task.repo.name).cyan(), + style(format!("webhook on {}: {reason}", task.endpoint.label())).dim() + ); + let mut state = state + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + state.skipped.insert( + key, + SkippedWebhookInstallation { + group: task.group, + endpoint: task.endpoint, + repo: task.repo.name, + url: task.url, + reason, + }, + ); + return Ok(()); + } + return Err(error).with_context(|| { format!( "failed to install webhook for {} on {}", - request.repo.name, - request.endpoint.label() + task.repo.name, + task.endpoint.label() ) - })?; + }); + } + let mut state = state + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + state.skipped.remove(&key); state.installations.insert( key, WebhookInstallation { - group: request.group.to_string(), - endpoint: request.endpoint.clone(), - repo: request.repo.name.clone(), - url: request.url.to_string(), + group: task.group, + endpoint: task.endpoint, + repo: task.repo.name, + url: task.url, }, ); Ok(()) } +fn uninstall_webhook_task(task: WebhookUninstallTask) -> Result> { + crate::logln!( + " {} {} {}", + style(if task.dry_run { + "would uninstall" + } else { + "uninstall" + }) + .red() + .bold(), + style(&task.installation.repo).cyan(), + style(format!("from {}", task.installation.endpoint.label())).dim() + ); + if task.dry_run { + return Ok(None); + } + let Some(site) = task.site else { + crate::logln!( + " {} {} {}", + style("skip").yellow().bold(), + style(&task.installation.repo).cyan(), + style(format!("unknown site {}", task.installation.endpoint.site)).dim() + ); + return Ok(None); + }; + let client = ProviderClient::new(&site)?; + client + .uninstall_webhook( + &task.installation.endpoint, + &task.installation.repo, + &task.installation.url, + ) + .with_context(|| { + format!( + "failed to uninstall webhook for {} from {}", + task.installation.repo, + task.installation.endpoint.label() + ) + })?; + Ok(Some(task.key)) +} + +fn non_actionable_webhook_failure_reason(error: &anyhow::Error) -> Option { + let text = error + .chain() + .map(ToString::to_string) + .collect::>() + .join("\n") + .to_ascii_lowercase(); + if text.contains("repository access blocked") + || text.contains("access to this repository has been disabled") + || text.contains("repository has been disabled") + || text.contains("disabled by github staff") + || text.contains("github.com/tos") + { + return Some("provider blocked access".to_string()); + } + if text.contains("repository was archived") + || text.contains("archived so is read-only") + || text.contains("repository is archived") + { + return Some("repository is archived/read-only".to_string()); + } + None +} + fn webhook_installation_key(group: &str, endpoint: &EndpointConfig, repo: &str) -> String { format!( "{}\t{}\t{:?}\t{}\t{}", diff --git a/tests/unit/cli.rs b/tests/unit/cli.rs index 4ff3ece..79c41c6 100644 --- a/tests/unit/cli.rs +++ b/tests/unit/cli.rs @@ -97,6 +97,8 @@ fn cli_accepts_webhook_install() { "sync-1", "--repo-pattern", "^repo$", + "--jobs", + "6", ]) .unwrap(); @@ -110,6 +112,7 @@ fn cli_accepts_webhook_install() { assert_eq!(args.secret, Some("secret".to_string())); assert_eq!(args.group, Some("sync-1".to_string())); assert_eq!(args.repo_pattern, Some("^repo$".to_string())); + assert_eq!(args.jobs, 6); } #[test] @@ -121,6 +124,8 @@ fn cli_accepts_webhook_uninstall() { "--group", "sync-1", "--dry-run", + "--jobs", + "3", ]) .unwrap(); @@ -129,4 +134,5 @@ fn cli_accepts_webhook_uninstall() { }; assert_eq!(args.group, Some("sync-1".to_string())); assert!(args.dry_run); + assert_eq!(args.jobs, 3); } diff --git a/tests/unit/webhook.rs b/tests/unit/webhook.rs index a7b2fa3..5c87f31 100644 --- a/tests/unit/webhook.rs +++ b/tests/unit/webhook.rs @@ -2,6 +2,9 @@ use super::*; use crate::config::{ EndpointConfig, MirrorConfig, NamespaceKind, SiteConfig, TokenConfig, Visibility, }; +use std::io::{Read, Write}; +use std::net::TcpListener; +use std::thread; #[test] fn verifies_github_hmac_signature() { @@ -150,6 +153,60 @@ fn webhook_state_persists_installations() { ); } +#[test] +fn blocked_webhook_install_is_skipped_and_recorded() { + let (api_url, handle) = one_request_server( + "403 Forbidden", + r#"{"message":"Repository access blocked","block":{"html_url":"https://github.com/tos"}}"#, + |request| assert!(request.starts_with("GET /repos/alice/repo/hooks ")), + ); + let site = SiteConfig { + api_url: Some(api_url), + ..site("github", ProviderKind::Github) + }; + let state = Arc::new(Mutex::new(WebhookState::default())); + + install_webhook_task( + WebhookInstallTask { + site, + group: "sync-1".to_string(), + endpoint: endpoint("github", NamespaceKind::User, "alice"), + repo: RemoteRepo { + name: "repo".to_string(), + clone_url: "https://github.com/alice/repo.git".to_string(), + private: true, + description: None, + }, + url: "https://mirror.example.test/webhook".to_string(), + secret: "secret".to_string(), + dry_run: false, + }, + &state, + ) + .unwrap(); + + let state = state.lock().unwrap(); + assert!(state.installations.is_empty()); + assert_eq!(state.skipped.len(), 1); + assert_eq!( + state.skipped.values().next().unwrap().reason, + "provider blocked access" + ); + handle.join().unwrap(); +} + +#[test] +fn archived_webhook_failure_is_non_actionable() { + let error = anyhow::anyhow!( + "POST https://api.github.com/repos/acme/repo/hooks returned 403 Forbidden: Repository was archived so is read-only." + ); + + assert_eq!( + non_actionable_webhook_failure_reason(&error).as_deref(), + Some("repository is archived/read-only") + ); +} + fn site(name: &str, provider: ProviderKind) -> SiteConfig { SiteConfig { name: name.to_string(), @@ -168,3 +225,30 @@ fn endpoint(site: &str, kind: NamespaceKind, namespace: &str) -> EndpointConfig namespace: namespace.to_string(), } } + +fn one_request_server( + status: &'static str, + body: &'static str, + assert_request: F, +) -> (String, thread::JoinHandle<()>) +where + F: FnOnce(&str) + Send + 'static, +{ + let listener = TcpListener::bind("127.0.0.1:0").unwrap(); + let address = listener.local_addr().unwrap(); + let handle = thread::spawn(move || { + let (mut stream, _) = listener.accept().unwrap(); + let mut buffer = [0_u8; 4096]; + let bytes = stream.read(&mut buffer).unwrap(); + let request = String::from_utf8_lossy(&buffer[..bytes]).to_string(); + assert_request(&request); + + write!( + stream, + "HTTP/1.1 {status}\r\ncontent-type: application/json\r\ncontent-length: {}\r\n\r\n{body}", + body.len() + ) + .unwrap(); + }); + (format!("http://{address}"), handle) +}