[O] Better webhook error handling
This commit is contained in:
@@ -102,6 +102,8 @@ struct WebhookInstallCommand {
|
||||
dry_run: bool,
|
||||
#[arg(long, value_name = "PATH")]
|
||||
work_dir: Option<PathBuf>,
|
||||
#[arg(long, default_value_t = DEFAULT_JOBS, value_name = "N")]
|
||||
jobs: usize,
|
||||
}
|
||||
|
||||
#[derive(Args, Debug)]
|
||||
@@ -112,6 +114,8 @@ struct WebhookUninstallCommand {
|
||||
dry_run: bool,
|
||||
#[arg(long, value_name = "PATH")]
|
||||
work_dir: Option<PathBuf>,
|
||||
#[arg(long, default_value_t = DEFAULT_JOBS, value_name = "N")]
|
||||
jobs: usize,
|
||||
}
|
||||
|
||||
fn main() -> Result<()> {
|
||||
@@ -176,6 +180,7 @@ fn main() -> Result<()> {
|
||||
repo_pattern: command.repo_pattern,
|
||||
dry_run: command.dry_run,
|
||||
work_dir: command.work_dir,
|
||||
jobs: command.jobs,
|
||||
},
|
||||
)
|
||||
}
|
||||
@@ -187,6 +192,7 @@ fn main() -> Result<()> {
|
||||
group: command.group,
|
||||
dry_run: command.dry_run,
|
||||
work_dir: command.work_dir,
|
||||
jobs: command.jobs,
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
+8
-1
@@ -169,6 +169,7 @@ fn sync_group(
|
||||
mirror,
|
||||
&all_endpoint_repos,
|
||||
context.work_dir,
|
||||
context.options.jobs,
|
||||
)?;
|
||||
}
|
||||
|
||||
@@ -331,7 +332,13 @@ fn sync_group(
|
||||
|
||||
if create_missing && !context.options.dry_run {
|
||||
let repos = list_group_repos(context.config, mirror)?;
|
||||
webhook::ensure_configured_webhooks(context.config, mirror, &repos, context.work_dir)?;
|
||||
webhook::ensure_configured_webhooks(
|
||||
context.config,
|
||||
mirror,
|
||||
&repos,
|
||||
context.work_dir,
|
||||
context.options.jobs,
|
||||
)?;
|
||||
}
|
||||
|
||||
Ok(failures)
|
||||
|
||||
+324
-98
@@ -43,6 +43,7 @@ pub struct WebhookInstallOptions {
|
||||
pub repo_pattern: Option<String>,
|
||||
pub dry_run: bool,
|
||||
pub work_dir: Option<PathBuf>,
|
||||
pub jobs: usize,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
@@ -50,6 +51,7 @@ pub struct WebhookUninstallOptions {
|
||||
pub group: Option<String>,
|
||||
pub dry_run: bool,
|
||||
pub work_dir: Option<PathBuf>,
|
||||
pub jobs: usize,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd)]
|
||||
@@ -163,8 +165,11 @@ fn reachability_timer_loop(url: String, minutes: u64) {
|
||||
|
||||
pub fn install_webhooks(config: &Config, options: WebhookInstallOptions) -> Result<()> {
|
||||
validate_config(config)?;
|
||||
if options.jobs == 0 {
|
||||
bail!("--jobs must be at least 1");
|
||||
}
|
||||
let work_dir = options.work_dir.clone().unwrap_or_else(default_work_dir);
|
||||
let mut state = load_webhook_state(&work_dir)?;
|
||||
let state = Arc::new(Mutex::new(load_webhook_state(&work_dir)?));
|
||||
let repo_pattern = options
|
||||
.repo_pattern
|
||||
.as_deref()
|
||||
@@ -186,6 +191,7 @@ pub fn install_webhooks(config: &Config, options: WebhookInstallOptions) -> Resu
|
||||
style("Webhook group").cyan().bold(),
|
||||
style(&mirror.name).bold()
|
||||
);
|
||||
let mut tasks = Vec::new();
|
||||
for endpoint in &mirror.endpoints {
|
||||
let site = config.site(&endpoint.site).unwrap();
|
||||
let client = ProviderClient::new(site)?;
|
||||
@@ -204,22 +210,23 @@ pub fn install_webhooks(config: &Config, options: WebhookInstallOptions) -> Resu
|
||||
{
|
||||
continue;
|
||||
}
|
||||
install_repo_webhook(
|
||||
&WebhookInstallRequest {
|
||||
client: &client,
|
||||
group: &mirror.name,
|
||||
endpoint,
|
||||
repo: &repo,
|
||||
url: &options.url,
|
||||
secret: &options.secret,
|
||||
dry_run: options.dry_run,
|
||||
},
|
||||
&mut state,
|
||||
)?;
|
||||
tasks.push(WebhookInstallTask {
|
||||
site: site.clone(),
|
||||
group: mirror.name.clone(),
|
||||
endpoint: endpoint.clone(),
|
||||
repo,
|
||||
url: options.url.clone(),
|
||||
secret: options.secret.clone(),
|
||||
dry_run: options.dry_run,
|
||||
});
|
||||
}
|
||||
}
|
||||
run_install_tasks(tasks, options.jobs, Arc::clone(&state))?;
|
||||
}
|
||||
if !options.dry_run {
|
||||
let state = state
|
||||
.lock()
|
||||
.unwrap_or_else(|poisoned| poisoned.into_inner());
|
||||
save_webhook_state(&work_dir, &state)?;
|
||||
}
|
||||
Ok(())
|
||||
@@ -227,6 +234,9 @@ pub fn install_webhooks(config: &Config, options: WebhookInstallOptions) -> Resu
|
||||
|
||||
pub fn uninstall_webhooks(config: &Config, options: WebhookUninstallOptions) -> Result<()> {
|
||||
validate_config(config)?;
|
||||
if options.jobs == 0 {
|
||||
bail!("--jobs must be at least 1");
|
||||
}
|
||||
let work_dir = options.work_dir.clone().unwrap_or_else(default_work_dir);
|
||||
let mut state = load_webhook_state(&work_dir)?;
|
||||
if state.installations.is_empty() {
|
||||
@@ -237,7 +247,7 @@ pub fn uninstall_webhooks(config: &Config, options: WebhookUninstallOptions) ->
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let mut removed_keys = Vec::new();
|
||||
let mut tasks = Vec::new();
|
||||
for (key, installation) in &state.installations {
|
||||
if options
|
||||
.group
|
||||
@@ -246,46 +256,14 @@ pub fn uninstall_webhooks(config: &Config, options: WebhookUninstallOptions) ->
|
||||
{
|
||||
continue;
|
||||
}
|
||||
crate::logln!(
|
||||
" {} {} {}",
|
||||
style(if options.dry_run {
|
||||
"would uninstall"
|
||||
} else {
|
||||
"uninstall"
|
||||
})
|
||||
.red()
|
||||
.bold(),
|
||||
style(&installation.repo).cyan(),
|
||||
style(format!("from {}", installation.endpoint.label())).dim()
|
||||
);
|
||||
if options.dry_run {
|
||||
continue;
|
||||
}
|
||||
let Some(site) = config.site(&installation.endpoint.site) else {
|
||||
crate::logln!(
|
||||
" {} {} {}",
|
||||
style("skip").yellow().bold(),
|
||||
style(&installation.repo).cyan(),
|
||||
style(format!("unknown site {}", installation.endpoint.site)).dim()
|
||||
);
|
||||
continue;
|
||||
};
|
||||
let client = ProviderClient::new(site)?;
|
||||
client
|
||||
.uninstall_webhook(
|
||||
&installation.endpoint,
|
||||
&installation.repo,
|
||||
&installation.url,
|
||||
)
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"failed to uninstall webhook for {} from {}",
|
||||
installation.repo,
|
||||
installation.endpoint.label()
|
||||
)
|
||||
})?;
|
||||
removed_keys.push(key.clone());
|
||||
tasks.push(WebhookUninstallTask {
|
||||
key: key.clone(),
|
||||
site: config.site(&installation.endpoint.site).cloned(),
|
||||
installation: installation.clone(),
|
||||
dry_run: options.dry_run,
|
||||
});
|
||||
}
|
||||
let removed_keys = run_uninstall_tasks(tasks, options.jobs)?;
|
||||
|
||||
if !options.dry_run {
|
||||
for key in removed_keys {
|
||||
@@ -301,6 +279,7 @@ pub fn ensure_configured_webhooks(
|
||||
mirror: &MirrorConfig,
|
||||
repos: &[EndpointRepo],
|
||||
work_dir: &Path,
|
||||
jobs: usize,
|
||||
) -> Result<()> {
|
||||
let Some(webhook) = &config.webhook else {
|
||||
return Ok(());
|
||||
@@ -308,26 +287,30 @@ pub fn ensure_configured_webhooks(
|
||||
if !webhook.install {
|
||||
return Ok(());
|
||||
}
|
||||
if jobs == 0 {
|
||||
bail!("--jobs must be at least 1");
|
||||
}
|
||||
let secret = webhook.secret()?;
|
||||
let mut state = load_webhook_state(work_dir)?;
|
||||
let state = Arc::new(Mutex::new(load_webhook_state(work_dir)?));
|
||||
let mut tasks = Vec::new();
|
||||
for endpoint_repo in repos {
|
||||
let Some(site) = config.site(&endpoint_repo.endpoint.site) else {
|
||||
continue;
|
||||
};
|
||||
let client = ProviderClient::new(site)?;
|
||||
install_repo_webhook(
|
||||
&WebhookInstallRequest {
|
||||
client: &client,
|
||||
group: &mirror.name,
|
||||
endpoint: &endpoint_repo.endpoint,
|
||||
repo: &endpoint_repo.repo,
|
||||
url: &webhook.url,
|
||||
secret: &secret,
|
||||
dry_run: false,
|
||||
},
|
||||
&mut state,
|
||||
)?;
|
||||
tasks.push(WebhookInstallTask {
|
||||
site: site.clone(),
|
||||
group: mirror.name.clone(),
|
||||
endpoint: endpoint_repo.endpoint.clone(),
|
||||
repo: endpoint_repo.repo.clone(),
|
||||
url: webhook.url.clone(),
|
||||
secret: secret.clone(),
|
||||
dry_run: false,
|
||||
});
|
||||
}
|
||||
run_install_tasks(tasks, jobs, Arc::clone(&state))?;
|
||||
let state = state
|
||||
.lock()
|
||||
.unwrap_or_else(|poisoned| poisoned.into_inner());
|
||||
save_webhook_state(work_dir, &state)
|
||||
}
|
||||
|
||||
@@ -405,6 +388,8 @@ fn worker_loop(
|
||||
struct WebhookState {
|
||||
#[serde(default)]
|
||||
installations: BTreeMap<String, WebhookInstallation>,
|
||||
#[serde(default)]
|
||||
skipped: BTreeMap<String, SkippedWebhookInstallation>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||
@@ -415,65 +400,306 @@ struct WebhookInstallation {
|
||||
url: String,
|
||||
}
|
||||
|
||||
struct WebhookInstallRequest<'a> {
|
||||
client: &'a ProviderClient<'a>,
|
||||
group: &'a str,
|
||||
endpoint: &'a EndpointConfig,
|
||||
repo: &'a RemoteRepo,
|
||||
url: &'a str,
|
||||
secret: &'a str,
|
||||
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||
struct SkippedWebhookInstallation {
|
||||
group: String,
|
||||
endpoint: EndpointConfig,
|
||||
repo: String,
|
||||
url: String,
|
||||
reason: String,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
struct WebhookInstallTask {
|
||||
site: crate::config::SiteConfig,
|
||||
group: String,
|
||||
endpoint: EndpointConfig,
|
||||
repo: RemoteRepo,
|
||||
url: String,
|
||||
secret: String,
|
||||
dry_run: bool,
|
||||
}
|
||||
|
||||
fn install_repo_webhook(
|
||||
request: &WebhookInstallRequest<'_>,
|
||||
state: &mut WebhookState,
|
||||
#[derive(Clone, Debug)]
|
||||
struct WebhookUninstallTask {
|
||||
key: String,
|
||||
site: Option<crate::config::SiteConfig>,
|
||||
installation: WebhookInstallation,
|
||||
dry_run: bool,
|
||||
}
|
||||
|
||||
fn run_install_tasks(
|
||||
tasks: Vec<WebhookInstallTask>,
|
||||
jobs: usize,
|
||||
state: Arc<Mutex<WebhookState>>,
|
||||
) -> Result<()> {
|
||||
let key = webhook_installation_key(request.group, request.endpoint, &request.repo.name);
|
||||
if state
|
||||
.installations
|
||||
.get(&key)
|
||||
.is_some_and(|installation| installation.url == request.url)
|
||||
{
|
||||
if tasks.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
let worker_count = jobs.min(tasks.len());
|
||||
let (task_sender, task_receiver) = mpsc::channel();
|
||||
let (result_sender, result_receiver) = mpsc::channel();
|
||||
for task in tasks {
|
||||
task_sender.send(task)?;
|
||||
}
|
||||
drop(task_sender);
|
||||
let task_receiver = Arc::new(Mutex::new(task_receiver));
|
||||
let mut handles = Vec::new();
|
||||
for _ in 0..worker_count {
|
||||
let task_receiver = Arc::clone(&task_receiver);
|
||||
let result_sender = result_sender.clone();
|
||||
let state = Arc::clone(&state);
|
||||
handles.push(thread::spawn(move || {
|
||||
loop {
|
||||
let task = {
|
||||
let task_receiver = task_receiver
|
||||
.lock()
|
||||
.unwrap_or_else(|poisoned| poisoned.into_inner());
|
||||
task_receiver.recv()
|
||||
};
|
||||
let Ok(task) = task else {
|
||||
break;
|
||||
};
|
||||
if result_sender
|
||||
.send(install_webhook_task(task, &state))
|
||||
.is_err()
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
}));
|
||||
}
|
||||
drop(result_sender);
|
||||
|
||||
let mut failures = Vec::new();
|
||||
for result in result_receiver {
|
||||
if let Err(error) = result {
|
||||
crate::logln!(" {} {error:#}", style("fail").red().bold());
|
||||
failures.push(error);
|
||||
}
|
||||
}
|
||||
for handle in handles {
|
||||
let _ = handle.join();
|
||||
}
|
||||
if !failures.is_empty() {
|
||||
bail!(
|
||||
"webhook installation completed with {} failure(s)",
|
||||
failures.len()
|
||||
);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn run_uninstall_tasks(tasks: Vec<WebhookUninstallTask>, jobs: usize) -> Result<Vec<String>> {
|
||||
if tasks.is_empty() {
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
let worker_count = jobs.min(tasks.len());
|
||||
let (task_sender, task_receiver) = mpsc::channel();
|
||||
let (result_sender, result_receiver) = mpsc::channel();
|
||||
for task in tasks {
|
||||
task_sender.send(task)?;
|
||||
}
|
||||
drop(task_sender);
|
||||
let task_receiver = Arc::new(Mutex::new(task_receiver));
|
||||
let mut handles = Vec::new();
|
||||
for _ in 0..worker_count {
|
||||
let task_receiver = Arc::clone(&task_receiver);
|
||||
let result_sender = result_sender.clone();
|
||||
handles.push(thread::spawn(move || {
|
||||
loop {
|
||||
let task = {
|
||||
let task_receiver = task_receiver
|
||||
.lock()
|
||||
.unwrap_or_else(|poisoned| poisoned.into_inner());
|
||||
task_receiver.recv()
|
||||
};
|
||||
let Ok(task) = task else {
|
||||
break;
|
||||
};
|
||||
if result_sender.send(uninstall_webhook_task(task)).is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}));
|
||||
}
|
||||
drop(result_sender);
|
||||
|
||||
let mut removed_keys = Vec::new();
|
||||
let mut failures = Vec::new();
|
||||
for result in result_receiver {
|
||||
match result {
|
||||
Ok(Some(key)) => removed_keys.push(key),
|
||||
Ok(None) => {}
|
||||
Err(error) => {
|
||||
crate::logln!(" {} {error:#}", style("fail").red().bold());
|
||||
failures.push(error);
|
||||
}
|
||||
}
|
||||
}
|
||||
for handle in handles {
|
||||
let _ = handle.join();
|
||||
}
|
||||
if !failures.is_empty() {
|
||||
bail!(
|
||||
"webhook uninstall completed with {} failure(s)",
|
||||
failures.len()
|
||||
);
|
||||
}
|
||||
Ok(removed_keys)
|
||||
}
|
||||
|
||||
fn install_webhook_task(task: WebhookInstallTask, state: &Arc<Mutex<WebhookState>>) -> Result<()> {
|
||||
let key = webhook_installation_key(&task.group, &task.endpoint, &task.repo.name);
|
||||
{
|
||||
let state = state
|
||||
.lock()
|
||||
.unwrap_or_else(|poisoned| poisoned.into_inner());
|
||||
if state
|
||||
.installations
|
||||
.get(&key)
|
||||
.is_some_and(|installation| installation.url == task.url)
|
||||
{
|
||||
return Ok(());
|
||||
}
|
||||
if state
|
||||
.skipped
|
||||
.get(&key)
|
||||
.is_some_and(|skipped| skipped.url == task.url)
|
||||
{
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
crate::logln!(
|
||||
" {} {} {}",
|
||||
style(if request.dry_run {
|
||||
style(if task.dry_run {
|
||||
"would install"
|
||||
} else {
|
||||
"install"
|
||||
})
|
||||
.green()
|
||||
.bold(),
|
||||
style(&request.repo.name).cyan(),
|
||||
style(format!("webhook on {}", request.endpoint.label())).dim()
|
||||
style(&task.repo.name).cyan(),
|
||||
style(format!("webhook on {}", task.endpoint.label())).dim()
|
||||
);
|
||||
if request.dry_run {
|
||||
if task.dry_run {
|
||||
return Ok(());
|
||||
}
|
||||
request
|
||||
.client
|
||||
.install_webhook(request.endpoint, request.repo, request.url, request.secret)
|
||||
.with_context(|| {
|
||||
let client = ProviderClient::new(&task.site)?;
|
||||
if let Err(error) = client.install_webhook(&task.endpoint, &task.repo, &task.url, &task.secret)
|
||||
{
|
||||
if let Some(reason) = non_actionable_webhook_failure_reason(&error) {
|
||||
crate::logln!(
|
||||
" {} {} {}",
|
||||
style("skip").yellow().bold(),
|
||||
style(&task.repo.name).cyan(),
|
||||
style(format!("webhook on {}: {reason}", task.endpoint.label())).dim()
|
||||
);
|
||||
let mut state = state
|
||||
.lock()
|
||||
.unwrap_or_else(|poisoned| poisoned.into_inner());
|
||||
state.skipped.insert(
|
||||
key,
|
||||
SkippedWebhookInstallation {
|
||||
group: task.group,
|
||||
endpoint: task.endpoint,
|
||||
repo: task.repo.name,
|
||||
url: task.url,
|
||||
reason,
|
||||
},
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
return Err(error).with_context(|| {
|
||||
format!(
|
||||
"failed to install webhook for {} on {}",
|
||||
request.repo.name,
|
||||
request.endpoint.label()
|
||||
task.repo.name,
|
||||
task.endpoint.label()
|
||||
)
|
||||
})?;
|
||||
});
|
||||
}
|
||||
let mut state = state
|
||||
.lock()
|
||||
.unwrap_or_else(|poisoned| poisoned.into_inner());
|
||||
state.skipped.remove(&key);
|
||||
state.installations.insert(
|
||||
key,
|
||||
WebhookInstallation {
|
||||
group: request.group.to_string(),
|
||||
endpoint: request.endpoint.clone(),
|
||||
repo: request.repo.name.clone(),
|
||||
url: request.url.to_string(),
|
||||
group: task.group,
|
||||
endpoint: task.endpoint,
|
||||
repo: task.repo.name,
|
||||
url: task.url,
|
||||
},
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn uninstall_webhook_task(task: WebhookUninstallTask) -> Result<Option<String>> {
|
||||
crate::logln!(
|
||||
" {} {} {}",
|
||||
style(if task.dry_run {
|
||||
"would uninstall"
|
||||
} else {
|
||||
"uninstall"
|
||||
})
|
||||
.red()
|
||||
.bold(),
|
||||
style(&task.installation.repo).cyan(),
|
||||
style(format!("from {}", task.installation.endpoint.label())).dim()
|
||||
);
|
||||
if task.dry_run {
|
||||
return Ok(None);
|
||||
}
|
||||
let Some(site) = task.site else {
|
||||
crate::logln!(
|
||||
" {} {} {}",
|
||||
style("skip").yellow().bold(),
|
||||
style(&task.installation.repo).cyan(),
|
||||
style(format!("unknown site {}", task.installation.endpoint.site)).dim()
|
||||
);
|
||||
return Ok(None);
|
||||
};
|
||||
let client = ProviderClient::new(&site)?;
|
||||
client
|
||||
.uninstall_webhook(
|
||||
&task.installation.endpoint,
|
||||
&task.installation.repo,
|
||||
&task.installation.url,
|
||||
)
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"failed to uninstall webhook for {} from {}",
|
||||
task.installation.repo,
|
||||
task.installation.endpoint.label()
|
||||
)
|
||||
})?;
|
||||
Ok(Some(task.key))
|
||||
}
|
||||
|
||||
fn non_actionable_webhook_failure_reason(error: &anyhow::Error) -> Option<String> {
|
||||
let text = error
|
||||
.chain()
|
||||
.map(ToString::to_string)
|
||||
.collect::<Vec<_>>()
|
||||
.join("\n")
|
||||
.to_ascii_lowercase();
|
||||
if text.contains("repository access blocked")
|
||||
|| text.contains("access to this repository has been disabled")
|
||||
|| text.contains("repository has been disabled")
|
||||
|| text.contains("disabled by github staff")
|
||||
|| text.contains("github.com/tos")
|
||||
{
|
||||
return Some("provider blocked access".to_string());
|
||||
}
|
||||
if text.contains("repository was archived")
|
||||
|| text.contains("archived so is read-only")
|
||||
|| text.contains("repository is archived")
|
||||
{
|
||||
return Some("repository is archived/read-only".to_string());
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
fn webhook_installation_key(group: &str, endpoint: &EndpointConfig, repo: &str) -> String {
|
||||
format!(
|
||||
"{}\t{}\t{:?}\t{}\t{}",
|
||||
|
||||
Reference in New Issue
Block a user