From 04d8aee687cb8d4ee1b619016a3192463f972a39 Mon Sep 17 00:00:00 2001 From: Azalea Date: Sun, 10 May 2026 02:14:53 -0400 Subject: [PATCH] [F] Fix log sync (#6) --- src/logging.rs | 133 ++++++++++++++++++++++++++++++++---------- src/parallel.rs | 3 + tests/unit/logging.rs | 25 ++++++++ 3 files changed, 130 insertions(+), 31 deletions(-) diff --git a/src/logging.rs b/src/logging.rs index 8b498fb..0d6a29e 100644 --- a/src/logging.rs +++ b/src/logging.rs @@ -1,14 +1,14 @@ use std::cell::RefCell; use std::fmt; use std::io::{self, IsTerminal, Write}; -use std::sync::{Mutex, OnceLock}; +use std::sync::{Arc, Mutex, OnceLock}; use console::style; static OUTPUT: OnceLock> = OnceLock::new(); thread_local! { - static REPO_LOG: RefCell> = const { RefCell::new(None) }; + static REPO_LOG: RefCell> = const { RefCell::new(None) }; } #[derive(Default)] @@ -22,11 +22,21 @@ struct StatusState { interactive: bool, } +#[derive(Clone)] +pub(crate) struct RepoLogContext { + inner: Arc, +} + +struct ActiveRepoLog { + context: RepoLogContext, + owner: bool, +} + struct RepoLog { repo_name: String, slot: usize, width: usize, - lines: Vec, + lines: Mutex>, } pub struct StatusGuard; @@ -63,22 +73,76 @@ pub fn start_status_area(slots: usize) -> StatusGuard { } pub fn start_repo_log(repo_name: String, slot: usize, width: usize) -> RepoLogGuard { - REPO_LOG.with(|repo_log| { - *repo_log.borrow_mut() = Some(RepoLog { + let context = RepoLogContext { + inner: Arc::new(RepoLog { repo_name, slot, width, - lines: Vec::new(), + lines: Mutex::new(Vec::new()), + }), + }; + REPO_LOG.with(|repo_log| { + *repo_log.borrow_mut() = Some(ActiveRepoLog { + context, + owner: true, }); }); RepoLogGuard } +pub(crate) fn current_repo_log_context() -> Option { + REPO_LOG.with(|repo_log| { + repo_log + .borrow() + .as_ref() + .map(|repo_log| repo_log.context.clone()) + }) +} + +pub(crate) fn inherit_repo_log(context: Option) -> InheritedRepoLogGuard { + let previous = REPO_LOG.with(|repo_log| { + let mut repo_log = repo_log.borrow_mut(); + let previous = repo_log.take(); + if let Some(context) = context { + *repo_log = Some(ActiveRepoLog { + context, + owner: false, + }); + } + previous + }); + InheritedRepoLogGuard { previous } +} + +pub(crate) struct InheritedRepoLogGuard { + previous: Option, +} + +impl Drop for InheritedRepoLogGuard { + fn drop(&mut self) { + REPO_LOG.with(|repo_log| { + *repo_log.borrow_mut() = self.previous.take(); + }); + } +} + pub fn finish_repo_log() { - let repo_log = REPO_LOG.with(|repo_log| repo_log.borrow_mut().take()); - let Some(repo_log) = repo_log else { + let active = REPO_LOG.with(|repo_log| repo_log.borrow_mut().take()); + let Some(active) = active else { return; }; + if !active.owner { + return; + } + + let repo_log = active.context.inner; + let lines = { + let mut lines = repo_log + .lines + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + std::mem::take(&mut *lines) + }; with_output(|output| { if let Some(status) = output.status.as_mut() { @@ -87,7 +151,7 @@ pub fn finish_repo_log() { status.slots[repo_log.slot] = None; } } - for line in repo_log.lines { + for line in lines { println!("{line}"); } if let Some(status) = output.status.as_mut() { @@ -107,27 +171,9 @@ pub fn repo_prefix(repo_name: &str, width: usize) -> String { pub fn line(args: fmt::Arguments<'_>) { let text = args.to_string(); - let captured = REPO_LOG.with(|repo_log| { - let mut repo_log = repo_log.borrow_mut(); - let Some(repo_log) = repo_log.as_mut() else { - return false; - }; - - if text.is_empty() { - repo_log.lines.push(String::new()); - return true; - } - - for line in text.lines() { - repo_log.lines.push(line.to_string()); - if !line.trim().is_empty() { - update_status(repo_log, line.trim()); - } - } - true - }); - - if captured { + let context = current_repo_log_context(); + if let Some(context) = context { + capture_repo_line(&context, &text); return; } @@ -142,7 +188,32 @@ pub fn line(args: fmt::Arguments<'_>) { }); } -fn update_status(repo_log: &RepoLog, line: &str) { +fn capture_repo_line(context: &RepoLogContext, text: &str) { + let mut status_updates = Vec::new(); + { + let mut lines = context + .inner + .lines + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + if text.is_empty() { + lines.push(String::new()); + return; + } + for line in text.lines() { + lines.push(line.to_string()); + if !line.trim().is_empty() { + status_updates.push(line.trim().to_string()); + } + } + } + for line in status_updates { + update_status(context, &line); + } +} + +fn update_status(context: &RepoLogContext, line: &str) { + let repo_log = &context.inner; let repo = repo_prefix(&repo_log.repo_name, repo_log.width); let line = truncate_status(line, 96); with_output(|output| { diff --git a/src/parallel.rs b/src/parallel.rs index 22c53b0..f042cd2 100644 --- a/src/parallel.rs +++ b/src/parallel.rs @@ -20,13 +20,16 @@ where let worker_count = jobs.min(items.len()); let queue = Arc::new(Mutex::new(VecDeque::from(items))); let (sender, receiver) = mpsc::channel(); + let repo_log_context = crate::logging::current_repo_log_context(); thread::scope(|scope| { for _ in 0..worker_count { let queue = Arc::clone(&queue); let sender = sender.clone(); let f = &f; + let repo_log_context = repo_log_context.clone(); scope.spawn(move || { + let _repo_log_guard = crate::logging::inherit_repo_log(repo_log_context); while let Some(item) = pop_item(&queue) { if sender.send(f(item)).is_err() { break; diff --git a/tests/unit/logging.rs b/tests/unit/logging.rs index 5e64151..d36ee9e 100644 --- a/tests/unit/logging.rs +++ b/tests/unit/logging.rs @@ -11,3 +11,28 @@ fn status_text_truncates_to_fixed_width() { assert_eq!(truncate_status("short", 8), "short"); assert_eq!(truncate_status("very-long-status", 8), "very-lo~"); } + +#[test] +fn repo_log_context_is_inherited_by_parallel_workers() { + let _guard = start_repo_log("repo-a".to_string(), 0, 8); + + crate::logln!("outer line"); + crate::parallel::map(vec!["worker line"], 1, |line| { + crate::logln!("{line}"); + Ok::<_, anyhow::Error>(()) + }) + .unwrap(); + + let lines = { + let context = current_repo_log_context().unwrap(); + context + .inner + .lines + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()) + .clone() + }; + finish_repo_log(); + + assert_eq!(lines, vec!["outer line", "worker line"]); +}