diff --git a/crates/nu-command/src/default_context.rs b/crates/nu-command/src/default_context.rs index 6f74bcc0dc..15e02abeda 100644 --- a/crates/nu-command/src/default_context.rs +++ b/crates/nu-command/src/default_context.rs @@ -79,6 +79,7 @@ pub fn add_shell_command_context(mut engine_state: EngineState) -> EngineState { Sort, SortBy, SplitList, + Tee, Transpose, Uniq, UniqBy, diff --git a/crates/nu-command/src/example_test.rs b/crates/nu-command/src/example_test.rs index 3655f2d00e..d9822f0bf2 100644 --- a/crates/nu-command/src/example_test.rs +++ b/crates/nu-command/src/example_test.rs @@ -10,8 +10,8 @@ pub fn test_examples(cmd: impl Command + 'static) { mod test_examples { use super::super::{ Ansi, Date, Enumerate, Filter, First, Flatten, From, Get, Into, IntoDatetime, IntoString, - Math, MathRound, ParEach, Path, PathParse, Random, Seq, Sort, SortBy, Split, SplitColumn, - SplitRow, Str, StrJoin, StrLength, StrReplace, Update, Url, Values, Wrap, + Math, MathRound, MathSum, ParEach, Path, PathParse, Random, Seq, Sort, SortBy, Split, + SplitColumn, SplitRow, Str, StrJoin, StrLength, StrReplace, Update, Url, Values, Wrap, }; use crate::{Default, Each, To}; use nu_cmd_lang::example_support::{ @@ -83,6 +83,7 @@ mod test_examples { working_set.add_decl(Box::new(Let)); working_set.add_decl(Box::new(Math)); working_set.add_decl(Box::new(MathRound)); + working_set.add_decl(Box::new(MathSum)); working_set.add_decl(Box::new(Mut)); working_set.add_decl(Box::new(Path)); working_set.add_decl(Box::new(PathParse)); diff --git a/crates/nu-command/src/filters/mod.rs b/crates/nu-command/src/filters/mod.rs index 76969b1dab..95e9ae3767 100644 --- a/crates/nu-command/src/filters/mod.rs +++ b/crates/nu-command/src/filters/mod.rs @@ -39,6 +39,7 @@ mod sort; mod sort_by; mod split_by; mod take; +mod tee; mod transpose; mod uniq; mod uniq_by; @@ -92,6 +93,7 @@ pub use sort::Sort; pub use sort_by::SortBy; pub use split_by::SplitBy; pub use take::*; +pub use tee::Tee; pub use transpose::Transpose; pub use uniq::*; pub use uniq_by::UniqBy; diff --git a/crates/nu-command/src/filters/tee.rs b/crates/nu-command/src/filters/tee.rs new file mode 100644 index 0000000000..47502eae83 --- /dev/null +++ b/crates/nu-command/src/filters/tee.rs @@ -0,0 +1,341 @@ +use std::{sync::mpsc, thread}; + +use nu_engine::{eval_block_with_early_return, CallExt}; +use nu_protocol::{ + ast::Call, + engine::{Closure, Command, EngineState, Stack}, + Category, Example, IntoInterruptiblePipelineData, PipelineData, RawStream, ShellError, + Signature, Spanned, SyntaxShape, Type, Value, +}; + +#[derive(Clone)] +pub struct Tee; + +impl Command for Tee { + fn name(&self) -> &str { + "tee" + } + + fn usage(&self) -> &str { + "Copy a stream to another command in parallel." + } + + fn extra_usage(&self) -> &str { + r#"This is useful for doing something else with a stream while still continuing to +use it in your pipeline."# + } + + fn signature(&self) -> Signature { + Signature::build("tee") + .input_output_type(Type::Any, Type::Any) + .switch( + "stderr", + "For external commands: copy the standard error stream instead.", + Some('e'), + ) + .required( + "closure", + SyntaxShape::Closure(None), + "The other command to send the stream to.", + ) + .category(Category::Filters) + } + + fn examples(&self) -> Vec { + vec![ + Example { + example: "http get http://example.org/ | tee { save example.html }", + description: "Save a webpage to a file while also printing it", + result: None, + }, + Example { + example: "do { nu --commands 'print -e error; print ok' } | \ + tee --stderr { save error.log } | complete", + description: "Save error messages from an external command to a file without \ + redirecting them", + result: None, + }, + Example { + example: "1..100 | tee { each { print } } | math sum | wrap sum", + description: "Print numbers and their sum", + result: None, + }, + ] + } + + fn run( + &self, + engine_state: &EngineState, + stack: &mut Stack, + call: &Call, + input: PipelineData, + ) -> Result { + let use_stderr = call.has_flag(engine_state, stack, "stderr")?; + + let Spanned { + item: Closure { block_id, captures }, + span: closure_span, + } = call.req(engine_state, stack, 0)?; + + let closure_engine_state = engine_state.clone(); + let mut closure_stack = stack.captures_to_stack(captures); + + let metadata = input.metadata(); + let metadata_clone = metadata.clone(); + + match input { + // Handle external streams specially, to make sure they pass through + PipelineData::ExternalStream { + stdout, + stderr, + exit_code, + span, + metadata, + trim_end_newline, + } => { + let known_size = if use_stderr { + stderr.as_ref().and_then(|s| s.known_size) + } else { + stdout.as_ref().and_then(|s| s.known_size) + }; + + let with_stream = move |rx: mpsc::Receiver, ShellError>>| { + let iter = rx.into_iter(); + let input_from_channel = PipelineData::ExternalStream { + stdout: Some(RawStream::new( + Box::new(iter), + closure_engine_state.ctrlc.clone(), + span, + known_size, + )), + stderr: None, + exit_code: None, + span, + metadata: metadata_clone, + trim_end_newline, + }; + let result = eval_block_with_early_return( + &closure_engine_state, + &mut closure_stack, + closure_engine_state.get_block(block_id), + input_from_channel, + false, + false, + ); + // Make sure to drain any iterator produced to avoid unexpected behavior + result.and_then(|data| data.drain()) + }; + + if use_stderr { + if let Some(stderr) = stderr { + let raw_stream = RawStream::new( + Box::new(tee(stderr.stream, with_stream).map(flatten_result)), + stderr.ctrlc, + stderr.span, + stderr.known_size, + ); + Ok(PipelineData::ExternalStream { + stdout, + stderr: Some(raw_stream), + exit_code, + span, + metadata, + trim_end_newline, + }) + } else { + // Throw an error if the stream doesn't have stderr. This is probably the + // user's mistake (e.g., forgetting to use `do`) + Err(ShellError::GenericError { + error: "Stream passed to `tee --stderr` does not have stderr".into(), + msg: "this stream does not contain stderr".into(), + span: Some(span), + help: Some( + "if this is an external command, you probably need to wrap \ + it in `do { ... }`" + .into(), + ), + inner: vec![], + }) + } + } else { + let stdout = stdout.map(|stdout| { + RawStream::new( + Box::new(tee(stdout.stream, with_stream).map(flatten_result)), + stdout.ctrlc, + stdout.span, + stdout.known_size, + ) + }); + Ok(PipelineData::ExternalStream { + stdout, + stderr, + exit_code, + span, + metadata, + trim_end_newline, + }) + } + } + // --stderr is not allowed if the input is not an external stream + _ if use_stderr => Err(ShellError::UnsupportedInput { + msg: "--stderr can only be used on external streams".into(), + input: "the input to `tee` is not an external stream".into(), + msg_span: call.head, + input_span: input.span().unwrap_or(call.head), + }), + // Handle others with the plain iterator + _ => { + let teed = tee(input.into_iter(), move |rx| { + let input_from_channel = rx.into_pipeline_data_with_metadata( + metadata_clone, + closure_engine_state.ctrlc.clone(), + ); + let result = eval_block_with_early_return( + &closure_engine_state, + &mut closure_stack, + closure_engine_state.get_block(block_id), + input_from_channel, + false, + false, + ); + // Make sure to drain any iterator produced to avoid unexpected behavior + result.and_then(|data| data.drain()) + }) + .map(move |result| result.unwrap_or_else(|err| Value::error(err, closure_span))) + .into_pipeline_data_with_metadata(metadata, engine_state.ctrlc.clone()); + + Ok(teed) + } + } + } +} + +fn panic_error() -> ShellError { + ShellError::NushellFailed { + msg: "A panic occurred on a thread spawned by `tee`".into(), + } +} + +fn flatten_result(result: Result, E>) -> Result { + result.unwrap_or_else(Err) +} + +/// Copies the iterator to a channel on another thread. If an error is produced on that thread, +/// it is embedded in the resulting iterator as an `Err` as soon as possible. When the iterator +/// finishes, it waits for the other thread to finish, also handling any error produced at that +/// point. +fn tee( + input: impl Iterator, + with_cloned_stream: impl FnOnce(mpsc::Receiver) -> Result<(), ShellError> + Send + 'static, +) -> impl Iterator> +where + T: Clone + Send + 'static, +{ + // For sending the values to the other thread + let (tx, rx) = mpsc::channel(); + + let mut thread = Some( + thread::Builder::new() + .name("stderr consumer".into()) + .spawn(move || with_cloned_stream(rx)) + .expect("could not create thread"), + ); + + let mut iter = input.into_iter(); + let mut tx = Some(tx); + + std::iter::from_fn(move || { + if thread.as_ref().is_some_and(|t| t.is_finished()) { + // Check for an error from the other thread + let result = thread + .take() + .expect("thread was taken early") + .join() + .unwrap_or_else(|_| Err(panic_error())); + if let Err(err) = result { + // Embed the error early + return Some(Err(err)); + } + } + + // Get a value from the iterator + if let Some(value) = iter.next() { + // Send a copy, ignoring any error if the channel is closed + let _ = tx.as_ref().map(|tx| tx.send(value.clone())); + Some(Ok(value)) + } else { + // Close the channel so the stream ends for the other thread + drop(tx.take()); + // Wait for the other thread, and embed any error produced + thread.take().and_then(|t| { + t.join() + .unwrap_or_else(|_| Err(panic_error())) + .err() + .map(Err) + }) + } + }) +} + +#[test] +fn tee_copies_values_to_other_thread_and_passes_them_through() { + let (tx, rx) = mpsc::channel(); + + let expected_values = vec![1, 2, 3, 4]; + + let my_result = tee(expected_values.clone().into_iter(), move |rx| { + for val in rx { + let _ = tx.send(val); + } + Ok(()) + }) + .collect::, ShellError>>() + .expect("should not produce error"); + + assert_eq!(expected_values, my_result); + + let other_threads_result = rx.into_iter().collect::>(); + + assert_eq!(expected_values, other_threads_result); +} + +#[test] +fn tee_forwards_errors_back_immediately() { + use std::time::Duration; + let slow_input = (0..100).inspect(|_| std::thread::sleep(Duration::from_millis(1))); + let iter = tee(slow_input, |_| { + Err(ShellError::IOError { msg: "test".into() }) + }); + for result in iter { + if let Ok(val) = result { + // should not make it to the end + assert!(val < 99, "the error did not come early enough"); + } else { + // got the error + return; + } + } + panic!("never received the error"); +} + +#[test] +fn tee_waits_for_the_other_thread() { + use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }; + use std::time::Duration; + let waited = Arc::new(AtomicBool::new(false)); + let waited_clone = waited.clone(); + let iter = tee(0..100, move |_| { + std::thread::sleep(Duration::from_millis(10)); + waited_clone.store(true, Ordering::Relaxed); + Err(ShellError::IOError { msg: "test".into() }) + }); + let last = iter.last(); + assert!(waited.load(Ordering::Relaxed), "failed to wait"); + assert!( + last.is_some_and(|res| res.is_err()), + "failed to return error from wait" + ); +} diff --git a/crates/nu-command/tests/commands/mod.rs b/crates/nu-command/tests/commands/mod.rs index ffbfba3a5a..8f651d65bb 100644 --- a/crates/nu-command/tests/commands/mod.rs +++ b/crates/nu-command/tests/commands/mod.rs @@ -104,6 +104,7 @@ mod split_row; mod str_; mod table; mod take; +mod tee; mod terminal; mod to_text; mod touch; diff --git a/crates/nu-command/tests/commands/tee.rs b/crates/nu-command/tests/commands/tee.rs new file mode 100644 index 0000000000..6a69d7fe6d --- /dev/null +++ b/crates/nu-command/tests/commands/tee.rs @@ -0,0 +1,49 @@ +use nu_test_support::{fs::file_contents, nu, playground::Playground}; + +#[test] +fn tee_save_values_to_file() { + Playground::setup("tee_save_values_to_file_test", |dirs, _sandbox| { + let output = nu!( + cwd: dirs.test(), + r#"1..5 | tee { save copy.txt } | to text"# + ); + assert_eq!("12345", output.out); + assert_eq!( + "1\n2\n3\n4\n5\n", + file_contents(dirs.test().join("copy.txt")) + ); + }) +} + +#[test] +fn tee_save_stdout_to_file() { + Playground::setup("tee_save_stdout_to_file_test", |dirs, _sandbox| { + let output = nu!( + cwd: dirs.test(), + r#" + $env.FOO = "teststring" + nu --testbin echo_env FOO | tee { save copy.txt } + "# + ); + assert_eq!("teststring", output.out); + assert_eq!("teststring\n", file_contents(dirs.test().join("copy.txt"))); + }) +} + +#[test] +fn tee_save_stderr_to_file() { + Playground::setup("tee_save_stderr_to_file_test", |dirs, _sandbox| { + let output = nu!( + cwd: dirs.test(), + "\ + $env.FOO = \"teststring\"; \ + do { nu --testbin echo_env_stderr FOO } | \ + tee --stderr { save copy.txt } | \ + complete | \ + get stderr + " + ); + assert_eq!("teststring", output.out); + assert_eq!("teststring\n", file_contents(dirs.test().join("copy.txt"))); + }) +}