[O] Better multithreaded logging
This commit is contained in:
+191
-20
@@ -1,30 +1,99 @@
|
||||
use std::cell::RefCell;
|
||||
use std::fmt;
|
||||
use std::io::{self, IsTerminal, Write};
|
||||
use std::sync::{Mutex, OnceLock};
|
||||
|
||||
use console::style;
|
||||
|
||||
static OUTPUT_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
|
||||
static OUTPUT: OnceLock<Mutex<OutputState>> = OnceLock::new();
|
||||
|
||||
thread_local! {
|
||||
static PREFIX: RefCell<Option<String>> = const { RefCell::new(None) };
|
||||
static REPO_LOG: RefCell<Option<RepoLog>> = const { RefCell::new(None) };
|
||||
}
|
||||
|
||||
pub struct PrefixGuard {
|
||||
previous: Option<String>,
|
||||
#[derive(Default)]
|
||||
struct OutputState {
|
||||
status: Option<StatusState>,
|
||||
}
|
||||
|
||||
impl Drop for PrefixGuard {
|
||||
struct StatusState {
|
||||
slots: Vec<Option<String>>,
|
||||
visible: bool,
|
||||
interactive: bool,
|
||||
}
|
||||
|
||||
struct RepoLog {
|
||||
repo_name: String,
|
||||
slot: usize,
|
||||
width: usize,
|
||||
lines: Vec<String>,
|
||||
}
|
||||
|
||||
pub struct StatusGuard;
|
||||
|
||||
impl Drop for StatusGuard {
|
||||
fn drop(&mut self) {
|
||||
PREFIX.with(|prefix| {
|
||||
*prefix.borrow_mut() = self.previous.take();
|
||||
});
|
||||
finish_status_area();
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set_prefix(prefix: String) -> PrefixGuard {
|
||||
let previous = PREFIX.with(|current| current.borrow_mut().replace(prefix));
|
||||
PrefixGuard { previous }
|
||||
pub struct RepoLogGuard;
|
||||
|
||||
impl Drop for RepoLogGuard {
|
||||
fn drop(&mut self) {
|
||||
finish_repo_log();
|
||||
}
|
||||
}
|
||||
|
||||
pub fn start_status_area(slots: usize) -> StatusGuard {
|
||||
with_output(|output| {
|
||||
if let Some(status) = output.status.as_mut() {
|
||||
clear_status(status);
|
||||
}
|
||||
output.status = Some(StatusState {
|
||||
slots: vec![None; slots],
|
||||
visible: false,
|
||||
interactive: io::stdout().is_terminal() && slots > 0,
|
||||
});
|
||||
if let Some(status) = output.status.as_mut() {
|
||||
draw_status(status);
|
||||
}
|
||||
});
|
||||
StatusGuard
|
||||
}
|
||||
|
||||
pub fn start_repo_log(repo_name: String, slot: usize, width: usize) -> RepoLogGuard {
|
||||
REPO_LOG.with(|repo_log| {
|
||||
*repo_log.borrow_mut() = Some(RepoLog {
|
||||
repo_name,
|
||||
slot,
|
||||
width,
|
||||
lines: Vec::new(),
|
||||
});
|
||||
});
|
||||
RepoLogGuard
|
||||
}
|
||||
|
||||
pub fn finish_repo_log() {
|
||||
let repo_log = REPO_LOG.with(|repo_log| repo_log.borrow_mut().take());
|
||||
let Some(repo_log) = repo_log else {
|
||||
return;
|
||||
};
|
||||
|
||||
with_output(|output| {
|
||||
if let Some(status) = output.status.as_mut() {
|
||||
clear_status(status);
|
||||
if repo_log.slot < status.slots.len() {
|
||||
status.slots[repo_log.slot] = None;
|
||||
}
|
||||
}
|
||||
for line in repo_log.lines {
|
||||
println!("{line}");
|
||||
}
|
||||
if let Some(status) = output.status.as_mut() {
|
||||
draw_status(status);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
pub fn repo_prefix(repo_name: &str, width: usize) -> String {
|
||||
@@ -38,20 +107,116 @@ pub fn repo_prefix(repo_name: &str, width: usize) -> String {
|
||||
|
||||
pub fn line(args: fmt::Arguments<'_>) {
|
||||
let text = args.to_string();
|
||||
let prefix = PREFIX.with(|prefix| prefix.borrow().clone());
|
||||
let lock = OUTPUT_LOCK.get_or_init(|| Mutex::new(()));
|
||||
let _guard = lock.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
|
||||
let captured = REPO_LOG.with(|repo_log| {
|
||||
let mut repo_log = repo_log.borrow_mut();
|
||||
let Some(repo_log) = repo_log.as_mut() else {
|
||||
return false;
|
||||
};
|
||||
|
||||
match prefix {
|
||||
Some(prefix) if !text.is_empty() => {
|
||||
for line in text.lines() {
|
||||
println!("{} | {}", style(&prefix).cyan().bold(), line);
|
||||
if text.is_empty() {
|
||||
repo_log.lines.push(String::new());
|
||||
return true;
|
||||
}
|
||||
|
||||
for line in text.lines() {
|
||||
repo_log.lines.push(line.to_string());
|
||||
if !line.trim().is_empty() {
|
||||
update_status(repo_log, line.trim());
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
println!("{text}");
|
||||
true
|
||||
});
|
||||
|
||||
if captured {
|
||||
return;
|
||||
}
|
||||
|
||||
with_output(|output| {
|
||||
if let Some(status) = output.status.as_mut() {
|
||||
clear_status(status);
|
||||
}
|
||||
println!("{text}");
|
||||
if let Some(status) = output.status.as_mut() {
|
||||
draw_status(status);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
fn update_status(repo_log: &RepoLog, line: &str) {
|
||||
let repo = repo_prefix(&repo_log.repo_name, repo_log.width);
|
||||
let line = truncate_status(line, 96);
|
||||
with_output(|output| {
|
||||
let Some(status) = output.status.as_mut() else {
|
||||
return;
|
||||
};
|
||||
if repo_log.slot >= status.slots.len() {
|
||||
return;
|
||||
}
|
||||
clear_status(status);
|
||||
status.slots[repo_log.slot] = Some(format!(
|
||||
"{} {} {}",
|
||||
style(format!("worker {}", repo_log.slot + 1)).dim(),
|
||||
style(repo).cyan().bold(),
|
||||
line
|
||||
));
|
||||
draw_status(status);
|
||||
});
|
||||
}
|
||||
|
||||
fn truncate_status(value: &str, max_chars: usize) -> String {
|
||||
if value.chars().count() <= max_chars {
|
||||
return value.to_string();
|
||||
}
|
||||
let mut output = value.chars().take(max_chars).collect::<String>();
|
||||
output.pop();
|
||||
output.push('~');
|
||||
output
|
||||
}
|
||||
|
||||
fn finish_status_area() {
|
||||
with_output(|output| {
|
||||
if let Some(status) = output.status.as_mut() {
|
||||
clear_status(status);
|
||||
}
|
||||
output.status = None;
|
||||
});
|
||||
}
|
||||
|
||||
fn with_output(action: impl FnOnce(&mut OutputState)) {
|
||||
let output = OUTPUT.get_or_init(|| Mutex::new(OutputState::default()));
|
||||
let mut output = output
|
||||
.lock()
|
||||
.unwrap_or_else(|poisoned| poisoned.into_inner());
|
||||
action(&mut output);
|
||||
let _ = io::stdout().flush();
|
||||
}
|
||||
|
||||
fn clear_status(status: &mut StatusState) {
|
||||
if !status.interactive || !status.visible {
|
||||
return;
|
||||
}
|
||||
|
||||
let lines = status.slots.len();
|
||||
print!("\x1b[{lines}A\r");
|
||||
for _ in 0..lines {
|
||||
println!("\x1b[2K");
|
||||
}
|
||||
print!("\x1b[{lines}A\r");
|
||||
status.visible = false;
|
||||
}
|
||||
|
||||
fn draw_status(status: &mut StatusState) {
|
||||
if !status.interactive {
|
||||
return;
|
||||
}
|
||||
|
||||
for slot in &status.slots {
|
||||
match slot {
|
||||
Some(line) => println!("{line}"),
|
||||
None => println!("{}", style("idle").dim()),
|
||||
}
|
||||
}
|
||||
status.visible = true;
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
@@ -73,4 +238,10 @@ mod tests {
|
||||
assert_eq!(repo_prefix("api", 6), "api ");
|
||||
assert_eq!(repo_prefix("very-long-repo", 8), "very-lo~");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn status_text_truncates_to_fixed_width() {
|
||||
assert_eq!(truncate_status("short", 8), "short");
|
||||
assert_eq!(truncate_status("very-long-status", 8), "very-lo~");
|
||||
}
|
||||
}
|
||||
|
||||
+7
-3
@@ -443,8 +443,10 @@ fn sync_group(
|
||||
let base_ref_state = context.ref_state.clone();
|
||||
let queue = Arc::new(Mutex::new(repo_jobs));
|
||||
let (sender, receiver) = mpsc::channel();
|
||||
let use_status_area = worker_count > 1;
|
||||
let _status_guard = use_status_area.then(|| logging::start_status_area(worker_count));
|
||||
let failures = thread::scope(|scope| {
|
||||
for _ in 0..worker_count {
|
||||
for worker_id in 0..worker_count {
|
||||
let queue = Arc::clone(&queue);
|
||||
let sender = sender.clone();
|
||||
let redactor = context.redactor.clone();
|
||||
@@ -455,8 +457,9 @@ fn sync_group(
|
||||
|
||||
scope.spawn(move || {
|
||||
while let Some(mut job) = pop_repo_job(&queue) {
|
||||
let prefix = logging::repo_prefix(&job.repo_name, repo_log_width);
|
||||
let _prefix_guard = logging::set_prefix(prefix);
|
||||
let _repo_log_guard = use_status_area.then(|| {
|
||||
logging::start_repo_log(job.repo_name.clone(), worker_id, repo_log_width)
|
||||
});
|
||||
let repo_context = RepoSyncContext {
|
||||
config,
|
||||
mirror,
|
||||
@@ -481,6 +484,7 @@ fn sync_group(
|
||||
repo_name: job.repo_name,
|
||||
error,
|
||||
});
|
||||
logging::finish_repo_log();
|
||||
if sender.send(result).is_err() {
|
||||
break;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user