[O] Better webhook error handling

This commit is contained in:
2026-05-08 01:03:30 +00:00
parent 0bc4abf1a7
commit 09e95f4ad5
5 changed files with 428 additions and 99 deletions
+6
View File
@@ -102,6 +102,8 @@ struct WebhookInstallCommand {
dry_run: bool,
#[arg(long, value_name = "PATH")]
work_dir: Option<PathBuf>,
#[arg(long, default_value_t = DEFAULT_JOBS, value_name = "N")]
jobs: usize,
}
#[derive(Args, Debug)]
@@ -112,6 +114,8 @@ struct WebhookUninstallCommand {
dry_run: bool,
#[arg(long, value_name = "PATH")]
work_dir: Option<PathBuf>,
#[arg(long, default_value_t = DEFAULT_JOBS, value_name = "N")]
jobs: usize,
}
fn main() -> Result<()> {
@@ -176,6 +180,7 @@ fn main() -> Result<()> {
repo_pattern: command.repo_pattern,
dry_run: command.dry_run,
work_dir: command.work_dir,
jobs: command.jobs,
},
)
}
@@ -187,6 +192,7 @@ fn main() -> Result<()> {
group: command.group,
dry_run: command.dry_run,
work_dir: command.work_dir,
jobs: command.jobs,
},
)
}
+8 -1
View File
@@ -169,6 +169,7 @@ fn sync_group(
mirror,
&all_endpoint_repos,
context.work_dir,
context.options.jobs,
)?;
}
@@ -331,7 +332,13 @@ fn sync_group(
if create_missing && !context.options.dry_run {
let repos = list_group_repos(context.config, mirror)?;
webhook::ensure_configured_webhooks(context.config, mirror, &repos, context.work_dir)?;
webhook::ensure_configured_webhooks(
context.config,
mirror,
&repos,
context.work_dir,
context.options.jobs,
)?;
}
Ok(failures)
+324 -98
View File
@@ -43,6 +43,7 @@ pub struct WebhookInstallOptions {
pub repo_pattern: Option<String>,
pub dry_run: bool,
pub work_dir: Option<PathBuf>,
pub jobs: usize,
}
#[derive(Clone, Debug)]
@@ -50,6 +51,7 @@ pub struct WebhookUninstallOptions {
pub group: Option<String>,
pub dry_run: bool,
pub work_dir: Option<PathBuf>,
pub jobs: usize,
}
#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd)]
@@ -163,8 +165,11 @@ fn reachability_timer_loop(url: String, minutes: u64) {
pub fn install_webhooks(config: &Config, options: WebhookInstallOptions) -> Result<()> {
validate_config(config)?;
if options.jobs == 0 {
bail!("--jobs must be at least 1");
}
let work_dir = options.work_dir.clone().unwrap_or_else(default_work_dir);
let mut state = load_webhook_state(&work_dir)?;
let state = Arc::new(Mutex::new(load_webhook_state(&work_dir)?));
let repo_pattern = options
.repo_pattern
.as_deref()
@@ -186,6 +191,7 @@ pub fn install_webhooks(config: &Config, options: WebhookInstallOptions) -> Resu
style("Webhook group").cyan().bold(),
style(&mirror.name).bold()
);
let mut tasks = Vec::new();
for endpoint in &mirror.endpoints {
let site = config.site(&endpoint.site).unwrap();
let client = ProviderClient::new(site)?;
@@ -204,22 +210,23 @@ pub fn install_webhooks(config: &Config, options: WebhookInstallOptions) -> Resu
{
continue;
}
install_repo_webhook(
&WebhookInstallRequest {
client: &client,
group: &mirror.name,
endpoint,
repo: &repo,
url: &options.url,
secret: &options.secret,
dry_run: options.dry_run,
},
&mut state,
)?;
tasks.push(WebhookInstallTask {
site: site.clone(),
group: mirror.name.clone(),
endpoint: endpoint.clone(),
repo,
url: options.url.clone(),
secret: options.secret.clone(),
dry_run: options.dry_run,
});
}
}
run_install_tasks(tasks, options.jobs, Arc::clone(&state))?;
}
if !options.dry_run {
let state = state
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
save_webhook_state(&work_dir, &state)?;
}
Ok(())
@@ -227,6 +234,9 @@ pub fn install_webhooks(config: &Config, options: WebhookInstallOptions) -> Resu
pub fn uninstall_webhooks(config: &Config, options: WebhookUninstallOptions) -> Result<()> {
validate_config(config)?;
if options.jobs == 0 {
bail!("--jobs must be at least 1");
}
let work_dir = options.work_dir.clone().unwrap_or_else(default_work_dir);
let mut state = load_webhook_state(&work_dir)?;
if state.installations.is_empty() {
@@ -237,7 +247,7 @@ pub fn uninstall_webhooks(config: &Config, options: WebhookUninstallOptions) ->
return Ok(());
}
let mut removed_keys = Vec::new();
let mut tasks = Vec::new();
for (key, installation) in &state.installations {
if options
.group
@@ -246,46 +256,14 @@ pub fn uninstall_webhooks(config: &Config, options: WebhookUninstallOptions) ->
{
continue;
}
crate::logln!(
" {} {} {}",
style(if options.dry_run {
"would uninstall"
} else {
"uninstall"
})
.red()
.bold(),
style(&installation.repo).cyan(),
style(format!("from {}", installation.endpoint.label())).dim()
);
if options.dry_run {
continue;
}
let Some(site) = config.site(&installation.endpoint.site) else {
crate::logln!(
" {} {} {}",
style("skip").yellow().bold(),
style(&installation.repo).cyan(),
style(format!("unknown site {}", installation.endpoint.site)).dim()
);
continue;
};
let client = ProviderClient::new(site)?;
client
.uninstall_webhook(
&installation.endpoint,
&installation.repo,
&installation.url,
)
.with_context(|| {
format!(
"failed to uninstall webhook for {} from {}",
installation.repo,
installation.endpoint.label()
)
})?;
removed_keys.push(key.clone());
tasks.push(WebhookUninstallTask {
key: key.clone(),
site: config.site(&installation.endpoint.site).cloned(),
installation: installation.clone(),
dry_run: options.dry_run,
});
}
let removed_keys = run_uninstall_tasks(tasks, options.jobs)?;
if !options.dry_run {
for key in removed_keys {
@@ -301,6 +279,7 @@ pub fn ensure_configured_webhooks(
mirror: &MirrorConfig,
repos: &[EndpointRepo],
work_dir: &Path,
jobs: usize,
) -> Result<()> {
let Some(webhook) = &config.webhook else {
return Ok(());
@@ -308,26 +287,30 @@ pub fn ensure_configured_webhooks(
if !webhook.install {
return Ok(());
}
if jobs == 0 {
bail!("--jobs must be at least 1");
}
let secret = webhook.secret()?;
let mut state = load_webhook_state(work_dir)?;
let state = Arc::new(Mutex::new(load_webhook_state(work_dir)?));
let mut tasks = Vec::new();
for endpoint_repo in repos {
let Some(site) = config.site(&endpoint_repo.endpoint.site) else {
continue;
};
let client = ProviderClient::new(site)?;
install_repo_webhook(
&WebhookInstallRequest {
client: &client,
group: &mirror.name,
endpoint: &endpoint_repo.endpoint,
repo: &endpoint_repo.repo,
url: &webhook.url,
secret: &secret,
dry_run: false,
},
&mut state,
)?;
tasks.push(WebhookInstallTask {
site: site.clone(),
group: mirror.name.clone(),
endpoint: endpoint_repo.endpoint.clone(),
repo: endpoint_repo.repo.clone(),
url: webhook.url.clone(),
secret: secret.clone(),
dry_run: false,
});
}
run_install_tasks(tasks, jobs, Arc::clone(&state))?;
let state = state
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
save_webhook_state(work_dir, &state)
}
@@ -405,6 +388,8 @@ fn worker_loop(
struct WebhookState {
#[serde(default)]
installations: BTreeMap<String, WebhookInstallation>,
#[serde(default)]
skipped: BTreeMap<String, SkippedWebhookInstallation>,
}
#[derive(Clone, Debug, Deserialize, Serialize)]
@@ -415,65 +400,306 @@ struct WebhookInstallation {
url: String,
}
struct WebhookInstallRequest<'a> {
client: &'a ProviderClient<'a>,
group: &'a str,
endpoint: &'a EndpointConfig,
repo: &'a RemoteRepo,
url: &'a str,
secret: &'a str,
#[derive(Clone, Debug, Deserialize, Serialize)]
struct SkippedWebhookInstallation {
group: String,
endpoint: EndpointConfig,
repo: String,
url: String,
reason: String,
}
#[derive(Clone, Debug)]
struct WebhookInstallTask {
site: crate::config::SiteConfig,
group: String,
endpoint: EndpointConfig,
repo: RemoteRepo,
url: String,
secret: String,
dry_run: bool,
}
fn install_repo_webhook(
request: &WebhookInstallRequest<'_>,
state: &mut WebhookState,
#[derive(Clone, Debug)]
struct WebhookUninstallTask {
key: String,
site: Option<crate::config::SiteConfig>,
installation: WebhookInstallation,
dry_run: bool,
}
fn run_install_tasks(
tasks: Vec<WebhookInstallTask>,
jobs: usize,
state: Arc<Mutex<WebhookState>>,
) -> Result<()> {
let key = webhook_installation_key(request.group, request.endpoint, &request.repo.name);
if state
.installations
.get(&key)
.is_some_and(|installation| installation.url == request.url)
{
if tasks.is_empty() {
return Ok(());
}
let worker_count = jobs.min(tasks.len());
let (task_sender, task_receiver) = mpsc::channel();
let (result_sender, result_receiver) = mpsc::channel();
for task in tasks {
task_sender.send(task)?;
}
drop(task_sender);
let task_receiver = Arc::new(Mutex::new(task_receiver));
let mut handles = Vec::new();
for _ in 0..worker_count {
let task_receiver = Arc::clone(&task_receiver);
let result_sender = result_sender.clone();
let state = Arc::clone(&state);
handles.push(thread::spawn(move || {
loop {
let task = {
let task_receiver = task_receiver
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
task_receiver.recv()
};
let Ok(task) = task else {
break;
};
if result_sender
.send(install_webhook_task(task, &state))
.is_err()
{
break;
}
}
}));
}
drop(result_sender);
let mut failures = Vec::new();
for result in result_receiver {
if let Err(error) = result {
crate::logln!(" {} {error:#}", style("fail").red().bold());
failures.push(error);
}
}
for handle in handles {
let _ = handle.join();
}
if !failures.is_empty() {
bail!(
"webhook installation completed with {} failure(s)",
failures.len()
);
}
Ok(())
}
fn run_uninstall_tasks(tasks: Vec<WebhookUninstallTask>, jobs: usize) -> Result<Vec<String>> {
if tasks.is_empty() {
return Ok(Vec::new());
}
let worker_count = jobs.min(tasks.len());
let (task_sender, task_receiver) = mpsc::channel();
let (result_sender, result_receiver) = mpsc::channel();
for task in tasks {
task_sender.send(task)?;
}
drop(task_sender);
let task_receiver = Arc::new(Mutex::new(task_receiver));
let mut handles = Vec::new();
for _ in 0..worker_count {
let task_receiver = Arc::clone(&task_receiver);
let result_sender = result_sender.clone();
handles.push(thread::spawn(move || {
loop {
let task = {
let task_receiver = task_receiver
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
task_receiver.recv()
};
let Ok(task) = task else {
break;
};
if result_sender.send(uninstall_webhook_task(task)).is_err() {
break;
}
}
}));
}
drop(result_sender);
let mut removed_keys = Vec::new();
let mut failures = Vec::new();
for result in result_receiver {
match result {
Ok(Some(key)) => removed_keys.push(key),
Ok(None) => {}
Err(error) => {
crate::logln!(" {} {error:#}", style("fail").red().bold());
failures.push(error);
}
}
}
for handle in handles {
let _ = handle.join();
}
if !failures.is_empty() {
bail!(
"webhook uninstall completed with {} failure(s)",
failures.len()
);
}
Ok(removed_keys)
}
fn install_webhook_task(task: WebhookInstallTask, state: &Arc<Mutex<WebhookState>>) -> Result<()> {
let key = webhook_installation_key(&task.group, &task.endpoint, &task.repo.name);
{
let state = state
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
if state
.installations
.get(&key)
.is_some_and(|installation| installation.url == task.url)
{
return Ok(());
}
if state
.skipped
.get(&key)
.is_some_and(|skipped| skipped.url == task.url)
{
return Ok(());
}
}
crate::logln!(
" {} {} {}",
style(if request.dry_run {
style(if task.dry_run {
"would install"
} else {
"install"
})
.green()
.bold(),
style(&request.repo.name).cyan(),
style(format!("webhook on {}", request.endpoint.label())).dim()
style(&task.repo.name).cyan(),
style(format!("webhook on {}", task.endpoint.label())).dim()
);
if request.dry_run {
if task.dry_run {
return Ok(());
}
request
.client
.install_webhook(request.endpoint, request.repo, request.url, request.secret)
.with_context(|| {
let client = ProviderClient::new(&task.site)?;
if let Err(error) = client.install_webhook(&task.endpoint, &task.repo, &task.url, &task.secret)
{
if let Some(reason) = non_actionable_webhook_failure_reason(&error) {
crate::logln!(
" {} {} {}",
style("skip").yellow().bold(),
style(&task.repo.name).cyan(),
style(format!("webhook on {}: {reason}", task.endpoint.label())).dim()
);
let mut state = state
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
state.skipped.insert(
key,
SkippedWebhookInstallation {
group: task.group,
endpoint: task.endpoint,
repo: task.repo.name,
url: task.url,
reason,
},
);
return Ok(());
}
return Err(error).with_context(|| {
format!(
"failed to install webhook for {} on {}",
request.repo.name,
request.endpoint.label()
task.repo.name,
task.endpoint.label()
)
})?;
});
}
let mut state = state
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
state.skipped.remove(&key);
state.installations.insert(
key,
WebhookInstallation {
group: request.group.to_string(),
endpoint: request.endpoint.clone(),
repo: request.repo.name.clone(),
url: request.url.to_string(),
group: task.group,
endpoint: task.endpoint,
repo: task.repo.name,
url: task.url,
},
);
Ok(())
}
fn uninstall_webhook_task(task: WebhookUninstallTask) -> Result<Option<String>> {
crate::logln!(
" {} {} {}",
style(if task.dry_run {
"would uninstall"
} else {
"uninstall"
})
.red()
.bold(),
style(&task.installation.repo).cyan(),
style(format!("from {}", task.installation.endpoint.label())).dim()
);
if task.dry_run {
return Ok(None);
}
let Some(site) = task.site else {
crate::logln!(
" {} {} {}",
style("skip").yellow().bold(),
style(&task.installation.repo).cyan(),
style(format!("unknown site {}", task.installation.endpoint.site)).dim()
);
return Ok(None);
};
let client = ProviderClient::new(&site)?;
client
.uninstall_webhook(
&task.installation.endpoint,
&task.installation.repo,
&task.installation.url,
)
.with_context(|| {
format!(
"failed to uninstall webhook for {} from {}",
task.installation.repo,
task.installation.endpoint.label()
)
})?;
Ok(Some(task.key))
}
fn non_actionable_webhook_failure_reason(error: &anyhow::Error) -> Option<String> {
let text = error
.chain()
.map(ToString::to_string)
.collect::<Vec<_>>()
.join("\n")
.to_ascii_lowercase();
if text.contains("repository access blocked")
|| text.contains("access to this repository has been disabled")
|| text.contains("repository has been disabled")
|| text.contains("disabled by github staff")
|| text.contains("github.com/tos")
{
return Some("provider blocked access".to_string());
}
if text.contains("repository was archived")
|| text.contains("archived so is read-only")
|| text.contains("repository is archived")
{
return Some("repository is archived/read-only".to_string());
}
None
}
fn webhook_installation_key(group: &str, endpoint: &EndpointConfig, repo: &str) -> String {
format!(
"{}\t{}\t{:?}\t{}\t{}",
+6
View File
@@ -97,6 +97,8 @@ fn cli_accepts_webhook_install() {
"sync-1",
"--repo-pattern",
"^repo$",
"--jobs",
"6",
])
.unwrap();
@@ -110,6 +112,7 @@ fn cli_accepts_webhook_install() {
assert_eq!(args.secret, Some("secret".to_string()));
assert_eq!(args.group, Some("sync-1".to_string()));
assert_eq!(args.repo_pattern, Some("^repo$".to_string()));
assert_eq!(args.jobs, 6);
}
#[test]
@@ -121,6 +124,8 @@ fn cli_accepts_webhook_uninstall() {
"--group",
"sync-1",
"--dry-run",
"--jobs",
"3",
])
.unwrap();
@@ -129,4 +134,5 @@ fn cli_accepts_webhook_uninstall() {
};
assert_eq!(args.group, Some("sync-1".to_string()));
assert!(args.dry_run);
assert_eq!(args.jobs, 3);
}
+84
View File
@@ -2,6 +2,9 @@ use super::*;
use crate::config::{
EndpointConfig, MirrorConfig, NamespaceKind, SiteConfig, TokenConfig, Visibility,
};
use std::io::{Read, Write};
use std::net::TcpListener;
use std::thread;
#[test]
fn verifies_github_hmac_signature() {
@@ -150,6 +153,60 @@ fn webhook_state_persists_installations() {
);
}
#[test]
fn blocked_webhook_install_is_skipped_and_recorded() {
let (api_url, handle) = one_request_server(
"403 Forbidden",
r#"{"message":"Repository access blocked","block":{"html_url":"https://github.com/tos"}}"#,
|request| assert!(request.starts_with("GET /repos/alice/repo/hooks ")),
);
let site = SiteConfig {
api_url: Some(api_url),
..site("github", ProviderKind::Github)
};
let state = Arc::new(Mutex::new(WebhookState::default()));
install_webhook_task(
WebhookInstallTask {
site,
group: "sync-1".to_string(),
endpoint: endpoint("github", NamespaceKind::User, "alice"),
repo: RemoteRepo {
name: "repo".to_string(),
clone_url: "https://github.com/alice/repo.git".to_string(),
private: true,
description: None,
},
url: "https://mirror.example.test/webhook".to_string(),
secret: "secret".to_string(),
dry_run: false,
},
&state,
)
.unwrap();
let state = state.lock().unwrap();
assert!(state.installations.is_empty());
assert_eq!(state.skipped.len(), 1);
assert_eq!(
state.skipped.values().next().unwrap().reason,
"provider blocked access"
);
handle.join().unwrap();
}
#[test]
fn archived_webhook_failure_is_non_actionable() {
let error = anyhow::anyhow!(
"POST https://api.github.com/repos/acme/repo/hooks returned 403 Forbidden: Repository was archived so is read-only."
);
assert_eq!(
non_actionable_webhook_failure_reason(&error).as_deref(),
Some("repository is archived/read-only")
);
}
fn site(name: &str, provider: ProviderKind) -> SiteConfig {
SiteConfig {
name: name.to_string(),
@@ -168,3 +225,30 @@ fn endpoint(site: &str, kind: NamespaceKind, namespace: &str) -> EndpointConfig
namespace: namespace.to_string(),
}
}
fn one_request_server<F>(
status: &'static str,
body: &'static str,
assert_request: F,
) -> (String, thread::JoinHandle<()>)
where
F: FnOnce(&str) + Send + 'static,
{
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let address = listener.local_addr().unwrap();
let handle = thread::spawn(move || {
let (mut stream, _) = listener.accept().unwrap();
let mut buffer = [0_u8; 4096];
let bytes = stream.read(&mut buffer).unwrap();
let request = String::from_utf8_lossy(&buffer[..bytes]).to_string();
assert_request(&request);
write!(
stream,
"HTTP/1.1 {status}\r\ncontent-type: application/json\r\ncontent-length: {}\r\n\r\n{body}",
body.len()
)
.unwrap();
});
(format!("http://{address}"), handle)
}