diff --git a/crates/nu-plugin/src/plugin/interface/stream/tests.rs b/crates/nu-plugin/src/plugin/interface/stream/tests.rs index a775fd3baa..b8b301aa08 100644 --- a/crates/nu-plugin/src/plugin/interface/stream/tests.rs +++ b/crates/nu-plugin/src/plugin/interface/stream/tests.rs @@ -3,7 +3,7 @@ use std::{ atomic::{AtomicBool, Ordering::Relaxed}, mpsc, Arc, }, - time::Duration, + time::{Duration, Instant}, }; use nu_protocol::{ShellError, Value}; @@ -16,6 +16,36 @@ use super::{StreamManager, StreamReader, StreamWriter, StreamWriterSignal, Write // slow to complete. 10 ms is a pretty long time const WAIT_DURATION: Duration = Duration::from_millis(10); +// Maximum time to wait for a condition to be true +const MAX_WAIT_DURATION: Duration = Duration::from_millis(500); + +/// Wait for a condition to be true, or panic if the duration exceeds MAX_WAIT_DURATION +#[track_caller] +fn wait_for_condition(mut cond: impl FnMut() -> bool, message: &str) { + // Early check + if cond() { + return; + } + + let start = Instant::now(); + loop { + std::thread::sleep(Duration::from_millis(10)); + + if cond() { + return; + } + + let elapsed = Instant::now().saturating_duration_since(start); + if elapsed > MAX_WAIT_DURATION { + panic!( + "{message}: Waited {:.2}sec, which is more than the maximum of {:.2}sec", + elapsed.as_secs_f64(), + MAX_WAIT_DURATION.as_secs_f64(), + ); + } + } +} + #[derive(Debug, Clone, Default)] struct TestSink(Vec); @@ -301,8 +331,7 @@ fn signal_wait_for_drain_blocks_on_unacknowledged() -> Result<(), ShellError> { for _ in 0..100 { signal.notify_acknowledged()?; } - std::thread::sleep(WAIT_DURATION); - assert!(spawned.is_finished(), "blocked at end"); + wait_for_condition(|| spawned.is_finished(), "blocked at end"); spawned.join().unwrap() }) } @@ -322,8 +351,7 @@ fn signal_wait_for_drain_unblocks_on_dropped() -> Result<(), ShellError> { std::thread::sleep(WAIT_DURATION); assert!(!spawned.is_finished(), "didn't block"); signal.set_dropped()?; - std::thread::sleep(WAIT_DURATION); - assert!(spawned.is_finished(), "still blocked at end"); + wait_for_condition(|| spawned.is_finished(), "still blocked at end"); spawned.join().unwrap() }) }