From c013ce1858f678d53452266521c45050b4dd7be8 Mon Sep 17 00:00:00 2001 From: Azalea Date: Thu, 7 May 2026 04:55:49 +0000 Subject: [PATCH] [+] Webhook mode --- Cargo.lock | 114 ++++++ Cargo.toml | 3 + README.md | 121 +++--- src/config.rs | 103 +++-- src/interactive.rs | 208 +++++++++- src/main.rs | 546 ++++++++++---------------- src/provider.rs | 367 +++++++++++++++++ src/sync.rs | 55 ++- src/webhook.rs | 955 +++++++++++++++++++++++++++++++++++++++++++++ 9 files changed, 2011 insertions(+), 461 deletions(-) create mode 100644 src/webhook.rs diff --git a/Cargo.lock b/Cargo.lock index 3e072b1..8dc0cec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -67,6 +67,12 @@ version = "1.0.102" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c" +[[package]] +name = "ascii" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d92bec98840b8f03a5ff5413de5293bfcd8bf96467cf5452609f939ec6f5de16" + [[package]] name = "atomic-waker" version = "1.1.2" @@ -85,6 +91,15 @@ version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4512299f36f043ab09a583e57bceb5a5aab7a73db1805848e8fef3c9e8c78b3" +[[package]] +name = "block-buffer" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" +dependencies = [ + "generic-array", +] + [[package]] name = "bumpalo" version = "3.20.2" @@ -119,6 +134,12 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" +[[package]] +name = "chunked_transfer" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e4de3bc4ea267985becf712dc6d9eed8b04c953b3fcfb339ebc87acd9804901" + [[package]] name = "clap" version = "4.6.1" @@ -177,6 +198,25 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "cpufeatures" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280" +dependencies = [ + "libc", +] + +[[package]] +name = "crypto-common" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78c8292055d1c1df0cce5d180393dc8cce0abec0a7102adb6c7b1eef6016d60a" +dependencies = [ + "generic-array", + "typenum", +] + [[package]] name = "dialoguer" version = "0.12.0" @@ -189,6 +229,17 @@ dependencies = [ "zeroize", ] +[[package]] +name = "digest" +version = "0.10.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" +dependencies = [ + "block-buffer", + "crypto-common", + "subtle", +] + [[package]] name = "directories" version = "5.0.1" @@ -319,6 +370,16 @@ dependencies = [ "slab", ] +[[package]] +name = "generic-array" +version = "0.14.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" +dependencies = [ + "typenum", + "version_check", +] + [[package]] name = "getrandom" version = "0.2.17" @@ -368,11 +429,14 @@ dependencies = [ "console", "dialoguer", "directories", + "hmac", "regex", "reqwest", "serde", "serde_json", + "sha2", "tempfile", + "tiny_http", "toml", "url", ] @@ -398,6 +462,15 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" +[[package]] +name = "hmac" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" +dependencies = [ + "digest", +] + [[package]] name = "http" version = "1.4.0" @@ -437,6 +510,12 @@ version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87" +[[package]] +name = "httpdate" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" + [[package]] name = "hyper" version = "1.9.0" @@ -1121,6 +1200,17 @@ dependencies = [ "serde", ] +[[package]] +name = "sha2" +version = "0.10.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "shell-words" version = "1.1.1" @@ -1257,6 +1347,18 @@ dependencies = [ "syn", ] +[[package]] +name = "tiny_http" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "389915df6413a2e74fb181895f933386023c71110878cd0825588928e64cdc82" +dependencies = [ + "ascii", + "chunked_transfer", + "httpdate", + "log", +] + [[package]] name = "tinystr" version = "0.8.3" @@ -1417,6 +1519,12 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "typenum" +version = "1.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40ce102ab67701b8526c123c1bab5cbe42d7040ccfd0f64af1a385808d2f43de" + [[package]] name = "unicode-ident" version = "1.0.24" @@ -1465,6 +1573,12 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" +[[package]] +name = "version_check" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" + [[package]] name = "want" version = "0.3.1" diff --git a/Cargo.toml b/Cargo.toml index 72c36b2..be2992e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,10 +9,13 @@ clap = { version = "4.5", features = ["derive"] } console = "0.16" dialoguer = "0.12" directories = "5.0" +hmac = "0.12" reqwest = { version = "0.12", default-features = false, features = ["blocking", "json", "rustls-tls"] } regex = "1.11" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" +sha2 = "0.10" tempfile = "3.13" +tiny_http = "0.12" toml = "0.8" url = "2.5" diff --git a/README.md b/README.md index fb77d6a..e8b5c99 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # git-sync -`git-sync` mirrors repositories between Git hosting providers when you run it. It does not install daemons, webhooks, timers, or cron jobs. +`git-sync` mirrors repositories between Git hosting providers when you run it. It can run as a one-shot sync command, or as a webhook receiver that triggers one-repo syncs after push events. Supported providers: @@ -22,19 +22,13 @@ The binary will be at `target/release/git-sync`. ## Configure -Create the config file: +Run the interactive configuration wizard: ```sh -git-sync config init +git-sync config ``` -Or use the interactive wizard, which can create or update the same config file: - -```sh -git-sync config wizard -``` - -The wizard asks for profile or organization URLs, reuses existing credentials when it can, asks for a PAT only when needed, and then shows the sync group before asking whether to add another group. +The wizard creates or updates the config file. It asks for profile or organization URLs, reuses existing credentials when it can, asks for a PAT only when needed, then offers webhook setup. Webhooks are strongly recommended because they sync soon after pushes and greatly reduce the chance of divergent histories. Example wizard flow: @@ -44,6 +38,7 @@ Example wizard flow: 4. Pick the provider if the instance cannot be detected. 5. Paste a PAT if needed. 6. Optionally add a third endpoint for 3-way sync. +7. Enable webhooks and enter the public webhook URL. PAT quick setup: @@ -52,23 +47,7 @@ PAT quick setup: - Gitea: open `/user/settings/applications`, create a token with repository access, then copy it. - Forgejo: open `/user/settings/applications`, create a token with repository access, then copy it. -Add sites. Prefer `--token-env` so PATs do not live in shell history or the config file. - -```sh -git-sync config site add \ - --name github \ - --provider github \ - --base-url https://github.com \ - --token-env GITHUB_TOKEN - -git-sync config site add \ - --name gitea \ - --provider gitea \ - --base-url https://gitea.example.com \ - --token-env GITEA_TOKEN -``` - -For self-hosted providers, `--base-url` is the web root. API URLs default to: +There are no separate configuration mutation commands. If you do not want to use the wizard, edit the config TOML directly; see the example config below. For self-hosted providers, `base_url` is the web root. API URLs default to: - GitHub.com: `https://api.github.com` - GitHub Enterprise: `/api/v3` @@ -76,27 +55,7 @@ For self-hosted providers, `--base-url` is the web root. API URLs default to: - Gitea: `/api/v1` - Forgejo: `/api/v1` -Override with `--api-url` if your instance is different. - -Add one or more mirror groups. Endpoints use `SITE:KIND:NAMESPACE`, where kind is `user`, `org`, or `group` depending on the provider. - -```sh -git-sync config mirror add \ - --name personal \ - --endpoint github:user:hykilpikonna \ - --endpoint gitea:user:azalea - -git-sync config mirror add \ - --name mewolab \ - --endpoint github:org:MewoLab \ - --endpoint gitea:org:MewoLab -``` - -You can inspect the generated config with: - -```sh -git-sync config show -``` +Set `api_url` in the TOML if your instance is different. ## Sync @@ -146,6 +105,72 @@ Use cron or another scheduler for automatic execution: */15 * * * * GITHUB_TOKEN=... GITEA_TOKEN=... /path/to/git-sync sync ``` +## Webhooks + +Webhook mode reduces the window for divergent commits by syncing a repository immediately after a provider sends a push event. It is still conservative: if two endpoints receive independent commits before webhook sync catches up, the normal divergence rules still apply. + +The interactive wizard can configure webhooks for you. It asks for the public URL, checks that the URL is reachable from the current machine, creates a webhook secret, and can enable periodic full syncs while `git-sync serve` is running. + +Example config: + +```toml +[webhook] +install = true +url = "https://mirror.example.com/webhook" +secret = { value = "generated-secret" } +full_sync_interval_minutes = 60 +reachability_check_interval_minutes = 15 +``` + +Start the receiver: + +```sh +git-sync serve \ + --listen 127.0.0.1:8787 +``` + +Expose that listener with your reverse proxy or tunnel, then install repository webhooks. If `[webhook]` is configured, the URL and secret can come from config: + +```sh +git-sync webhook install +``` + +You can also pass them explicitly: + +```sh +git-sync webhook install \ + --url https://mirror.example.com/webhook \ + --secret-env GIT_SYNC_WEBHOOK_SECRET +``` + +Useful install filters: + +```sh +git-sync webhook install \ + --url https://mirror.example.com/webhook \ + --secret-env GIT_SYNC_WEBHOOK_SECRET \ + --group personal \ + --repo-pattern '^important-' +``` + +The receiver accepts `POST /` and `POST /webhook`. It verifies GitHub/Gitea HMAC SHA-256 signatures and GitLab webhook tokens, then queues `git-sync sync --group --repo-pattern '^$'` internally. Duplicate events for the same group/repo are coalesced while a job is queued or running. Sync jobs are serialized inside the receiver so the local ref and failure caches stay consistent. + +When `[webhook].install = true`, normal `git-sync sync` also checks webhook installation status and installs missing webhooks for repositories that have not been recorded yet. Installation status is stored in `webhook-state.toml` under the work directory. + +To uninstall webhooks previously installed by `git-sync`: + +```sh +git-sync webhook uninstall +``` + +Serve can also run periodic full syncs. The interval can be configured in `[webhook].full_sync_interval_minutes` or overridden at startup: + +```sh +git-sync serve --full-sync-interval-minutes 30 +``` + +If `[webhook].reachability_check_interval_minutes` is configured, `serve` periodically checks that the public webhook URL is still reachable and logs a warning when it is not. + ## Sync Semantics Each mirror group is treated as a set of equivalent namespaces. Repositories are matched by repository name across all endpoints. diff --git a/src/config.rs b/src/config.rs index 0ab30e4..c10906d 100644 --- a/src/config.rs +++ b/src/config.rs @@ -13,6 +13,8 @@ pub struct Config { pub sites: Vec, #[serde(default)] pub mirrors: Vec, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub webhook: Option, } #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] @@ -55,6 +57,18 @@ pub struct MirrorConfig { pub allow_force: bool, } +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct WebhookConfig { + #[serde(default = "default_true")] + pub install: bool, + pub url: String, + pub secret: TokenConfig, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub full_sync_interval_minutes: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub reachability_check_interval_minutes: Option, +} + #[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)] pub struct EndpointConfig { pub site: String, @@ -127,23 +141,6 @@ impl Config { } } - pub fn remove_site(&mut self, name: &str) -> Result<()> { - if !self.sites.iter().any(|site| site.name == name) { - bail!("site '{name}' does not exist"); - } - for mirror in &self.mirrors { - if mirror - .endpoints - .iter() - .any(|endpoint| endpoint.site == name) - { - bail!("site '{name}' is still used by mirror '{}'", mirror.name); - } - } - self.sites.retain(|site| site.name != name); - Ok(()) - } - pub fn upsert_mirror(&mut self, mirror: MirrorConfig) { if let Some(existing) = self .mirrors @@ -168,12 +165,7 @@ impl Config { impl SiteConfig { pub fn token(&self) -> Result { - match &self.token { - TokenConfig::Value(value) => Ok(value.clone()), - TokenConfig::Env(name) => { - env::var(name).with_context(|| format!("environment variable {name} is not set")) - } - } + self.token.value("site token") } pub fn api_base(&self) -> String { @@ -196,6 +188,22 @@ impl SiteConfig { } } +impl WebhookConfig { + pub fn secret(&self) -> Result { + self.secret.value("webhook secret") + } +} + +impl TokenConfig { + pub fn value(&self, label: &str) -> Result { + match self { + TokenConfig::Value(value) => Ok(value.clone()), + TokenConfig::Env(name) => env::var(name) + .with_context(|| format!("environment variable {name} for {label} is not set")), + } + } +} + impl EndpointConfig { pub fn label(&self) -> String { format!("{}:{}:{:?}", self.site, self.namespace, self.kind) @@ -267,6 +275,13 @@ mod tests { fn parses_token_forms() { let config: Config = toml::from_str( r#" + [webhook] + install = true + url = "https://mirror.example.test/webhook" + secret = { env = "WEBHOOK_SECRET" } + full_sync_interval_minutes = 60 + reachability_check_interval_minutes = 15 + [[sites]] name = "github" provider = "github" @@ -294,6 +309,14 @@ mod tests { assert_eq!(config.sites.len(), 1); assert_eq!(config.mirrors[0].endpoints.len(), 2); + let webhook = config.webhook.unwrap(); + assert!(webhook.install); + assert_eq!(webhook.url, "https://mirror.example.test/webhook"); + assert_eq!( + webhook.secret, + TokenConfig::Env("WEBHOOK_SECRET".to_string()) + ); + assert_eq!(webhook.full_sync_interval_minutes, Some(60)); } #[test] @@ -311,6 +334,7 @@ mod tests { visibility: Visibility::Private, allow_force: false, }], + webhook: None, }; let err = validate_config(&config).unwrap_err().to_string(); assert!(err.contains("at least two endpoints")); @@ -335,43 +359,12 @@ mod tests { visibility: Visibility::Private, allow_force: false, }], + webhook: None, }; let err = validate_config(&config).unwrap_err().to_string(); assert!(err.contains("unknown site 'missing'")); } - #[test] - fn removing_referenced_site_is_rejected() { - let mut config = Config { - sites: vec![ - site("github", ProviderKind::Github), - site("gitea", ProviderKind::Gitea), - ], - mirrors: vec![MirrorConfig { - name: "personal".to_string(), - endpoints: vec![ - EndpointConfig { - site: "github".to_string(), - kind: NamespaceKind::User, - namespace: "alice".to_string(), - }, - EndpointConfig { - site: "gitea".to_string(), - kind: NamespaceKind::User, - namespace: "alice".to_string(), - }, - ], - create_missing: true, - visibility: Visibility::Private, - allow_force: false, - }], - }; - - let err = config.remove_site("github").unwrap_err().to_string(); - assert!(err.contains("still used by mirror 'personal'")); - assert!(config.site("github").is_some()); - } - #[test] fn api_base_defaults_match_providers() { assert_eq!( diff --git a/src/interactive.rs b/src/interactive.rs index 67283f6..3288e9f 100644 --- a/src/interactive.rs +++ b/src/interactive.rs @@ -1,6 +1,8 @@ use std::fmt::Display; +use std::fs::File; +use std::io::Read; use std::path::Path; -use std::time::Duration; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; use anyhow::{Context, Result}; use console::{Term, style}; @@ -15,9 +17,10 @@ use std::io::{BufRead, Write}; use crate::config::{ Config, EndpointConfig, MirrorConfig, NamespaceKind, ProviderKind, SiteConfig, TokenConfig, - Visibility, + Visibility, WebhookConfig, }; use crate::provider::ProviderClient; +use crate::webhook::check_webhook_url_reachable; #[derive(Clone, Debug)] struct ProfileTarget { @@ -120,10 +123,89 @@ fn add_sync_group_styled(config: &mut Config, theme: &ColorfulTheme) -> Result<( visibility: Visibility::Private, allow_force: false, }); + prompt_webhook_setup_styled(config, theme)?; Ok(()) } +fn prompt_webhook_setup_styled(config: &mut Config, theme: &ColorfulTheme) -> Result<()> { + if config + .webhook + .as_ref() + .is_some_and(|webhook| webhook.install) + { + println!( + "{} {}", + style("Webhooks").green().bold(), + style("already enabled").dim() + ); + return Ok(()); + } + println!(); + println!( + "{} {}", + style("Webhooks").cyan().bold(), + style( + "strongly recommended; they sync immediately after pushes and greatly reduce conflicts" + ) + .dim() + ); + if !Confirm::with_theme(theme) + .with_prompt("Install webhooks for configured repositories?") + .default(true) + .interact()? + { + return Ok(()); + } + let url = Input::::with_theme(theme) + .with_prompt("Webhook URL reachable by GitHub/GitLab/Gitea") + .validate_with(|value: &String| validate_url(value)) + .interact_text()?; + match check_webhook_url_reachable(&url) { + Ok(()) => println!( + "{} {}", + style("reachable").green().bold(), + style(&url).cyan() + ), + Err(error) => { + println!( + "{} {}: {error:#}", + style("not reachable from here").yellow().bold(), + style(&url).cyan() + ); + if !Confirm::with_theme(theme) + .with_prompt("Save this webhook URL anyway?") + .default(false) + .interact()? + { + return Ok(()); + } + } + } + let full_sync_interval_minutes = if Confirm::with_theme(theme) + .with_prompt("Run periodic full sync while the webhook server is running?") + .default(true) + .interact()? + { + Some( + Input::::with_theme(theme) + .with_prompt("Full sync interval in minutes") + .default(60) + .interact_text()?, + ) + } else { + None + }; + config.webhook = Some(WebhookConfig { + install: true, + url, + secret: TokenConfig::Value(generate_webhook_secret()), + full_sync_interval_minutes, + reachability_check_interval_minutes: Some(15), + }); + Ok(()) +} + fn prompt_wizard_action_styled(theme: &ColorfulTheme) -> Result { let options = ["Add another sync group", "Delete an existing group", "Done"]; let index = Select::with_theme(theme) @@ -550,6 +632,56 @@ where visibility: Visibility::Private, allow_force: false, }); + prompt_webhook_setup(reader, writer, config)?; + Ok(()) +} + +#[cfg(test)] +fn prompt_webhook_setup(reader: &mut R, writer: &mut W, config: &mut Config) -> Result<()> +where + R: BufRead, + W: Write, +{ + if config + .webhook + .as_ref() + .is_some_and(|webhook| webhook.install) + { + writeln!(writer, "Webhooks already enabled.")?; + return Ok(()); + } + writeln!( + writer, + "Install webhooks? Strongly recommended because immediate sync greatly reduces conflicts." + )?; + if !prompt_bool(reader, writer, "Install webhook?", true)? { + return Ok(()); + } + let url = prompt_required(reader, writer, "Webhook URL reachable by providers")?; + if let Err(error) = validate_url(&url) { + bail!(error); + } + let full_sync_interval_minutes = if prompt_bool( + reader, + writer, + "Run periodic full sync while serve is running?", + true, + )? { + Some( + prompt_with_default(reader, writer, "Full sync interval in minutes", "60")? + .parse::() + .context("full sync interval must be a number")?, + ) + } else { + None + }; + config.webhook = Some(WebhookConfig { + install: true, + url, + secret: TokenConfig::Value("test-webhook-secret".to_string()), + full_sync_interval_minutes, + reachability_check_interval_minutes: Some(15), + }); Ok(()) } @@ -1128,6 +1260,36 @@ fn validate_required(value: &str) -> std::result::Result<(), String> { } } +fn validate_url(value: &str) -> std::result::Result<(), String> { + validate_required(value)?; + let url = Url::parse(value).map_err(|error| format!("Invalid URL: {error}"))?; + match url.scheme() { + "http" | "https" => Ok(()), + _ => Err("URL must start with http:// or https://".to_string()), + } +} + +fn generate_webhook_secret() -> String { + let mut bytes = [0_u8; 32]; + if File::open("/dev/urandom") + .and_then(|mut file| file.read_exact(&mut bytes)) + .is_err() + { + let nanos = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|duration| duration.as_nanos()) + .unwrap_or_default(); + for (index, byte) in bytes.iter_mut().enumerate() { + *byte = ((nanos >> ((index % 16) * 8)) & 0xff) as u8; + } + } + let mut output = String::with_capacity(bytes.len() * 2); + for byte in bytes { + output.push_str(&format!("{byte:02x}")); + } + output +} + #[cfg(test)] mod tests { use super::*; @@ -1143,6 +1305,7 @@ mod tests { "gt-token", "", "n", + "n", "3", ] .join("\n") @@ -1198,6 +1361,7 @@ mod tests { "gt-token", "", "n", + "n", "3", ] .join("\n") @@ -1213,6 +1377,41 @@ mod tests { assert_eq!(config.sites.len(), 3); } + #[test] + fn wizard_can_enable_webhooks() { + let input = [ + "https://github.com/alice", + "gh-token", + "", + "https://gitea.example.test/alice", + "gt-token", + "", + "n", + "y", + "https://mirror.example.test/webhook", + "y", + "30", + "3", + ] + .join("\n") + + "\n"; + let mut reader = Cursor::new(input.as_bytes()); + let mut output = Vec::new(); + + let config = + run_config_wizard_with_io(Config::default(), &mut reader, &mut output).unwrap(); + + let webhook = config.webhook.unwrap(); + assert!(webhook.install); + assert_eq!(webhook.url, "https://mirror.example.test/webhook"); + assert_eq!(webhook.full_sync_interval_minutes, Some(30)); + assert_eq!(webhook.reachability_check_interval_minutes, Some(15)); + assert_eq!( + webhook.secret, + TokenConfig::Value("test-webhook-secret".to_string()) + ); + } + #[test] fn wizard_reuses_existing_credentials_for_same_instance() { let config = Config { @@ -1225,6 +1424,7 @@ mod tests { git_username: None, }], mirrors: Vec::new(), + webhook: None, }; let input = [ "https://github.com/alice", @@ -1232,6 +1432,7 @@ mod tests { "https://github.com/bob", "", "n", + "n", "3", ] .join("\n") @@ -1285,6 +1486,7 @@ mod tests { visibility: Visibility::Private, allow_force: false, }], + webhook: None, }; let mut reader = Cursor::new(b"3\n".as_slice()); let mut output = Vec::new(); @@ -1337,6 +1539,7 @@ mod tests { visibility: Visibility::Private, allow_force: false, }], + webhook: None, }; let input = ["2", "1", "3"].join("\n") + "\n"; let mut reader = Cursor::new(input.as_bytes()); @@ -1391,6 +1594,7 @@ mod tests { visibility: Visibility::Private, allow_force: false, }], + webhook: None, }; let input = ["2", "2", "3"].join("\n") + "\n"; let mut reader = Cursor::new(input.as_bytes()); diff --git a/src/main.rs b/src/main.rs index 4050efa..9c0132a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,17 +4,20 @@ mod interactive; mod logging; mod provider; mod sync; +mod webhook; +use std::env; use std::path::PathBuf; -use anyhow::{Context, Result, bail}; -use clap::{Args, Parser, Subcommand, ValueEnum}; +use anyhow::{Context, Result}; +use clap::{Args, Parser, Subcommand}; -use crate::config::{ - Config, EndpointConfig, NamespaceKind, ProviderKind, SiteConfig, TokenConfig, Visibility, - default_config_path, -}; +use crate::config::{Config, default_config_path}; use crate::sync::{DEFAULT_JOBS, SyncOptions, sync_all}; +use crate::webhook::{ + ServeOptions, WebhookInstallOptions, WebhookUninstallOptions, install_webhooks, serve, + uninstall_webhooks, +}; #[derive(Parser, Debug)] #[command(name = "git-sync")] @@ -29,71 +32,15 @@ struct Cli { #[derive(Subcommand, Debug)] enum Command { - #[command(subcommand)] - Config(ConfigCommand), + /// Run the interactive configuration wizard + Config, + /// Sync configured mirror groups once Sync(SyncCommand), -} - -#[derive(Subcommand, Debug)] -enum ConfigCommand { - Init, - Wizard, + /// Run the webhook receiver + Serve(ServeCommand), + /// Install or uninstall repository webhooks #[command(subcommand)] - Site(SiteCommand), - #[command(subcommand)] - Mirror(MirrorCommand), - Show, -} - -#[derive(Subcommand, Debug)] -enum SiteCommand { - Add(SiteAddCommand), - Remove(NameCommand), - List, -} - -#[derive(Subcommand, Debug)] -enum MirrorCommand { - Add(MirrorAddCommand), - Remove(NameCommand), - List, -} - -#[derive(Args, Debug)] -struct NameCommand { - name: String, -} - -#[derive(Args, Debug)] -struct SiteAddCommand { - #[arg(long)] - name: String, - #[arg(long)] - provider: ProviderArg, - #[arg(long, value_name = "URL")] - base_url: String, - #[arg(long, value_name = "URL")] - api_url: Option, - #[arg(long, conflicts_with = "token_env")] - token: Option, - #[arg(long, value_name = "ENV", conflicts_with = "token")] - token_env: Option, - #[arg(long)] - git_username: Option, -} - -#[derive(Args, Debug)] -struct MirrorAddCommand { - #[arg(long)] - name: String, - #[arg(long = "endpoint", required = true, action = clap::ArgAction::Append, value_name = "SITE:KIND:NAMESPACE")] - endpoints: Vec, - #[arg(long, default_value_t = true, action = clap::ArgAction::Set)] - create_missing: bool, - #[arg(long, default_value_t = VisibilityArg::Private)] - visibility: VisibilityArg, - #[arg(long, default_value_t = false)] - allow_force: bool, + Webhook(WebhookCommand), } #[derive(Args, Debug)] @@ -116,27 +63,54 @@ struct SyncCommand { jobs: usize, } -#[derive(Clone, Debug, ValueEnum)] -enum ProviderArg { - Github, - Gitlab, - Gitea, - Forgejo, +#[derive(Args, Debug)] +struct ServeCommand { + #[arg(long, default_value = "127.0.0.1:8787", value_name = "HOST:PORT")] + listen: String, + #[arg(long, conflicts_with = "secret_env")] + secret: Option, + #[arg(long, value_name = "ENV", conflicts_with = "secret")] + secret_env: Option, + #[arg(long, default_value_t = DEFAULT_JOBS, value_name = "N")] + jobs: usize, + #[arg(long, value_name = "PATH")] + work_dir: Option, + #[arg(long, value_name = "MINUTES")] + full_sync_interval_minutes: Option, } -#[derive(Clone, Debug, ValueEnum)] -enum VisibilityArg { - Private, - Public, +#[derive(Subcommand, Debug)] +enum WebhookCommand { + Install(WebhookInstallCommand), + Uninstall(WebhookUninstallCommand), } -impl std::fmt::Display for VisibilityArg { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::Private => write!(f, "private"), - Self::Public => write!(f, "public"), - } - } +#[derive(Args, Debug)] +struct WebhookInstallCommand { + #[arg(long, value_name = "URL")] + url: Option, + #[arg(long, conflicts_with = "secret_env")] + secret: Option, + #[arg(long, value_name = "ENV", conflicts_with = "secret")] + secret_env: Option, + #[arg(long, value_name = "NAME")] + group: Option, + #[arg(long, value_name = "REGEX")] + repo_pattern: Option, + #[arg(long)] + dry_run: bool, + #[arg(long, value_name = "PATH")] + work_dir: Option, +} + +#[derive(Args, Debug)] +struct WebhookUninstallCommand { + #[arg(long, value_name = "NAME")] + group: Option, + #[arg(long)] + dry_run: bool, + #[arg(long, value_name = "PATH")] + work_dir: Option, } fn main() -> Result<()> { @@ -144,7 +118,7 @@ fn main() -> Result<()> { let config_path = cli.config.unwrap_or_else(default_config_path); match cli.command { - Command::Config(command) => handle_config(command, config_path), + Command::Config => interactive::run_config_wizard(&config_path), Command::Sync(command) => { let config = Config::load(&config_path) .with_context(|| format!("failed to load config at {}", config_path.display()))?; @@ -162,160 +136,89 @@ fn main() -> Result<()> { }, ) } - } -} - -fn handle_config(command: ConfigCommand, path: PathBuf) -> Result<()> { - match command { - ConfigCommand::Init => { - if path.exists() { - bail!("config already exists at {}", path.display()); - } - let config = Config::default(); - config.save(&path)?; - println!("created {}", path.display()); - Ok(()) - } - ConfigCommand::Wizard => interactive::run_config_wizard(&path), - ConfigCommand::Site(command) => handle_site(command, path), - ConfigCommand::Mirror(command) => handle_mirror(command, path), - ConfigCommand::Show => { - let config = Config::load(&path)?; - println!("{}", toml::to_string_pretty(&config)?); - Ok(()) - } - } -} - -fn handle_site(command: SiteCommand, path: PathBuf) -> Result<()> { - let mut config = Config::load_or_default(&path)?; - match command { - SiteCommand::Add(args) => { - let token = match (args.token, args.token_env) { - (Some(value), None) => TokenConfig::Value(value), - (None, Some(env)) => TokenConfig::Env(env), - (None, None) => bail!("pass either --token or --token-env"), - (Some(_), Some(_)) => unreachable!("clap enforces token conflicts"), - }; - config.upsert_site(SiteConfig { - name: args.name, - provider: args.provider.into(), - base_url: args.base_url, - api_url: args.api_url, - token, - git_username: args.git_username, - }); - config.save(&path)?; - println!("updated {}", path.display()); - Ok(()) - } - SiteCommand::Remove(args) => { - config.remove_site(&args.name)?; - config.save(&path)?; - println!("removed site {}", args.name); - Ok(()) - } - SiteCommand::List => { - for site in &config.sites { - println!("{}\t{:?}\t{}", site.name, site.provider, site.base_url); - } - Ok(()) - } - } -} - -fn handle_mirror(command: MirrorCommand, path: PathBuf) -> Result<()> { - let mut config = Config::load_or_default(&path)?; - match command { - MirrorCommand::Add(args) => { - if args.endpoints.len() < 2 { - bail!("mirror groups need at least two --endpoint values"); - } - let endpoints = args - .endpoints - .iter() - .map(|value| parse_endpoint(value)) - .collect::>>()?; - for endpoint in &endpoints { + Command::Serve(command) => { + let config = Config::load(&config_path) + .with_context(|| format!("failed to load config at {}", config_path.display()))?; + let full_sync_interval_minutes = command.full_sync_interval_minutes.or_else(|| { config - .site(&endpoint.site) - .with_context(|| format!("unknown site '{}'", endpoint.site))?; - } - config.upsert_mirror(config::MirrorConfig { - name: args.name, - endpoints, - create_missing: args.create_missing, - visibility: args.visibility.into(), - allow_force: args.allow_force, + .webhook + .as_ref() + .and_then(|webhook| webhook.full_sync_interval_minutes) }); - config.save(&path)?; - println!("updated {}", path.display()); - Ok(()) + let reachability_url = config.webhook.as_ref().map(|webhook| webhook.url.clone()); + let reachability_check_interval_minutes = config + .webhook + .as_ref() + .and_then(|webhook| webhook.reachability_check_interval_minutes); + let secret = resolve_webhook_secret(&config, command.secret, command.secret_env)?; + serve( + config, + ServeOptions { + listen: command.listen, + secret, + workers: command.jobs, + work_dir: command.work_dir, + full_sync_interval_minutes, + reachability_url, + reachability_check_interval_minutes, + }, + ) } - MirrorCommand::Remove(args) => { - config.remove_mirror(&args.name)?; - config.save(&path)?; - println!("removed mirror {}", args.name); - Ok(()) + Command::Webhook(WebhookCommand::Install(command)) => { + let config = Config::load(&config_path) + .with_context(|| format!("failed to load config at {}", config_path.display()))?; + let secret = resolve_webhook_secret(&config, command.secret, command.secret_env)?; + let url = resolve_webhook_url(&config, command.url)?; + install_webhooks( + &config, + WebhookInstallOptions { + url, + secret, + group: command.group, + repo_pattern: command.repo_pattern, + dry_run: command.dry_run, + work_dir: command.work_dir, + }, + ) } - MirrorCommand::List => { - for mirror in &config.mirrors { - let endpoints = mirror - .endpoints - .iter() - .map(|endpoint| { - format!( - "{}:{:?}:{}", - endpoint.site, endpoint.kind, endpoint.namespace - ) - }) - .collect::>() - .join(", "); - println!("{}\t{}", mirror.name, endpoints); - } - Ok(()) + Command::Webhook(WebhookCommand::Uninstall(command)) => { + let config = Config::load(&config_path) + .with_context(|| format!("failed to load config at {}", config_path.display()))?; + uninstall_webhooks( + &config, + WebhookUninstallOptions { + group: command.group, + dry_run: command.dry_run, + work_dir: command.work_dir, + }, + ) } } } -fn parse_endpoint(value: &str) -> Result { - let parts = value.splitn(3, ':').collect::>(); - if parts.len() != 3 { - bail!("endpoint must be SITE:KIND:NAMESPACE, got '{value}'"); - } - - let kind = match parts[1].to_ascii_lowercase().as_str() { - "user" => NamespaceKind::User, - "org" | "organization" => NamespaceKind::Org, - "group" => NamespaceKind::Group, - other => bail!("unsupported namespace kind '{other}'"), - }; - - Ok(EndpointConfig { - site: parts[0].to_string(), - kind, - namespace: parts[2].to_string(), - }) -} - -impl From for ProviderKind { - fn from(value: ProviderArg) -> Self { - match value { - ProviderArg::Github => Self::Github, - ProviderArg::Gitlab => Self::Gitlab, - ProviderArg::Gitea => Self::Gitea, - ProviderArg::Forgejo => Self::Forgejo, - } +fn resolve_webhook_secret( + config: &Config, + value: Option, + env_name: Option, +) -> Result { + match (value, env_name) { + (Some(value), None) => Ok(value), + (None, Some(env_name)) => env::var(&env_name) + .with_context(|| format!("environment variable {env_name} is not set")), + (None, None) => config + .webhook + .as_ref() + .map(|webhook| webhook.secret()) + .transpose()? + .ok_or_else(|| anyhow::anyhow!("pass either --secret or --secret-env")), + (Some(_), Some(_)) => unreachable!("clap enforces secret conflicts"), } } -impl From for Visibility { - fn from(value: VisibilityArg) -> Self { - match value { - VisibilityArg::Private => Self::Private, - VisibilityArg::Public => Self::Public, - } - } +fn resolve_webhook_url(config: &Config, value: Option) -> Result { + value + .or_else(|| config.webhook.as_ref().map(|webhook| webhook.url.clone())) + .ok_or_else(|| anyhow::anyhow!("pass --url or configure [webhook].url")) } #[cfg(test)] @@ -323,42 +226,23 @@ mod tests { use super::*; #[test] - fn cli_accepts_repeated_mirror_endpoints() { - let cli = Cli::try_parse_from([ - "git-sync", - "config", - "mirror", - "add", - "--name", - "personal", - "--endpoint", - "github:user:hykilpikonna", - "--endpoint", - "gitea:user:azalea", - ]) - .unwrap(); + fn cli_config_opens_wizard() { + let cli = Cli::try_parse_from(["git-sync", "config"]).unwrap(); - let Command::Config(ConfigCommand::Mirror(MirrorCommand::Add(args))) = cli.command else { - panic!("parsed unexpected command"); - }; - assert_eq!(args.name, "personal"); - assert_eq!( - args.endpoints, - vec![ - "github:user:hykilpikonna".to_string(), - "gitea:user:azalea".to_string() - ] - ); + assert!(matches!(cli.command, Command::Config)); } #[test] - fn cli_accepts_config_wizard() { - let cli = Cli::try_parse_from(["git-sync", "config", "wizard"]).unwrap(); - - assert!(matches!( - cli.command, - Command::Config(ConfigCommand::Wizard) - )); + fn cli_rejects_removed_config_subcommands() { + for args in [ + ["git-sync", "config", "wizard"].as_slice(), + ["git-sync", "config", "init"].as_slice(), + ["git-sync", "config", "show"].as_slice(), + ["git-sync", "config", "site", "list"].as_slice(), + ["git-sync", "config", "mirror", "list"].as_slice(), + ] { + assert!(Cli::try_parse_from(args).is_err()); + } } #[test] @@ -400,89 +284,75 @@ mod tests { } #[test] - fn cli_accepts_new_provider_kinds() { - for (name, expected) in [("forgejo", ProviderKind::Forgejo)] { - let cli = Cli::try_parse_from([ - "git-sync", - "config", - "site", - "add", - "--name", - name, - "--provider", - name, - "--base-url", - "https://example.test", - "--token", - "token", - ]) - .unwrap(); - - let Command::Config(ConfigCommand::Site(SiteCommand::Add(args))) = cli.command else { - panic!("parsed unexpected command"); - }; - assert_eq!(ProviderKind::from(args.provider), expected); - } - } - - #[test] - fn endpoint_parser_supports_aliases_and_rejects_bad_kinds() { - let endpoint = parse_endpoint("github:organization:MewoLab").unwrap(); - assert_eq!(endpoint.site, "github"); - assert_eq!(endpoint.kind, NamespaceKind::Org); - assert_eq!(endpoint.namespace, "MewoLab"); - - let endpoint = parse_endpoint("gitlab:group:parent/child").unwrap(); - assert_eq!(endpoint.kind, NamespaceKind::Group); - - let err = parse_endpoint("github:team:alice").unwrap_err().to_string(); - assert!(err.contains("unsupported namespace kind 'team'")); - - let err = parse_endpoint("github:user").unwrap_err().to_string(); - assert!(err.contains("SITE:KIND:NAMESPACE")); - } - - #[test] - fn site_add_requires_one_token_source() { - let missing = Cli::try_parse_from([ + fn cli_accepts_webhook_serve() { + let cli = Cli::try_parse_from([ "git-sync", - "config", - "site", - "add", - "--name", - "github", - "--provider", - "github", - "--base-url", - "https://github.com", + "serve", + "--listen", + "127.0.0.1:9000", + "--secret-env", + "WEBHOOK_SECRET", + "--jobs", + "2", + "--full-sync-interval-minutes", + "30", ]) .unwrap(); - let Command::Config(ConfigCommand::Site(SiteCommand::Add(args))) = missing.command else { + let Command::Serve(args) = cli.command else { panic!("parsed unexpected command"); }; - let temp = tempfile::TempDir::new().unwrap(); - let err = handle_site(SiteCommand::Add(args), temp.path().join("config.toml")) - .unwrap_err() - .to_string(); - assert!(err.contains("pass either --token or --token-env")); + assert_eq!(args.listen, "127.0.0.1:9000"); + assert_eq!(args.secret_env, Some("WEBHOOK_SECRET".to_string())); + assert_eq!(args.jobs, 2); + assert_eq!(args.full_sync_interval_minutes, Some(30)); + } - let conflict = Cli::try_parse_from([ + #[test] + fn cli_accepts_webhook_install() { + let cli = Cli::try_parse_from([ "git-sync", - "config", - "site", - "add", - "--name", - "github", - "--provider", - "github", - "--base-url", - "https://github.com", - "--token", - "a", - "--token-env", - "GITHUB_TOKEN", - ]); - assert!(conflict.is_err()); + "webhook", + "install", + "--url", + "https://mirror.example.test/webhook", + "--secret", + "secret", + "--group", + "sync-1", + "--repo-pattern", + "^repo$", + ]) + .unwrap(); + + let Command::Webhook(WebhookCommand::Install(args)) = cli.command else { + panic!("parsed unexpected command"); + }; + assert_eq!( + args.url, + Some("https://mirror.example.test/webhook".to_string()) + ); + assert_eq!(args.secret, Some("secret".to_string())); + assert_eq!(args.group, Some("sync-1".to_string())); + assert_eq!(args.repo_pattern, Some("^repo$".to_string())); + } + + #[test] + fn cli_accepts_webhook_uninstall() { + let cli = Cli::try_parse_from([ + "git-sync", + "webhook", + "uninstall", + "--group", + "sync-1", + "--dry-run", + ]) + .unwrap(); + + let Command::Webhook(WebhookCommand::Uninstall(args)) = cli.command else { + panic!("parsed unexpected command"); + }; + assert_eq!(args.group, Some("sync-1".to_string())); + assert!(args.dry_run); } } diff --git a/src/provider.rs b/src/provider.rs index 17fc29f..3077b4c 100644 --- a/src/provider.rs +++ b/src/provider.rs @@ -69,6 +69,37 @@ impl<'a> ProviderClient<'a> { } } + pub fn install_webhook( + &self, + endpoint: &EndpointConfig, + repo: &RemoteRepo, + url: &str, + secret: &str, + ) -> Result<()> { + match self.site.provider { + ProviderKind::Github => self.github_install_webhook(endpoint, repo, url, secret), + ProviderKind::Gitlab => self.gitlab_install_webhook(endpoint, repo, url, secret), + ProviderKind::Gitea | ProviderKind::Forgejo => { + self.gitea_install_webhook(endpoint, repo, url, secret) + } + } + } + + pub fn uninstall_webhook( + &self, + endpoint: &EndpointConfig, + repo_name: &str, + url: &str, + ) -> Result { + match self.site.provider { + ProviderKind::Github => self.github_uninstall_webhook(endpoint, repo_name, url), + ProviderKind::Gitlab => self.gitlab_uninstall_webhook(endpoint, repo_name, url), + ProviderKind::Gitea | ProviderKind::Forgejo => { + self.gitea_uninstall_webhook(endpoint, repo_name, url) + } + } + } + pub fn validate_token(&self) -> Result<()> { let url = format!("{}/user", self.site.api_base()); self.get(&url).map(|_| ()) @@ -169,6 +200,60 @@ impl<'a> ProviderClient<'a> { }) } + fn github_install_webhook( + &self, + endpoint: &EndpointConfig, + repo: &RemoteRepo, + url: &str, + secret: &str, + ) -> Result<()> { + if matches!(endpoint.kind, NamespaceKind::Group) { + bail!("GitHub endpoints use kind 'user' or 'org'"); + } + let hooks_url = format!( + "{}/repos/{}/{}/hooks", + self.site.api_base(), + endpoint.namespace, + repo.name + ); + let body = json!({ + "name": "web", + "active": true, + "events": ["push"], + "config": { + "url": url, + "content_type": "json", + "secret": secret, + "insecure_ssl": "0", + }, + }); + if let Some(hook) = self.find_existing_hook(&hooks_url, url)? { + let update_url = format!("{hooks_url}/{}", hook.id); + self.patch_json::(&update_url, &body)?; + } else { + self.post_json::(&hooks_url, &body)?; + } + Ok(()) + } + + fn github_uninstall_webhook( + &self, + endpoint: &EndpointConfig, + repo_name: &str, + url: &str, + ) -> Result { + if matches!(endpoint.kind, NamespaceKind::Group) { + bail!("GitHub endpoints use kind 'user' or 'org'"); + } + let hooks_url = format!( + "{}/repos/{}/{}/hooks", + self.site.api_base(), + endpoint.namespace, + repo_name + ); + self.delete_matching_hook(&hooks_url, url) + } + fn gitlab_list_repos(&self, endpoint: &EndpointConfig) -> Result> { match endpoint.kind { NamespaceKind::User => { @@ -247,6 +332,50 @@ impl<'a> ProviderClient<'a> { .then_some(NamespaceKind::User)) } + fn gitlab_install_webhook( + &self, + endpoint: &EndpointConfig, + repo: &RemoteRepo, + url: &str, + secret: &str, + ) -> Result<()> { + let project = format!("{}/{}", endpoint.namespace, repo.name); + let hooks_url = format!( + "{}/projects/{}/hooks", + self.site.api_base(), + urlencoding(&project) + ); + let body = json!({ + "url": url, + "push_events": true, + "tag_push_events": true, + "token": secret, + "enable_ssl_verification": true, + }); + if let Some(hook) = self.find_existing_hook(&hooks_url, url)? { + let update_url = format!("{hooks_url}/{}", hook.id); + self.put_json::(&update_url, &body)?; + } else { + self.post_json::(&hooks_url, &body)?; + } + Ok(()) + } + + fn gitlab_uninstall_webhook( + &self, + endpoint: &EndpointConfig, + repo_name: &str, + url: &str, + ) -> Result { + let project = format!("{}/{}", endpoint.namespace, repo_name); + let hooks_url = format!( + "{}/projects/{}/hooks", + self.site.api_base(), + urlencoding(&project) + ); + self.delete_matching_hook(&hooks_url, url) + } + fn gitea_list_repos(&self, endpoint: &EndpointConfig) -> Result> { match endpoint.kind { NamespaceKind::User => { @@ -310,6 +439,75 @@ impl<'a> ProviderClient<'a> { Ok(None) } + fn gitea_install_webhook( + &self, + endpoint: &EndpointConfig, + repo: &RemoteRepo, + url: &str, + secret: &str, + ) -> Result<()> { + if matches!(endpoint.kind, NamespaceKind::Group) { + bail!("Gitea endpoints use kind 'user' or 'org'"); + } + let hooks_url = format!( + "{}/repos/{}/{}/hooks", + self.site.api_base(), + endpoint.namespace, + repo.name + ); + let body = json!({ + "type": "gitea", + "active": true, + "events": ["push"], + "config": { + "url": url, + "content_type": "json", + "secret": secret, + }, + }); + if let Some(hook) = self.find_existing_hook(&hooks_url, url)? { + let update_url = format!("{hooks_url}/{}", hook.id); + self.patch_json::(&update_url, &body)?; + } else { + self.post_json::(&hooks_url, &body)?; + } + Ok(()) + } + + fn gitea_uninstall_webhook( + &self, + endpoint: &EndpointConfig, + repo_name: &str, + url: &str, + ) -> Result { + if matches!(endpoint.kind, NamespaceKind::Group) { + bail!("Gitea endpoints use kind 'user' or 'org'"); + } + let hooks_url = format!( + "{}/repos/{}/{}/hooks", + self.site.api_base(), + endpoint.namespace, + repo_name + ); + self.delete_matching_hook(&hooks_url, url) + } + + fn find_existing_hook(&self, hooks_url: &str, target_url: &str) -> Result> { + let hooks: Vec = self.paged_get(hooks_url)?; + Ok(hooks + .into_iter() + .find(|hook| hook.url() == Some(target_url))) + } + + fn delete_matching_hook(&self, hooks_url: &str, target_url: &str) -> Result { + let Some(hook) = self.find_existing_hook(hooks_url, target_url)? else { + return Ok(false); + }; + let delete_url = format!("{hooks_url}/{}", hook.id); + self.delete(&delete_url)?; + Ok(true) + } + fn paged_get(&self, first_url: &str) -> Result> where T: for<'de> Deserialize<'de>, @@ -351,6 +549,32 @@ impl<'a> ProviderClient<'a> { .with_context(|| format!("invalid JSON from {url}")) } + fn put_json(&self, url: &str, body: &serde_json::Value) -> Result + where + T: for<'de> Deserialize<'de>, + { + self.request_headers(self.http.put(url))? + .json(body) + .send() + .with_context(|| format!("PUT {url} failed")) + .and_then(|response| check_response("PUT", url, response))? + .json() + .with_context(|| format!("invalid JSON from {url}")) + } + + fn patch_json(&self, url: &str, body: &serde_json::Value) -> Result + where + T: for<'de> Deserialize<'de>, + { + self.request_headers(self.http.patch(url))? + .json(body) + .send() + .with_context(|| format!("PATCH {url} failed")) + .and_then(|response| check_response("PATCH", url, response))? + .json() + .with_context(|| format!("invalid JSON from {url}")) + } + fn get(&self, url: &str) -> Result { self.request_headers(self.http.get(url))? .send() @@ -358,6 +582,13 @@ impl<'a> ProviderClient<'a> { .and_then(|response| check_response("GET", url, response)) } + fn delete(&self, url: &str) -> Result { + self.request_headers(self.http.delete(url))? + .send() + .with_context(|| format!("DELETE {url} failed")) + .and_then(|response| check_response("DELETE", url, response)) + } + fn request_headers( &self, request: reqwest::blocking::RequestBuilder, @@ -500,6 +731,23 @@ impl From for RemoteRepo { } } +#[derive(Deserialize)] +struct RepoHook { + id: u64, + #[serde(default)] + url: Option, + #[serde(default)] + config: HashMap, +} + +impl RepoHook { + fn url(&self) -> Option<&str> { + self.url + .as_deref() + .or_else(|| self.config.get("url").map(String::as_str)) + } +} + pub fn repos_by_name(repos: Vec) -> HashMap> { let mut output: HashMap> = HashMap::new(); for repo in repos { @@ -677,6 +925,97 @@ mod tests { handle.join().unwrap(); } + #[test] + fn install_webhook_posts_github_hook_when_missing() { + let (api_url, handle) = request_server( + vec![("200 OK", "[]"), ("201 Created", r#"{"id":1}"#)], + |index, request| match index { + 0 => assert!( + request.starts_with("GET /repos/alice/repo/hooks "), + "request was {request}" + ), + 1 => { + assert!( + request.starts_with("POST /repos/alice/repo/hooks "), + "request was {request}" + ); + assert!(request.contains("https://mirror.example.test/webhook")); + assert!(request.contains("secret")); + assert!(request.contains("push")); + } + _ => unreachable!(), + }, + ); + let site = SiteConfig { + api_url: Some(api_url), + ..site(ProviderKind::Github, None) + }; + let client = ProviderClient::new(&site).unwrap(); + + client + .install_webhook( + &EndpointConfig { + site: "github".to_string(), + kind: NamespaceKind::User, + namespace: "alice".to_string(), + }, + &RemoteRepo { + name: "repo".to_string(), + clone_url: "https://github.com/alice/repo.git".to_string(), + private: true, + description: None, + }, + "https://mirror.example.test/webhook", + "secret", + ) + .unwrap(); + handle.join().unwrap(); + } + + #[test] + fn uninstall_webhook_deletes_matching_github_hook() { + let (api_url, handle) = request_server( + vec![ + ( + "200 OK", + r#"[{"id":42,"config":{"url":"https://mirror.example.test/webhook"}}]"#, + ), + ("204 No Content", ""), + ], + |index, request| match index { + 0 => assert!( + request.starts_with("GET /repos/alice/repo/hooks "), + "request was {request}" + ), + 1 => assert!( + request.starts_with("DELETE /repos/alice/repo/hooks/42 "), + "request was {request}" + ), + _ => unreachable!(), + }, + ); + let site = SiteConfig { + api_url: Some(api_url), + ..site(ProviderKind::Github, None) + }; + let client = ProviderClient::new(&site).unwrap(); + + let removed = client + .uninstall_webhook( + &EndpointConfig { + site: "github".to_string(), + kind: NamespaceKind::User, + namespace: "alice".to_string(), + }, + "repo", + "https://mirror.example.test/webhook", + ) + .unwrap(); + + assert!(removed); + handle.join().unwrap(); + } + fn site(provider: ProviderKind, git_username: Option) -> SiteConfig { SiteConfig { name: "site".to_string(), @@ -714,4 +1053,32 @@ mod tests { }); (format!("http://{address}"), handle) } + + fn request_server( + responses: Vec<(&'static str, &'static str)>, + mut assert_request: F, + ) -> (String, thread::JoinHandle<()>) + where + F: FnMut(usize, &str) + Send + 'static, + { + let listener = TcpListener::bind("127.0.0.1:0").unwrap(); + let address = listener.local_addr().unwrap(); + let handle = thread::spawn(move || { + for (index, (status, body)) in responses.into_iter().enumerate() { + let (mut stream, _) = listener.accept().unwrap(); + let mut buffer = [0_u8; 4096]; + let bytes = stream.read(&mut buffer).unwrap(); + let request = String::from_utf8_lossy(&buffer[..bytes]).to_string(); + assert_request(index, &request); + + write!( + stream, + "HTTP/1.1 {status}\r\ncontent-type: application/json\r\nconnection: close\r\ncontent-length: {}\r\n\r\n{body}", + body.len() + ) + .unwrap(); + } + }); + (format!("http://{address}"), handle) + } } diff --git a/src/sync.rs b/src/sync.rs index a3c94bd..d1e1e62 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -16,6 +16,7 @@ use crate::git::{ }; use crate::logging; use crate::provider::{EndpointRepo, ProviderClient, repos_by_name}; +use crate::webhook; const FAILURE_STATE_FILE: &str = "failed-repos.toml"; const REF_STATE_FILE: &str = "ref-state.toml"; @@ -361,24 +362,14 @@ fn sync_group( .unwrap_or(mirror.create_missing); let allow_force = context.options.force_override.unwrap_or(mirror.allow_force); - let mut all_endpoint_repos = Vec::new(); - for endpoint in &mirror.endpoints { - let site = context.config.site(&endpoint.site).unwrap(); - let client = ProviderClient::new(site)?; - crate::logln!( - " {} {}", - style("list").cyan().bold(), - style(endpoint.label()).dim() - ); - let repos = client - .list_repos(endpoint) - .with_context(|| format!("failed to list repos for {}", endpoint.label()))?; - for repo in repos { - all_endpoint_repos.push(EndpointRepo { - endpoint: endpoint.clone(), - repo, - }); - } + let all_endpoint_repos = list_group_repos(context.config, mirror)?; + if !context.options.dry_run { + webhook::ensure_configured_webhooks( + context.config, + mirror, + &all_endpoint_repos, + context.work_dir, + )?; } let mut repos = repos_by_name(all_endpoint_repos); @@ -538,9 +529,37 @@ fn sync_group( failures }); + if create_missing && !context.options.dry_run { + let repos = list_group_repos(context.config, mirror)?; + webhook::ensure_configured_webhooks(context.config, mirror, &repos, context.work_dir)?; + } + Ok(failures) } +fn list_group_repos(config: &Config, mirror: &MirrorConfig) -> Result> { + let mut all_endpoint_repos = Vec::new(); + for endpoint in &mirror.endpoints { + let site = config.site(&endpoint.site).unwrap(); + let client = ProviderClient::new(site)?; + crate::logln!( + " {} {}", + style("list").cyan().bold(), + style(endpoint.label()).dim() + ); + let repos = client + .list_repos(endpoint) + .with_context(|| format!("failed to list repos for {}", endpoint.label()))?; + for repo in repos { + all_endpoint_repos.push(EndpointRepo { + endpoint: endpoint.clone(), + repo, + }); + } + } + Ok(all_endpoint_repos) +} + fn pop_repo_job(queue: &Arc>>) -> Option { queue .lock() diff --git a/src/webhook.rs b/src/webhook.rs new file mode 100644 index 0000000..3753958 --- /dev/null +++ b/src/webhook.rs @@ -0,0 +1,955 @@ +use std::collections::{BTreeMap, BTreeSet, HashMap}; +use std::fs; +use std::path::Path; +use std::path::PathBuf; +use std::sync::{Arc, Mutex, mpsc}; +use std::thread; +use std::time::Duration; + +use anyhow::{Context, Result, bail}; +use console::style; +use hmac::{Hmac, Mac}; +use regex::escape; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use sha2::Sha256; +use tiny_http::{Header, Method, Request, Response, Server, StatusCode}; + +use crate::config::{ + Config, EndpointConfig, MirrorConfig, ProviderKind, default_work_dir, validate_config, +}; +use crate::provider::{EndpointRepo, ProviderClient, RemoteRepo}; +use crate::sync::{SyncOptions, sync_all}; + +type HmacSha256 = Hmac; +const WEBHOOK_STATE_FILE: &str = "webhook-state.toml"; + +#[derive(Clone, Debug)] +pub struct ServeOptions { + pub listen: String, + pub secret: String, + pub workers: usize, + pub work_dir: Option, + pub full_sync_interval_minutes: Option, + pub reachability_url: Option, + pub reachability_check_interval_minutes: Option, +} + +#[derive(Clone, Debug)] +pub struct WebhookInstallOptions { + pub url: String, + pub secret: String, + pub group: Option, + pub repo_pattern: Option, + pub dry_run: bool, + pub work_dir: Option, +} + +#[derive(Clone, Debug)] +pub struct WebhookUninstallOptions { + pub group: Option, + pub dry_run: bool, + pub work_dir: Option, +} + +#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd)] +struct WebhookJob { + group: String, + repo: String, +} + +#[derive(Clone)] +struct JobQueue { + sender: mpsc::Sender, + pending: Arc>>, +} + +pub fn serve(config: Config, options: ServeOptions) -> Result<()> { + validate_config(&config)?; + if options.workers == 0 { + bail!("--jobs must be at least 1"); + } + let server = Server::http(&options.listen) + .map_err(|error| anyhow::anyhow!("failed to listen on {}: {error}", options.listen))?; + crate::logln!( + "{} {}", + style("Webhook server").cyan().bold(), + style(&options.listen).bold() + ); + + let config = Arc::new(config); + let (sender, receiver) = mpsc::channel::(); + let pending = Arc::new(Mutex::new(BTreeSet::::new())); + let receiver = Arc::new(Mutex::new(receiver)); + let sync_lock = Arc::new(Mutex::new(())); + for worker_id in 0..options.workers { + let receiver = Arc::clone(&receiver); + let pending = Arc::clone(&pending); + let config = Arc::clone(&config); + let sync_lock = Arc::clone(&sync_lock); + let work_dir = options.work_dir.clone(); + thread::spawn(move || { + worker_loop(worker_id, receiver, pending, sync_lock, config, work_dir) + }); + } + + if let Some(minutes) = options + .full_sync_interval_minutes + .filter(|minutes| *minutes > 0) + { + let config = Arc::clone(&config); + let sync_lock = Arc::clone(&sync_lock); + let work_dir = options.work_dir.clone(); + thread::spawn(move || full_sync_timer_loop(config, sync_lock, work_dir, minutes)); + } + if let Some(url) = options.reachability_url.clone() { + let minutes = options + .reachability_check_interval_minutes + .filter(|minutes| *minutes > 0) + .unwrap_or(15); + thread::spawn(move || reachability_timer_loop(url, minutes)); + } + + let queue = JobQueue { sender, pending }; + for request in server.incoming_requests() { + let response = handle_request(request, &config, &options.secret, &queue); + if let Err(error) = response { + crate::logln!("{} {error:#}", style("webhook error").red().bold()); + } + } + Ok(()) +} + +fn full_sync_timer_loop( + config: Arc, + sync_lock: Arc>, + work_dir: Option, + minutes: u64, +) { + loop { + thread::sleep(Duration::from_secs(minutes * 60)); + crate::logln!( + "{} {}", + style("full sync timer").cyan().bold(), + style(format!("every {minutes} minute(s)")).dim() + ); + let _sync_guard = sync_lock + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + if let Err(error) = sync_all( + &config, + SyncOptions { + work_dir: work_dir.clone(), + ..SyncOptions::default() + }, + ) { + crate::logln!("{} {error:#}", style("full sync failed").red().bold()); + } + } +} + +fn reachability_timer_loop(url: String, minutes: u64) { + loop { + thread::sleep(Duration::from_secs(minutes * 60)); + if let Err(error) = check_webhook_url_reachable(&url) { + crate::logln!( + "{} {}: {error:#}", + style("webhook URL unreachable").yellow().bold(), + style(&url).cyan() + ); + } + } +} + +pub fn install_webhooks(config: &Config, options: WebhookInstallOptions) -> Result<()> { + validate_config(config)?; + let work_dir = options.work_dir.clone().unwrap_or_else(default_work_dir); + let mut state = load_webhook_state(&work_dir)?; + let repo_pattern = options + .repo_pattern + .as_deref() + .map(regex::Regex::new) + .transpose() + .with_context(|| "invalid --repo-pattern regex")?; + + for mirror in &config.mirrors { + if options + .group + .as_ref() + .is_some_and(|group| group != &mirror.name) + { + continue; + } + crate::logln!(); + crate::logln!( + "{} {}", + style("Webhook group").cyan().bold(), + style(&mirror.name).bold() + ); + for endpoint in &mirror.endpoints { + let site = config.site(&endpoint.site).unwrap(); + let client = ProviderClient::new(site)?; + crate::logln!( + " {} {}", + style("list").cyan().bold(), + style(endpoint.label()).dim() + ); + let repos = client + .list_repos(endpoint) + .with_context(|| format!("failed to list repos for {}", endpoint.label()))?; + for repo in repos { + if repo_pattern + .as_ref() + .is_some_and(|pattern| !pattern.is_match(&repo.name)) + { + continue; + } + install_repo_webhook( + &WebhookInstallRequest { + client: &client, + group: &mirror.name, + endpoint, + repo: &repo, + url: &options.url, + secret: &options.secret, + dry_run: options.dry_run, + }, + &mut state, + )?; + } + } + } + if !options.dry_run { + save_webhook_state(&work_dir, &state)?; + } + Ok(()) +} + +pub fn uninstall_webhooks(config: &Config, options: WebhookUninstallOptions) -> Result<()> { + validate_config(config)?; + let work_dir = options.work_dir.clone().unwrap_or_else(default_work_dir); + let mut state = load_webhook_state(&work_dir)?; + if state.installations.is_empty() { + crate::logln!( + "{} no webhook installations recorded", + style("skip").yellow().bold() + ); + return Ok(()); + } + + let mut removed_keys = Vec::new(); + for (key, installation) in &state.installations { + if options + .group + .as_ref() + .is_some_and(|group| group != &installation.group) + { + continue; + } + crate::logln!( + " {} {} {}", + style(if options.dry_run { + "would uninstall" + } else { + "uninstall" + }) + .red() + .bold(), + style(&installation.repo).cyan(), + style(format!("from {}", installation.endpoint.label())).dim() + ); + if options.dry_run { + continue; + } + let Some(site) = config.site(&installation.endpoint.site) else { + crate::logln!( + " {} {} {}", + style("skip").yellow().bold(), + style(&installation.repo).cyan(), + style(format!("unknown site {}", installation.endpoint.site)).dim() + ); + continue; + }; + let client = ProviderClient::new(site)?; + client + .uninstall_webhook( + &installation.endpoint, + &installation.repo, + &installation.url, + ) + .with_context(|| { + format!( + "failed to uninstall webhook for {} from {}", + installation.repo, + installation.endpoint.label() + ) + })?; + removed_keys.push(key.clone()); + } + + if !options.dry_run { + for key in removed_keys { + state.installations.remove(&key); + } + save_webhook_state(&work_dir, &state)?; + } + Ok(()) +} + +pub fn ensure_configured_webhooks( + config: &Config, + mirror: &MirrorConfig, + repos: &[EndpointRepo], + work_dir: &Path, +) -> Result<()> { + let Some(webhook) = &config.webhook else { + return Ok(()); + }; + if !webhook.install { + return Ok(()); + } + let secret = webhook.secret()?; + let mut state = load_webhook_state(work_dir)?; + for endpoint_repo in repos { + let Some(site) = config.site(&endpoint_repo.endpoint.site) else { + continue; + }; + let client = ProviderClient::new(site)?; + install_repo_webhook( + &WebhookInstallRequest { + client: &client, + group: &mirror.name, + endpoint: &endpoint_repo.endpoint, + repo: &endpoint_repo.repo, + url: &webhook.url, + secret: &secret, + dry_run: false, + }, + &mut state, + )?; + } + save_webhook_state(work_dir, &state) +} + +pub fn check_webhook_url_reachable(url: &str) -> Result<()> { + let client = reqwest::blocking::Client::builder() + .timeout(Duration::from_secs(10)) + .build()?; + client + .get(url) + .send() + .with_context(|| format!("failed to reach {url}"))?; + Ok(()) +} + +fn worker_loop( + worker_id: usize, + receiver: Arc>>, + pending: Arc>>, + sync_lock: Arc>, + config: Arc, + work_dir: Option, +) { + loop { + let job = { + let receiver = receiver + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + receiver.recv() + }; + let Ok(job) = job else { + return; + }; + + crate::logln!( + "{} {} {}", + style(format!("worker {worker_id}")).cyan().bold(), + style(&job.group).bold(), + style(&job.repo).cyan() + ); + let _sync_guard = sync_lock + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + let result = sync_all( + &config, + SyncOptions { + group: Some(job.group.clone()), + repo_pattern: Some(format!("^{}$", escape(&job.repo))), + work_dir: work_dir.clone(), + jobs: 1, + ..SyncOptions::default() + }, + ); + match result { + Ok(()) => crate::logln!( + "{} {}/{}", + style("webhook sync done").green().bold(), + job.group, + job.repo + ), + Err(error) => crate::logln!( + "{} {}/{}: {error:#}", + style("webhook sync failed").red().bold(), + job.group, + job.repo + ), + } + pending + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()) + .remove(&job); + } +} + +#[derive(Clone, Debug, Default, Deserialize, Serialize)] +struct WebhookState { + #[serde(default)] + installations: BTreeMap, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +struct WebhookInstallation { + group: String, + endpoint: EndpointConfig, + repo: String, + url: String, +} + +struct WebhookInstallRequest<'a> { + client: &'a ProviderClient<'a>, + group: &'a str, + endpoint: &'a EndpointConfig, + repo: &'a RemoteRepo, + url: &'a str, + secret: &'a str, + dry_run: bool, +} + +fn install_repo_webhook( + request: &WebhookInstallRequest<'_>, + state: &mut WebhookState, +) -> Result<()> { + let key = webhook_installation_key(request.group, request.endpoint, &request.repo.name); + if state + .installations + .get(&key) + .is_some_and(|installation| installation.url == request.url) + { + return Ok(()); + } + crate::logln!( + " {} {} {}", + style(if request.dry_run { + "would install" + } else { + "install" + }) + .green() + .bold(), + style(&request.repo.name).cyan(), + style(format!("webhook on {}", request.endpoint.label())).dim() + ); + if request.dry_run { + return Ok(()); + } + request + .client + .install_webhook(request.endpoint, request.repo, request.url, request.secret) + .with_context(|| { + format!( + "failed to install webhook for {} on {}", + request.repo.name, + request.endpoint.label() + ) + })?; + state.installations.insert( + key, + WebhookInstallation { + group: request.group.to_string(), + endpoint: request.endpoint.clone(), + repo: request.repo.name.clone(), + url: request.url.to_string(), + }, + ); + Ok(()) +} + +fn webhook_installation_key(group: &str, endpoint: &EndpointConfig, repo: &str) -> String { + format!( + "{}\t{}\t{:?}\t{}\t{}", + group, endpoint.site, endpoint.kind, endpoint.namespace, repo + ) +} + +fn load_webhook_state(work_dir: &Path) -> Result { + let path = webhook_state_path(work_dir); + if !path.exists() { + return Ok(WebhookState::default()); + } + let contents = + fs::read_to_string(&path).with_context(|| format!("failed to read {}", path.display()))?; + toml::from_str(&contents).with_context(|| format!("failed to parse {}", path.display())) +} + +fn save_webhook_state(work_dir: &Path, state: &WebhookState) -> Result<()> { + let path = webhook_state_path(work_dir); + if let Some(parent) = path.parent() { + fs::create_dir_all(parent) + .with_context(|| format!("failed to create {}", parent.display()))?; + } + let contents = toml::to_string_pretty(state)?; + fs::write(&path, contents).with_context(|| format!("failed to write {}", path.display())) +} + +fn webhook_state_path(work_dir: &Path) -> PathBuf { + work_dir.join(WEBHOOK_STATE_FILE) +} + +fn handle_request( + mut request: Request, + config: &Config, + secret: &str, + queue: &JobQueue, +) -> Result<()> { + if request.method() != &Method::Post { + respond(request, StatusCode(405), "method not allowed")?; + return Ok(()); + } + let path = request.url().split('?').next().unwrap_or(request.url()); + if path != "/" && path != "/webhook" { + respond(request, StatusCode(404), "not found")?; + return Ok(()); + } + + let headers = headers_map(request.headers()); + let mut body = Vec::new(); + request + .as_reader() + .read_to_end(&mut body) + .context("failed to read webhook request body")?; + let provider = detect_provider(&headers); + if !verify_signature(provider.as_ref(), &headers, &body, secret) { + respond(request, StatusCode(401), "invalid signature")?; + return Ok(()); + } + + let value: Value = match serde_json::from_slice(&body) { + Ok(value) => value, + Err(_) => { + respond(request, StatusCode(400), "invalid JSON")?; + return Ok(()); + } + }; + let Some(event) = parse_event(provider, &headers, &value) else { + respond(request, StatusCode(202), "ignored")?; + return Ok(()); + }; + let jobs = matching_jobs(config, &event); + if jobs.is_empty() { + respond(request, StatusCode(202), "no matching mirror group")?; + return Ok(()); + } + let mut enqueued = 0; + for job in jobs { + if enqueue(queue, job)? { + enqueued += 1; + } + } + respond(request, StatusCode(202), &format!("queued {enqueued}"))?; + Ok(()) +} + +fn enqueue(queue: &JobQueue, job: WebhookJob) -> Result { + let mut pending = queue + .pending + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + if !pending.insert(job.clone()) { + return Ok(false); + } + if queue.sender.send(job.clone()).is_err() { + pending.remove(&job); + bail!("webhook worker queue is closed"); + } + Ok(true) +} + +fn respond(request: Request, status: StatusCode, body: &str) -> Result<()> { + request + .respond(Response::from_string(body.to_string()).with_status_code(status)) + .map_err(|error| anyhow::anyhow!("failed to send webhook response: {error}")) +} + +fn headers_map(headers: &[Header]) -> HashMap { + headers + .iter() + .map(|header| { + ( + header.field.to_string().to_ascii_lowercase(), + header.value.as_str().to_string(), + ) + }) + .collect() +} + +#[derive(Clone, Debug, Eq, PartialEq)] +struct WebhookEvent { + provider: Option, + repo: String, + namespace: Option, +} + +fn detect_provider(headers: &HashMap) -> Option { + if headers.contains_key("x-forgejo-event") { + Some(ProviderKind::Forgejo) + } else if headers.contains_key("x-gitea-event") { + Some(ProviderKind::Gitea) + } else if headers.contains_key("x-gitlab-event") { + Some(ProviderKind::Gitlab) + } else if headers.contains_key("x-github-event") { + Some(ProviderKind::Github) + } else { + None + } +} + +fn parse_event( + provider: Option, + headers: &HashMap, + value: &Value, +) -> Option { + if !is_push_event(headers) { + return None; + } + match provider { + Some(ProviderKind::Gitlab) => parse_gitlab_event(provider, value), + Some(ProviderKind::Github) + | Some(ProviderKind::Gitea) + | Some(ProviderKind::Forgejo) + | None => parse_github_like_event(provider, value), + } +} + +fn is_push_event(headers: &HashMap) -> bool { + let github = headers + .get("x-github-event") + .is_some_and(|event| event == "push"); + let gitea = headers + .get("x-gitea-event") + .is_some_and(|event| event == "push"); + let forgejo = headers + .get("x-forgejo-event") + .is_some_and(|event| event == "push"); + let gitlab = headers + .get("x-gitlab-event") + .is_some_and(|event| event == "Push Hook" || event == "Tag Push Hook"); + github || gitea || forgejo || gitlab +} + +fn parse_github_like_event(provider: Option, value: &Value) -> Option { + let repo = value.pointer("/repository/name")?.as_str()?.to_string(); + let namespace = value + .pointer("/repository/owner/login") + .or_else(|| value.pointer("/repository/owner/username")) + .or_else(|| value.pointer("/repository/owner/name")) + .and_then(Value::as_str) + .map(ToOwned::to_owned) + .or_else(|| { + value + .pointer("/repository/full_name") + .and_then(Value::as_str) + .and_then(|full_name| { + full_name + .rsplit_once('/') + .map(|(owner, _)| owner.to_string()) + }) + }); + Some(WebhookEvent { + provider, + repo, + namespace, + }) +} + +fn parse_gitlab_event(provider: Option, value: &Value) -> Option { + let path = value.pointer("/project/path")?.as_str()?.to_string(); + let namespace = value + .pointer("/project/path_with_namespace") + .and_then(Value::as_str) + .and_then(|path| { + path.rsplit_once('/') + .map(|(namespace, _)| namespace.to_string()) + }) + .or_else(|| { + value + .pointer("/project/namespace") + .and_then(Value::as_str) + .map(ToOwned::to_owned) + }); + Some(WebhookEvent { + provider, + repo: path, + namespace, + }) +} + +fn matching_jobs(config: &Config, event: &WebhookEvent) -> Vec { + config + .mirrors + .iter() + .filter(|mirror| { + mirror.endpoints.iter().any(|endpoint| { + let Some(site) = config.site(&endpoint.site) else { + return false; + }; + event + .provider + .as_ref() + .is_none_or(|provider| &site.provider == provider) + && event + .namespace + .as_ref() + .is_none_or(|namespace| namespace == &endpoint.namespace) + }) + }) + .map(|mirror| WebhookJob { + group: mirror.name.clone(), + repo: event.repo.clone(), + }) + .collect() +} + +fn verify_signature( + provider: Option<&ProviderKind>, + headers: &HashMap, + body: &[u8], + secret: &str, +) -> bool { + match provider { + Some(ProviderKind::Gitlab) => headers + .get("x-gitlab-token") + .is_some_and(|token| fixed_time_eq(token.as_bytes(), secret.as_bytes())), + Some(ProviderKind::Github) => { + verify_hmac_header(headers, "x-hub-signature-256", body, secret) + } + Some(ProviderKind::Gitea) | Some(ProviderKind::Forgejo) => { + verify_hmac_header(headers, "x-gitea-signature", body, secret) + || verify_hmac_header(headers, "x-forgejo-signature", body, secret) + || verify_hmac_header(headers, "x-hub-signature-256", body, secret) + } + None => false, + } +} + +fn verify_hmac_header( + headers: &HashMap, + header: &str, + body: &[u8], + secret: &str, +) -> bool { + let Some(signature) = headers.get(header) else { + return false; + }; + let expected = hmac_sha256_hex(secret.as_bytes(), body); + let signature = signature + .trim() + .strip_prefix("sha256=") + .unwrap_or_else(|| signature.trim()); + fixed_time_eq(signature.as_bytes(), expected.as_bytes()) +} + +fn hmac_sha256_hex(secret: &[u8], body: &[u8]) -> String { + let mut mac = HmacSha256::new_from_slice(secret).expect("HMAC accepts any key length"); + mac.update(body); + let bytes = mac.finalize().into_bytes(); + let mut output = String::with_capacity(bytes.len() * 2); + for byte in bytes { + output.push_str(&format!("{byte:02x}")); + } + output +} + +fn fixed_time_eq(left: &[u8], right: &[u8]) -> bool { + if left.len() != right.len() { + return false; + } + let mut diff = 0_u8; + for (left, right) in left.iter().zip(right) { + diff |= left ^ right; + } + diff == 0 +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::config::{ + EndpointConfig, MirrorConfig, NamespaceKind, SiteConfig, TokenConfig, Visibility, + }; + + #[test] + fn verifies_github_hmac_signature() { + let body = br#"{"repository":{"name":"repo"}}"#; + let mut headers = HashMap::new(); + headers.insert("x-github-event".to_string(), "push".to_string()); + headers.insert( + "x-hub-signature-256".to_string(), + format!("sha256={}", hmac_sha256_hex(b"secret", body)), + ); + + assert!(verify_signature( + Some(&ProviderKind::Github), + &headers, + body, + "secret" + )); + assert!(!verify_signature( + Some(&ProviderKind::Github), + &headers, + body, + "wrong" + )); + } + + #[test] + fn parses_github_push_payload() { + let mut headers = HashMap::new(); + headers.insert("x-github-event".to_string(), "push".to_string()); + let value: Value = serde_json::from_str( + r#"{"repository":{"name":"repo","full_name":"alice/repo","owner":{"login":"alice"}}}"#, + ) + .unwrap(); + + let event = parse_event(Some(ProviderKind::Github), &headers, &value).unwrap(); + + assert_eq!(event.repo, "repo"); + assert_eq!(event.namespace.as_deref(), Some("alice")); + } + + #[test] + fn parses_forgejo_push_payload() { + let mut headers = HashMap::new(); + headers.insert("x-forgejo-event".to_string(), "push".to_string()); + let value: Value = serde_json::from_str( + r#"{"repository":{"name":"repo","full_name":"azalea/repo","owner":{"username":"azalea"}}}"#, + ) + .unwrap(); + + let provider = detect_provider(&headers); + let event = parse_event(provider.clone(), &headers, &value).unwrap(); + + assert_eq!(provider, Some(ProviderKind::Forgejo)); + assert_eq!(event.repo, "repo"); + assert_eq!(event.namespace.as_deref(), Some("azalea")); + } + + #[test] + fn verifies_forgejo_hmac_signature() { + let body = br#"{"repository":{"name":"repo"}}"#; + let mut headers = HashMap::new(); + headers.insert("x-forgejo-event".to_string(), "push".to_string()); + headers.insert( + "x-forgejo-signature".to_string(), + format!("sha256={}", hmac_sha256_hex(b"secret", body)), + ); + + assert!(verify_signature( + Some(&ProviderKind::Forgejo), + &headers, + body, + "secret" + )); + } + + #[test] + fn parses_gitlab_push_payload() { + let mut headers = HashMap::new(); + headers.insert("x-gitlab-event".to_string(), "Push Hook".to_string()); + let value: Value = serde_json::from_str( + r#"{"project":{"path":"repo","path_with_namespace":"parent/alice/repo"}}"#, + ) + .unwrap(); + + let event = parse_event(Some(ProviderKind::Gitlab), &headers, &value).unwrap(); + + assert_eq!(event.repo, "repo"); + assert_eq!(event.namespace.as_deref(), Some("parent/alice")); + } + + #[test] + fn matches_jobs_by_provider_and_namespace() { + let config = Config { + sites: vec![ + site("github", ProviderKind::Github), + site("gitea", ProviderKind::Gitea), + ], + mirrors: vec![MirrorConfig { + name: "sync-1".to_string(), + endpoints: vec![ + endpoint("github", NamespaceKind::User, "alice"), + endpoint("gitea", NamespaceKind::User, "azalea"), + ], + create_missing: true, + visibility: Visibility::Private, + allow_force: false, + }], + webhook: None, + }; + let event = WebhookEvent { + provider: Some(ProviderKind::Github), + repo: "repo".to_string(), + namespace: Some("alice".to_string()), + }; + + let jobs = matching_jobs(&config, &event); + + assert_eq!(jobs.len(), 1); + assert_eq!(jobs[0].group, "sync-1"); + assert_eq!(jobs[0].repo, "repo"); + } + + #[test] + fn webhook_state_persists_installations() { + let temp = tempfile::TempDir::new().unwrap(); + let endpoint = endpoint("github", NamespaceKind::User, "alice"); + let key = webhook_installation_key("sync-1", &endpoint, "repo"); + let mut state = WebhookState::default(); + state.installations.insert( + key.clone(), + WebhookInstallation { + group: "sync-1".to_string(), + endpoint, + repo: "repo".to_string(), + url: "https://mirror.example.test/webhook".to_string(), + }, + ); + + save_webhook_state(temp.path(), &state).unwrap(); + let loaded = load_webhook_state(temp.path()).unwrap(); + + assert_eq!(loaded.installations[&key].repo, "repo"); + assert_eq!( + loaded.installations[&key].url, + "https://mirror.example.test/webhook" + ); + } + + fn site(name: &str, provider: ProviderKind) -> SiteConfig { + SiteConfig { + name: name.to_string(), + provider, + base_url: "https://example.test".to_string(), + api_url: None, + token: TokenConfig::Value("secret".to_string()), + git_username: None, + } + } + + fn endpoint(site: &str, kind: NamespaceKind, namespace: &str) -> EndpointConfig { + EndpointConfig { + site: site.to_string(), + kind, + namespace: namespace.to_string(), + } + } +}