Compare commits
2 Commits
main
...
fix-log-sync
| Author | SHA1 | Date | |
|---|---|---|---|
| 81edd9b8bf | |||
| 09cede6658 |
+99
-174
@@ -1,40 +1,26 @@
|
|||||||
use std::cell::RefCell;
|
use std::cell::RefCell;
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::io::{self, IsTerminal, Write};
|
use std::io::{self, Write};
|
||||||
use std::sync::{Mutex, OnceLock};
|
use std::sync::{Arc, Mutex, OnceLock};
|
||||||
|
|
||||||
use console::style;
|
static OUTPUT: OnceLock<Mutex<()>> = OnceLock::new();
|
||||||
|
|
||||||
static OUTPUT: OnceLock<Mutex<OutputState>> = OnceLock::new();
|
|
||||||
|
|
||||||
thread_local! {
|
thread_local! {
|
||||||
static REPO_LOG: RefCell<Option<RepoLog>> = const { RefCell::new(None) };
|
static REPO_LOG: RefCell<Option<ActiveRepoLog>> = const { RefCell::new(None) };
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Default)]
|
#[derive(Clone)]
|
||||||
struct OutputState {
|
pub(crate) struct RepoLogContext {
|
||||||
status: Option<StatusState>,
|
inner: Arc<RepoLog>,
|
||||||
}
|
}
|
||||||
|
|
||||||
struct StatusState {
|
struct ActiveRepoLog {
|
||||||
slots: Vec<Option<String>>,
|
context: RepoLogContext,
|
||||||
visible: bool,
|
owner: bool,
|
||||||
interactive: bool,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
struct RepoLog {
|
struct RepoLog {
|
||||||
repo_name: String,
|
lines: Mutex<Vec<String>>,
|
||||||
slot: usize,
|
|
||||||
width: usize,
|
|
||||||
lines: Vec<String>,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct StatusGuard;
|
|
||||||
|
|
||||||
impl Drop for StatusGuard {
|
|
||||||
fn drop(&mut self) {
|
|
||||||
finish_status_area();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct RepoLogGuard;
|
pub struct RepoLogGuard;
|
||||||
@@ -45,180 +31,119 @@ impl Drop for RepoLogGuard {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn start_status_area(slots: usize) -> StatusGuard {
|
pub fn start_repo_log() -> RepoLogGuard {
|
||||||
with_output(|output| {
|
let context = RepoLogContext {
|
||||||
if let Some(status) = output.status.as_mut() {
|
inner: Arc::new(RepoLog {
|
||||||
clear_status(status);
|
lines: Mutex::new(Vec::new()),
|
||||||
}
|
}),
|
||||||
output.status = Some(StatusState {
|
};
|
||||||
slots: vec![None; slots],
|
|
||||||
visible: false,
|
|
||||||
interactive: io::stdout().is_terminal() && slots > 0,
|
|
||||||
});
|
|
||||||
if let Some(status) = output.status.as_mut() {
|
|
||||||
draw_status(status);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
StatusGuard
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn start_repo_log(repo_name: String, slot: usize, width: usize) -> RepoLogGuard {
|
|
||||||
REPO_LOG.with(|repo_log| {
|
REPO_LOG.with(|repo_log| {
|
||||||
*repo_log.borrow_mut() = Some(RepoLog {
|
*repo_log.borrow_mut() = Some(ActiveRepoLog {
|
||||||
repo_name,
|
context,
|
||||||
slot,
|
owner: true,
|
||||||
width,
|
|
||||||
lines: Vec::new(),
|
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
RepoLogGuard
|
RepoLogGuard
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn finish_repo_log() {
|
pub(crate) fn current_repo_log_context() -> Option<RepoLogContext> {
|
||||||
let repo_log = REPO_LOG.with(|repo_log| repo_log.borrow_mut().take());
|
REPO_LOG.with(|repo_log| {
|
||||||
let Some(repo_log) = repo_log else {
|
repo_log
|
||||||
return;
|
.borrow()
|
||||||
};
|
.as_ref()
|
||||||
|
.map(|repo_log| repo_log.context.clone())
|
||||||
with_output(|output| {
|
})
|
||||||
if let Some(status) = output.status.as_mut() {
|
|
||||||
clear_status(status);
|
|
||||||
if repo_log.slot < status.slots.len() {
|
|
||||||
status.slots[repo_log.slot] = None;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for line in repo_log.lines {
|
|
||||||
println!("{line}");
|
|
||||||
}
|
|
||||||
if let Some(status) = output.status.as_mut() {
|
|
||||||
draw_status(status);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn repo_prefix(repo_name: &str, width: usize) -> String {
|
pub(crate) fn inherit_repo_log(context: Option<RepoLogContext>) -> InheritedRepoLogGuard {
|
||||||
let mut prefix = repo_name.chars().take(width).collect::<String>();
|
let previous = REPO_LOG.with(|repo_log| {
|
||||||
if repo_name.chars().count() > width && width > 0 {
|
let mut repo_log = repo_log.borrow_mut();
|
||||||
prefix.pop();
|
let previous = repo_log.take();
|
||||||
prefix.push('~');
|
if let Some(context) = context {
|
||||||
|
*repo_log = Some(ActiveRepoLog {
|
||||||
|
context,
|
||||||
|
owner: false,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
previous
|
||||||
|
});
|
||||||
|
InheritedRepoLogGuard { previous }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) struct InheritedRepoLogGuard {
|
||||||
|
previous: Option<ActiveRepoLog>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for InheritedRepoLogGuard {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
REPO_LOG.with(|repo_log| {
|
||||||
|
*repo_log.borrow_mut() = self.previous.take();
|
||||||
|
});
|
||||||
}
|
}
|
||||||
format!("{prefix:<width$}")
|
}
|
||||||
|
|
||||||
|
pub fn finish_repo_log() {
|
||||||
|
let active = REPO_LOG.with(|repo_log| repo_log.borrow_mut().take());
|
||||||
|
let Some(active) = active else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
if !active.owner {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let repo_log = active.context.inner;
|
||||||
|
let lines = {
|
||||||
|
let mut lines = repo_log
|
||||||
|
.lines
|
||||||
|
.lock()
|
||||||
|
.unwrap_or_else(|poisoned| poisoned.into_inner());
|
||||||
|
std::mem::take(&mut *lines)
|
||||||
|
};
|
||||||
|
|
||||||
|
with_output(|| {
|
||||||
|
for line in lines {
|
||||||
|
println!("{line}");
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn line(args: fmt::Arguments<'_>) {
|
pub fn line(args: fmt::Arguments<'_>) {
|
||||||
let text = args.to_string();
|
let text = args.to_string();
|
||||||
let captured = REPO_LOG.with(|repo_log| {
|
let context = current_repo_log_context();
|
||||||
let mut repo_log = repo_log.borrow_mut();
|
if let Some(context) = context {
|
||||||
let Some(repo_log) = repo_log.as_mut() else {
|
capture_repo_line(&context, &text);
|
||||||
return false;
|
|
||||||
};
|
|
||||||
|
|
||||||
if text.is_empty() {
|
|
||||||
repo_log.lines.push(String::new());
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
for line in text.lines() {
|
|
||||||
repo_log.lines.push(line.to_string());
|
|
||||||
if !line.trim().is_empty() {
|
|
||||||
update_status(repo_log, line.trim());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
true
|
|
||||||
});
|
|
||||||
|
|
||||||
if captured {
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
with_output(|output| {
|
with_output(|| {
|
||||||
if let Some(status) = output.status.as_mut() {
|
|
||||||
clear_status(status);
|
|
||||||
}
|
|
||||||
println!("{text}");
|
println!("{text}");
|
||||||
if let Some(status) = output.status.as_mut() {
|
|
||||||
draw_status(status);
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
fn update_status(repo_log: &RepoLog, line: &str) {
|
fn capture_repo_line(context: &RepoLogContext, text: &str) {
|
||||||
let repo = repo_prefix(&repo_log.repo_name, repo_log.width);
|
let mut lines = context
|
||||||
let line = truncate_status(line, 96);
|
.inner
|
||||||
with_output(|output| {
|
.lines
|
||||||
let Some(status) = output.status.as_mut() else {
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
if repo_log.slot >= status.slots.len() {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
clear_status(status);
|
|
||||||
status.slots[repo_log.slot] = Some(format!(
|
|
||||||
"{} {} {}",
|
|
||||||
style(format!("worker {}", repo_log.slot + 1)).dim(),
|
|
||||||
style(repo).cyan().bold(),
|
|
||||||
line
|
|
||||||
));
|
|
||||||
draw_status(status);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
fn truncate_status(value: &str, max_chars: usize) -> String {
|
|
||||||
if value.chars().count() <= max_chars {
|
|
||||||
return value.to_string();
|
|
||||||
}
|
|
||||||
let mut output = value.chars().take(max_chars).collect::<String>();
|
|
||||||
output.pop();
|
|
||||||
output.push('~');
|
|
||||||
output
|
|
||||||
}
|
|
||||||
|
|
||||||
fn finish_status_area() {
|
|
||||||
with_output(|output| {
|
|
||||||
if let Some(status) = output.status.as_mut() {
|
|
||||||
clear_status(status);
|
|
||||||
}
|
|
||||||
output.status = None;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
fn with_output(action: impl FnOnce(&mut OutputState)) {
|
|
||||||
let output = OUTPUT.get_or_init(|| Mutex::new(OutputState::default()));
|
|
||||||
let mut output = output
|
|
||||||
.lock()
|
.lock()
|
||||||
.unwrap_or_else(|poisoned| poisoned.into_inner());
|
.unwrap_or_else(|poisoned| poisoned.into_inner());
|
||||||
action(&mut output);
|
if text.is_empty() {
|
||||||
|
lines.push(String::new());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
for line in text.lines() {
|
||||||
|
lines.push(line.to_string());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn with_output(action: impl FnOnce()) {
|
||||||
|
let output = OUTPUT.get_or_init(|| Mutex::new(()));
|
||||||
|
let _output = output
|
||||||
|
.lock()
|
||||||
|
.unwrap_or_else(|poisoned| poisoned.into_inner());
|
||||||
|
action();
|
||||||
let _ = io::stdout().flush();
|
let _ = io::stdout().flush();
|
||||||
}
|
}
|
||||||
|
|
||||||
fn clear_status(status: &mut StatusState) {
|
|
||||||
if !status.interactive || !status.visible {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
let lines = status.slots.len();
|
|
||||||
print!("\x1b[{lines}A\r");
|
|
||||||
for _ in 0..lines {
|
|
||||||
println!("\x1b[2K");
|
|
||||||
}
|
|
||||||
print!("\x1b[{lines}A\r");
|
|
||||||
status.visible = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
fn draw_status(status: &mut StatusState) {
|
|
||||||
if !status.interactive {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
for slot in &status.slots {
|
|
||||||
match slot {
|
|
||||||
Some(line) => println!("{line}"),
|
|
||||||
None => println!("{}", style("idle").dim()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
status.visible = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
#[macro_export]
|
#[macro_export]
|
||||||
macro_rules! logln {
|
macro_rules! logln {
|
||||||
() => {
|
() => {
|
||||||
|
|||||||
@@ -20,13 +20,16 @@ where
|
|||||||
let worker_count = jobs.min(items.len());
|
let worker_count = jobs.min(items.len());
|
||||||
let queue = Arc::new(Mutex::new(VecDeque::from(items)));
|
let queue = Arc::new(Mutex::new(VecDeque::from(items)));
|
||||||
let (sender, receiver) = mpsc::channel();
|
let (sender, receiver) = mpsc::channel();
|
||||||
|
let repo_log_context = crate::logging::current_repo_log_context();
|
||||||
|
|
||||||
thread::scope(|scope| {
|
thread::scope(|scope| {
|
||||||
for _ in 0..worker_count {
|
for _ in 0..worker_count {
|
||||||
let queue = Arc::clone(&queue);
|
let queue = Arc::clone(&queue);
|
||||||
let sender = sender.clone();
|
let sender = sender.clone();
|
||||||
let f = &f;
|
let f = &f;
|
||||||
|
let repo_log_context = repo_log_context.clone();
|
||||||
scope.spawn(move || {
|
scope.spawn(move || {
|
||||||
|
let _repo_log_guard = crate::logging::inherit_repo_log(repo_log_context);
|
||||||
while let Some(item) = pop_item(&queue) {
|
while let Some(item) = pop_item(&queue) {
|
||||||
if sender.send(f(item)).is_err() {
|
if sender.send(f(item)).is_err() {
|
||||||
break;
|
break;
|
||||||
|
|||||||
+3
-16
@@ -237,7 +237,6 @@ fn sync_group(
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
let repo_log_width = repo_log_width(&repo_names);
|
|
||||||
let repo_jobs = repo_names
|
let repo_jobs = repo_names
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|repo_name| {
|
.map(|repo_name| {
|
||||||
@@ -260,11 +259,10 @@ fn sync_group(
|
|||||||
let base_ref_state = context.ref_state.clone();
|
let base_ref_state = context.ref_state.clone();
|
||||||
let queue = Arc::new(Mutex::new(repo_jobs));
|
let queue = Arc::new(Mutex::new(repo_jobs));
|
||||||
let (sender, receiver) = mpsc::channel();
|
let (sender, receiver) = mpsc::channel();
|
||||||
let use_status_area = worker_count > 1;
|
let use_repo_logs = worker_count > 1;
|
||||||
let jobs = context.options.jobs;
|
let jobs = context.options.jobs;
|
||||||
let _status_guard = use_status_area.then(|| logging::start_status_area(worker_count));
|
|
||||||
let failures = thread::scope(|scope| {
|
let failures = thread::scope(|scope| {
|
||||||
for worker_id in 0..worker_count {
|
for _ in 0..worker_count {
|
||||||
let queue = Arc::clone(&queue);
|
let queue = Arc::clone(&queue);
|
||||||
let sender = sender.clone();
|
let sender = sender.clone();
|
||||||
let redactor = context.redactor.clone();
|
let redactor = context.redactor.clone();
|
||||||
@@ -275,9 +273,7 @@ fn sync_group(
|
|||||||
|
|
||||||
scope.spawn(move || {
|
scope.spawn(move || {
|
||||||
while let Some(mut job) = pop_repo_job(&queue) {
|
while let Some(mut job) = pop_repo_job(&queue) {
|
||||||
let _repo_log_guard = use_status_area.then(|| {
|
let _repo_log_guard = use_repo_logs.then(logging::start_repo_log);
|
||||||
logging::start_repo_log(job.repo_name.clone(), worker_id, repo_log_width)
|
|
||||||
});
|
|
||||||
let repo_context = RepoSyncContext {
|
let repo_context = RepoSyncContext {
|
||||||
config,
|
config,
|
||||||
mirror,
|
mirror,
|
||||||
@@ -383,15 +379,6 @@ fn pop_repo_job(queue: &Arc<Mutex<VecDeque<RepoSyncJob>>>) -> Option<RepoSyncJob
|
|||||||
.pop_front()
|
.pop_front()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn repo_log_width(repo_names: &BTreeSet<String>) -> usize {
|
|
||||||
repo_names
|
|
||||||
.iter()
|
|
||||||
.map(|name| name.chars().count())
|
|
||||||
.max()
|
|
||||||
.unwrap_or(4)
|
|
||||||
.clamp(4, 32)
|
|
||||||
}
|
|
||||||
|
|
||||||
struct RepoSyncJob {
|
struct RepoSyncJob {
|
||||||
repo_name: String,
|
repo_name: String,
|
||||||
existing: Vec<EndpointRepo>,
|
existing: Vec<EndpointRepo>,
|
||||||
|
|||||||
+21
-8
@@ -1,13 +1,26 @@
|
|||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn repo_prefix_pads_and_truncates_to_fixed_width() {
|
fn repo_log_context_is_inherited_by_parallel_workers() {
|
||||||
assert_eq!(repo_prefix("api", 6), "api ");
|
let _guard = start_repo_log();
|
||||||
assert_eq!(repo_prefix("very-long-repo", 8), "very-lo~");
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
crate::logln!("outer line");
|
||||||
fn status_text_truncates_to_fixed_width() {
|
crate::parallel::map(vec!["worker line"], 1, |line| {
|
||||||
assert_eq!(truncate_status("short", 8), "short");
|
crate::logln!("{line}");
|
||||||
assert_eq!(truncate_status("very-long-status", 8), "very-lo~");
|
Ok::<_, anyhow::Error>(())
|
||||||
|
})
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let lines = {
|
||||||
|
let context = current_repo_log_context().unwrap();
|
||||||
|
context
|
||||||
|
.inner
|
||||||
|
.lines
|
||||||
|
.lock()
|
||||||
|
.unwrap_or_else(|poisoned| poisoned.into_inner())
|
||||||
|
.clone()
|
||||||
|
};
|
||||||
|
finish_repo_log();
|
||||||
|
|
||||||
|
assert_eq!(lines, vec!["outer line", "worker line"]);
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user