Compare commits

...

2 Commits

Author SHA1 Message Date
azalea 0b9516d252 [F] Fix logging 2026-05-10 10:45:33 +00:00
azalea 2295ebfca6 [F] Fix log sync 2026-05-10 06:14:32 +00:00
4 changed files with 126 additions and 198 deletions
+99 -174
View File
@@ -1,40 +1,26 @@
use std::cell::RefCell; use std::cell::RefCell;
use std::fmt; use std::fmt;
use std::io::{self, IsTerminal, Write}; use std::io::{self, Write};
use std::sync::{Mutex, OnceLock}; use std::sync::{Arc, Mutex, OnceLock};
use console::style; static OUTPUT: OnceLock<Mutex<()>> = OnceLock::new();
static OUTPUT: OnceLock<Mutex<OutputState>> = OnceLock::new();
thread_local! { thread_local! {
static REPO_LOG: RefCell<Option<RepoLog>> = const { RefCell::new(None) }; static REPO_LOG: RefCell<Option<ActiveRepoLog>> = const { RefCell::new(None) };
} }
#[derive(Default)] #[derive(Clone)]
struct OutputState { pub(crate) struct RepoLogContext {
status: Option<StatusState>, inner: Arc<RepoLog>,
} }
struct StatusState { struct ActiveRepoLog {
slots: Vec<Option<String>>, context: RepoLogContext,
visible: bool, owner: bool,
interactive: bool,
} }
struct RepoLog { struct RepoLog {
repo_name: String, lines: Mutex<Vec<String>>,
slot: usize,
width: usize,
lines: Vec<String>,
}
pub struct StatusGuard;
impl Drop for StatusGuard {
fn drop(&mut self) {
finish_status_area();
}
} }
pub struct RepoLogGuard; pub struct RepoLogGuard;
@@ -45,180 +31,119 @@ impl Drop for RepoLogGuard {
} }
} }
pub fn start_status_area(slots: usize) -> StatusGuard { pub fn start_repo_log() -> RepoLogGuard {
with_output(|output| { let context = RepoLogContext {
if let Some(status) = output.status.as_mut() { inner: Arc::new(RepoLog {
clear_status(status); lines: Mutex::new(Vec::new()),
} }),
output.status = Some(StatusState { };
slots: vec![None; slots],
visible: false,
interactive: io::stdout().is_terminal() && slots > 0,
});
if let Some(status) = output.status.as_mut() {
draw_status(status);
}
});
StatusGuard
}
pub fn start_repo_log(repo_name: String, slot: usize, width: usize) -> RepoLogGuard {
REPO_LOG.with(|repo_log| { REPO_LOG.with(|repo_log| {
*repo_log.borrow_mut() = Some(RepoLog { *repo_log.borrow_mut() = Some(ActiveRepoLog {
repo_name, context,
slot, owner: true,
width,
lines: Vec::new(),
}); });
}); });
RepoLogGuard RepoLogGuard
} }
pub fn finish_repo_log() { pub(crate) fn current_repo_log_context() -> Option<RepoLogContext> {
let repo_log = REPO_LOG.with(|repo_log| repo_log.borrow_mut().take()); REPO_LOG.with(|repo_log| {
let Some(repo_log) = repo_log else { repo_log
return; .borrow()
}; .as_ref()
.map(|repo_log| repo_log.context.clone())
with_output(|output| { })
if let Some(status) = output.status.as_mut() {
clear_status(status);
if repo_log.slot < status.slots.len() {
status.slots[repo_log.slot] = None;
}
}
for line in repo_log.lines {
println!("{line}");
}
if let Some(status) = output.status.as_mut() {
draw_status(status);
}
});
} }
pub fn repo_prefix(repo_name: &str, width: usize) -> String { pub(crate) fn inherit_repo_log(context: Option<RepoLogContext>) -> InheritedRepoLogGuard {
let mut prefix = repo_name.chars().take(width).collect::<String>(); let previous = REPO_LOG.with(|repo_log| {
if repo_name.chars().count() > width && width > 0 { let mut repo_log = repo_log.borrow_mut();
prefix.pop(); let previous = repo_log.take();
prefix.push('~'); if let Some(context) = context {
*repo_log = Some(ActiveRepoLog {
context,
owner: false,
});
}
previous
});
InheritedRepoLogGuard { previous }
}
pub(crate) struct InheritedRepoLogGuard {
previous: Option<ActiveRepoLog>,
}
impl Drop for InheritedRepoLogGuard {
fn drop(&mut self) {
REPO_LOG.with(|repo_log| {
*repo_log.borrow_mut() = self.previous.take();
});
} }
format!("{prefix:<width$}") }
pub fn finish_repo_log() {
let active = REPO_LOG.with(|repo_log| repo_log.borrow_mut().take());
let Some(active) = active else {
return;
};
if !active.owner {
return;
}
let repo_log = active.context.inner;
let lines = {
let mut lines = repo_log
.lines
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
std::mem::take(&mut *lines)
};
with_output(|| {
for line in lines {
println!("{line}");
}
});
} }
pub fn line(args: fmt::Arguments<'_>) { pub fn line(args: fmt::Arguments<'_>) {
let text = args.to_string(); let text = args.to_string();
let captured = REPO_LOG.with(|repo_log| { let context = current_repo_log_context();
let mut repo_log = repo_log.borrow_mut(); if let Some(context) = context {
let Some(repo_log) = repo_log.as_mut() else { capture_repo_line(&context, &text);
return false;
};
if text.is_empty() {
repo_log.lines.push(String::new());
return true;
}
for line in text.lines() {
repo_log.lines.push(line.to_string());
if !line.trim().is_empty() {
update_status(repo_log, line.trim());
}
}
true
});
if captured {
return; return;
} }
with_output(|output| { with_output(|| {
if let Some(status) = output.status.as_mut() {
clear_status(status);
}
println!("{text}"); println!("{text}");
if let Some(status) = output.status.as_mut() {
draw_status(status);
}
}); });
} }
fn update_status(repo_log: &RepoLog, line: &str) { fn capture_repo_line(context: &RepoLogContext, text: &str) {
let repo = repo_prefix(&repo_log.repo_name, repo_log.width); let mut lines = context
let line = truncate_status(line, 96); .inner
with_output(|output| { .lines
let Some(status) = output.status.as_mut() else {
return;
};
if repo_log.slot >= status.slots.len() {
return;
}
clear_status(status);
status.slots[repo_log.slot] = Some(format!(
"{} {} {}",
style(format!("worker {}", repo_log.slot + 1)).dim(),
style(repo).cyan().bold(),
line
));
draw_status(status);
});
}
fn truncate_status(value: &str, max_chars: usize) -> String {
if value.chars().count() <= max_chars {
return value.to_string();
}
let mut output = value.chars().take(max_chars).collect::<String>();
output.pop();
output.push('~');
output
}
fn finish_status_area() {
with_output(|output| {
if let Some(status) = output.status.as_mut() {
clear_status(status);
}
output.status = None;
});
}
fn with_output(action: impl FnOnce(&mut OutputState)) {
let output = OUTPUT.get_or_init(|| Mutex::new(OutputState::default()));
let mut output = output
.lock() .lock()
.unwrap_or_else(|poisoned| poisoned.into_inner()); .unwrap_or_else(|poisoned| poisoned.into_inner());
action(&mut output); if text.is_empty() {
lines.push(String::new());
return;
}
for line in text.lines() {
lines.push(line.to_string());
}
}
fn with_output(action: impl FnOnce()) {
let output = OUTPUT.get_or_init(|| Mutex::new(()));
let _output = output
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
action();
let _ = io::stdout().flush(); let _ = io::stdout().flush();
} }
fn clear_status(status: &mut StatusState) {
if !status.interactive || !status.visible {
return;
}
let lines = status.slots.len();
print!("\x1b[{lines}A\r");
for _ in 0..lines {
println!("\x1b[2K");
}
print!("\x1b[{lines}A\r");
status.visible = false;
}
fn draw_status(status: &mut StatusState) {
if !status.interactive {
return;
}
for slot in &status.slots {
match slot {
Some(line) => println!("{line}"),
None => println!("{}", style("idle").dim()),
}
}
status.visible = true;
}
#[macro_export] #[macro_export]
macro_rules! logln { macro_rules! logln {
() => { () => {
+3
View File
@@ -20,13 +20,16 @@ where
let worker_count = jobs.min(items.len()); let worker_count = jobs.min(items.len());
let queue = Arc::new(Mutex::new(VecDeque::from(items))); let queue = Arc::new(Mutex::new(VecDeque::from(items)));
let (sender, receiver) = mpsc::channel(); let (sender, receiver) = mpsc::channel();
let repo_log_context = crate::logging::current_repo_log_context();
thread::scope(|scope| { thread::scope(|scope| {
for _ in 0..worker_count { for _ in 0..worker_count {
let queue = Arc::clone(&queue); let queue = Arc::clone(&queue);
let sender = sender.clone(); let sender = sender.clone();
let f = &f; let f = &f;
let repo_log_context = repo_log_context.clone();
scope.spawn(move || { scope.spawn(move || {
let _repo_log_guard = crate::logging::inherit_repo_log(repo_log_context);
while let Some(item) = pop_item(&queue) { while let Some(item) = pop_item(&queue) {
if sender.send(f(item)).is_err() { if sender.send(f(item)).is_err() {
break; break;
+3 -16
View File
@@ -237,7 +237,6 @@ fn sync_group(
); );
} }
let repo_log_width = repo_log_width(&repo_names);
let repo_jobs = repo_names let repo_jobs = repo_names
.into_iter() .into_iter()
.map(|repo_name| { .map(|repo_name| {
@@ -260,11 +259,10 @@ fn sync_group(
let base_ref_state = context.ref_state.clone(); let base_ref_state = context.ref_state.clone();
let queue = Arc::new(Mutex::new(repo_jobs)); let queue = Arc::new(Mutex::new(repo_jobs));
let (sender, receiver) = mpsc::channel(); let (sender, receiver) = mpsc::channel();
let use_status_area = worker_count > 1; let use_repo_logs = worker_count > 1;
let jobs = context.options.jobs; let jobs = context.options.jobs;
let _status_guard = use_status_area.then(|| logging::start_status_area(worker_count));
let failures = thread::scope(|scope| { let failures = thread::scope(|scope| {
for worker_id in 0..worker_count { for _ in 0..worker_count {
let queue = Arc::clone(&queue); let queue = Arc::clone(&queue);
let sender = sender.clone(); let sender = sender.clone();
let redactor = context.redactor.clone(); let redactor = context.redactor.clone();
@@ -275,9 +273,7 @@ fn sync_group(
scope.spawn(move || { scope.spawn(move || {
while let Some(mut job) = pop_repo_job(&queue) { while let Some(mut job) = pop_repo_job(&queue) {
let _repo_log_guard = use_status_area.then(|| { let _repo_log_guard = use_repo_logs.then(logging::start_repo_log);
logging::start_repo_log(job.repo_name.clone(), worker_id, repo_log_width)
});
let repo_context = RepoSyncContext { let repo_context = RepoSyncContext {
config, config,
mirror, mirror,
@@ -383,15 +379,6 @@ fn pop_repo_job(queue: &Arc<Mutex<VecDeque<RepoSyncJob>>>) -> Option<RepoSyncJob
.pop_front() .pop_front()
} }
fn repo_log_width(repo_names: &BTreeSet<String>) -> usize {
repo_names
.iter()
.map(|name| name.chars().count())
.max()
.unwrap_or(4)
.clamp(4, 32)
}
struct RepoSyncJob { struct RepoSyncJob {
repo_name: String, repo_name: String,
existing: Vec<EndpointRepo>, existing: Vec<EndpointRepo>,
+21 -8
View File
@@ -1,13 +1,26 @@
use super::*; use super::*;
#[test] #[test]
fn repo_prefix_pads_and_truncates_to_fixed_width() { fn repo_log_context_is_inherited_by_parallel_workers() {
assert_eq!(repo_prefix("api", 6), "api "); let _guard = start_repo_log();
assert_eq!(repo_prefix("very-long-repo", 8), "very-lo~");
}
#[test] crate::logln!("outer line");
fn status_text_truncates_to_fixed_width() { crate::parallel::map(vec!["worker line"], 1, |line| {
assert_eq!(truncate_status("short", 8), "short"); crate::logln!("{line}");
assert_eq!(truncate_status("very-long-status", 8), "very-lo~"); Ok::<_, anyhow::Error>(())
})
.unwrap();
let lines = {
let context = current_repo_log_context().unwrap();
context
.inner
.lines
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner())
.clone()
};
finish_repo_log();
assert_eq!(lines, vec!["outer line", "worker line"]);
} }