[F] Fix log sync
This commit is contained in:
+102
-31
@@ -1,14 +1,14 @@
|
||||
use std::cell::RefCell;
|
||||
use std::fmt;
|
||||
use std::io::{self, IsTerminal, Write};
|
||||
use std::sync::{Mutex, OnceLock};
|
||||
use std::sync::{Arc, Mutex, OnceLock};
|
||||
|
||||
use console::style;
|
||||
|
||||
static OUTPUT: OnceLock<Mutex<OutputState>> = OnceLock::new();
|
||||
|
||||
thread_local! {
|
||||
static REPO_LOG: RefCell<Option<RepoLog>> = const { RefCell::new(None) };
|
||||
static REPO_LOG: RefCell<Option<ActiveRepoLog>> = const { RefCell::new(None) };
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
@@ -22,11 +22,21 @@ struct StatusState {
|
||||
interactive: bool,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct RepoLogContext {
|
||||
inner: Arc<RepoLog>,
|
||||
}
|
||||
|
||||
struct ActiveRepoLog {
|
||||
context: RepoLogContext,
|
||||
owner: bool,
|
||||
}
|
||||
|
||||
struct RepoLog {
|
||||
repo_name: String,
|
||||
slot: usize,
|
||||
width: usize,
|
||||
lines: Vec<String>,
|
||||
lines: Mutex<Vec<String>>,
|
||||
}
|
||||
|
||||
pub struct StatusGuard;
|
||||
@@ -63,22 +73,76 @@ pub fn start_status_area(slots: usize) -> StatusGuard {
|
||||
}
|
||||
|
||||
pub fn start_repo_log(repo_name: String, slot: usize, width: usize) -> RepoLogGuard {
|
||||
REPO_LOG.with(|repo_log| {
|
||||
*repo_log.borrow_mut() = Some(RepoLog {
|
||||
let context = RepoLogContext {
|
||||
inner: Arc::new(RepoLog {
|
||||
repo_name,
|
||||
slot,
|
||||
width,
|
||||
lines: Vec::new(),
|
||||
lines: Mutex::new(Vec::new()),
|
||||
}),
|
||||
};
|
||||
REPO_LOG.with(|repo_log| {
|
||||
*repo_log.borrow_mut() = Some(ActiveRepoLog {
|
||||
context,
|
||||
owner: true,
|
||||
});
|
||||
});
|
||||
RepoLogGuard
|
||||
}
|
||||
|
||||
pub(crate) fn current_repo_log_context() -> Option<RepoLogContext> {
|
||||
REPO_LOG.with(|repo_log| {
|
||||
repo_log
|
||||
.borrow()
|
||||
.as_ref()
|
||||
.map(|repo_log| repo_log.context.clone())
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn inherit_repo_log(context: Option<RepoLogContext>) -> InheritedRepoLogGuard {
|
||||
let previous = REPO_LOG.with(|repo_log| {
|
||||
let mut repo_log = repo_log.borrow_mut();
|
||||
let previous = repo_log.take();
|
||||
if let Some(context) = context {
|
||||
*repo_log = Some(ActiveRepoLog {
|
||||
context,
|
||||
owner: false,
|
||||
});
|
||||
}
|
||||
previous
|
||||
});
|
||||
InheritedRepoLogGuard { previous }
|
||||
}
|
||||
|
||||
pub(crate) struct InheritedRepoLogGuard {
|
||||
previous: Option<ActiveRepoLog>,
|
||||
}
|
||||
|
||||
impl Drop for InheritedRepoLogGuard {
|
||||
fn drop(&mut self) {
|
||||
REPO_LOG.with(|repo_log| {
|
||||
*repo_log.borrow_mut() = self.previous.take();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
pub fn finish_repo_log() {
|
||||
let repo_log = REPO_LOG.with(|repo_log| repo_log.borrow_mut().take());
|
||||
let Some(repo_log) = repo_log else {
|
||||
let active = REPO_LOG.with(|repo_log| repo_log.borrow_mut().take());
|
||||
let Some(active) = active else {
|
||||
return;
|
||||
};
|
||||
if !active.owner {
|
||||
return;
|
||||
}
|
||||
|
||||
let repo_log = active.context.inner;
|
||||
let lines = {
|
||||
let mut lines = repo_log
|
||||
.lines
|
||||
.lock()
|
||||
.unwrap_or_else(|poisoned| poisoned.into_inner());
|
||||
std::mem::take(&mut *lines)
|
||||
};
|
||||
|
||||
with_output(|output| {
|
||||
if let Some(status) = output.status.as_mut() {
|
||||
@@ -87,7 +151,7 @@ pub fn finish_repo_log() {
|
||||
status.slots[repo_log.slot] = None;
|
||||
}
|
||||
}
|
||||
for line in repo_log.lines {
|
||||
for line in lines {
|
||||
println!("{line}");
|
||||
}
|
||||
if let Some(status) = output.status.as_mut() {
|
||||
@@ -107,27 +171,9 @@ pub fn repo_prefix(repo_name: &str, width: usize) -> String {
|
||||
|
||||
pub fn line(args: fmt::Arguments<'_>) {
|
||||
let text = args.to_string();
|
||||
let captured = REPO_LOG.with(|repo_log| {
|
||||
let mut repo_log = repo_log.borrow_mut();
|
||||
let Some(repo_log) = repo_log.as_mut() else {
|
||||
return false;
|
||||
};
|
||||
|
||||
if text.is_empty() {
|
||||
repo_log.lines.push(String::new());
|
||||
return true;
|
||||
}
|
||||
|
||||
for line in text.lines() {
|
||||
repo_log.lines.push(line.to_string());
|
||||
if !line.trim().is_empty() {
|
||||
update_status(repo_log, line.trim());
|
||||
}
|
||||
}
|
||||
true
|
||||
});
|
||||
|
||||
if captured {
|
||||
let context = current_repo_log_context();
|
||||
if let Some(context) = context {
|
||||
capture_repo_line(&context, &text);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -142,7 +188,32 @@ pub fn line(args: fmt::Arguments<'_>) {
|
||||
});
|
||||
}
|
||||
|
||||
fn update_status(repo_log: &RepoLog, line: &str) {
|
||||
fn capture_repo_line(context: &RepoLogContext, text: &str) {
|
||||
let mut status_updates = Vec::new();
|
||||
{
|
||||
let mut lines = context
|
||||
.inner
|
||||
.lines
|
||||
.lock()
|
||||
.unwrap_or_else(|poisoned| poisoned.into_inner());
|
||||
if text.is_empty() {
|
||||
lines.push(String::new());
|
||||
return;
|
||||
}
|
||||
for line in text.lines() {
|
||||
lines.push(line.to_string());
|
||||
if !line.trim().is_empty() {
|
||||
status_updates.push(line.trim().to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
for line in status_updates {
|
||||
update_status(context, &line);
|
||||
}
|
||||
}
|
||||
|
||||
fn update_status(context: &RepoLogContext, line: &str) {
|
||||
let repo_log = &context.inner;
|
||||
let repo = repo_prefix(&repo_log.repo_name, repo_log.width);
|
||||
let line = truncate_status(line, 96);
|
||||
with_output(|output| {
|
||||
|
||||
@@ -20,13 +20,16 @@ where
|
||||
let worker_count = jobs.min(items.len());
|
||||
let queue = Arc::new(Mutex::new(VecDeque::from(items)));
|
||||
let (sender, receiver) = mpsc::channel();
|
||||
let repo_log_context = crate::logging::current_repo_log_context();
|
||||
|
||||
thread::scope(|scope| {
|
||||
for _ in 0..worker_count {
|
||||
let queue = Arc::clone(&queue);
|
||||
let sender = sender.clone();
|
||||
let f = &f;
|
||||
let repo_log_context = repo_log_context.clone();
|
||||
scope.spawn(move || {
|
||||
let _repo_log_guard = crate::logging::inherit_repo_log(repo_log_context);
|
||||
while let Some(item) = pop_item(&queue) {
|
||||
if sender.send(f(item)).is_err() {
|
||||
break;
|
||||
|
||||
@@ -11,3 +11,28 @@ fn status_text_truncates_to_fixed_width() {
|
||||
assert_eq!(truncate_status("short", 8), "short");
|
||||
assert_eq!(truncate_status("very-long-status", 8), "very-lo~");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn repo_log_context_is_inherited_by_parallel_workers() {
|
||||
let _guard = start_repo_log("repo-a".to_string(), 0, 8);
|
||||
|
||||
crate::logln!("outer line");
|
||||
crate::parallel::map(vec!["worker line"], 1, |line| {
|
||||
crate::logln!("{line}");
|
||||
Ok::<_, anyhow::Error>(())
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
let lines = {
|
||||
let context = current_repo_log_context().unwrap();
|
||||
context
|
||||
.inner
|
||||
.lines
|
||||
.lock()
|
||||
.unwrap_or_else(|poisoned| poisoned.into_inner())
|
||||
.clone()
|
||||
};
|
||||
finish_repo_log();
|
||||
|
||||
assert_eq!(lines, vec!["outer line", "worker line"]);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user