diff --git a/crates/nu-cmd-lang/src/core_commands/do_.rs b/crates/nu-cmd-lang/src/core_commands/do_.rs index 4931361a73..067120f6cb 100644 --- a/crates/nu-cmd-lang/src/core_commands/do_.rs +++ b/crates/nu-cmd-lang/src/core_commands/do_.rs @@ -4,8 +4,8 @@ use nu_engine::{eval_block_with_early_return, redirect_env, CallExt}; use nu_protocol::ast::Call; use nu_protocol::engine::{Closure, Command, EngineState, Stack}; use nu_protocol::{ - Category, Example, ListStream, PipelineData, RawStream, ShellError, Signature, SyntaxShape, - Type, Value, + Category, Example, IntoSpanned, ListStream, PipelineData, RawStream, ShellError, Signature, + SyntaxShape, Type, Value, }; #[derive(Clone)] @@ -147,23 +147,25 @@ impl Command for Do { // consumes the first 65535 bytes // So we need a thread to receive stdout message, then the current thread can continue to consume // stderr messages. - let stdout_handler = stdout.map(|stdout_stream| { - thread::Builder::new() - .name("stderr redirector".to_string()) - .spawn(move || { - let ctrlc = stdout_stream.ctrlc.clone(); - let span = stdout_stream.span; - RawStream::new( - Box::new(std::iter::once( - stdout_stream.into_bytes().map(|s| s.item), - )), - ctrlc, - span, - None, - ) - }) - .expect("Failed to create thread") - }); + let stdout_handler = stdout + .map(|stdout_stream| { + thread::Builder::new() + .name("stderr redirector".to_string()) + .spawn(move || { + let ctrlc = stdout_stream.ctrlc.clone(); + let span = stdout_stream.span; + RawStream::new( + Box::new(std::iter::once( + stdout_stream.into_bytes().map(|s| s.item), + )), + ctrlc, + span, + None, + ) + }) + .map_err(|e| e.into_spanned(call.head)) + }) + .transpose()?; // Intercept stderr so we can return it in the error if the exit code is non-zero. // The threading issues mentioned above dictate why we also need to intercept stdout. diff --git a/crates/nu-command/src/filesystem/save.rs b/crates/nu-command/src/filesystem/save.rs index 129faae1c7..23bfd0dcc9 100644 --- a/crates/nu-command/src/filesystem/save.rs +++ b/crates/nu-command/src/filesystem/save.rs @@ -3,6 +3,7 @@ use nu_engine::CallExt; use nu_path::expand_path_with; use nu_protocol::ast::{Call, Expr, Expression}; use nu_protocol::engine::{Command, EngineState, Stack}; +use nu_protocol::IntoSpanned; use nu_protocol::{ Category, DataSource, Example, PipelineData, PipelineMetadata, RawStream, ShellError, Signature, Span, Spanned, SyntaxShape, Type, Value, @@ -123,19 +124,22 @@ impl Command for Save { )?; // delegate a thread to redirect stderr to result. - let handler = stderr.map(|stderr_stream| match stderr_file { - Some(stderr_file) => thread::Builder::new() - .name("stderr redirector".to_string()) - .spawn(move || stream_to_file(stderr_stream, stderr_file, span, progress)) - .expect("Failed to create thread"), - None => thread::Builder::new() - .name("stderr redirector".to_string()) - .spawn(move || { - let _ = stderr_stream.into_bytes(); - Ok(PipelineData::empty()) - }) - .expect("Failed to create thread"), - }); + let handler = stderr + .map(|stderr_stream| match stderr_file { + Some(stderr_file) => thread::Builder::new() + .name("stderr redirector".to_string()) + .spawn(move || { + stream_to_file(stderr_stream, stderr_file, span, progress) + }), + None => thread::Builder::new() + .name("stderr redirector".to_string()) + .spawn(move || { + let _ = stderr_stream.into_bytes(); + Ok(PipelineData::empty()) + }), + }) + .transpose() + .map_err(|e| e.into_spanned(span))?; let res = stream_to_file(stream, file, span, progress); if let Some(h) = handler { diff --git a/crates/nu-command/src/filters/tee.rs b/crates/nu-command/src/filters/tee.rs index 47502eae83..95aaa19911 100644 --- a/crates/nu-command/src/filters/tee.rs +++ b/crates/nu-command/src/filters/tee.rs @@ -4,8 +4,8 @@ use nu_engine::{eval_block_with_early_return, CallExt}; use nu_protocol::{ ast::Call, engine::{Closure, Command, EngineState, Stack}, - Category, Example, IntoInterruptiblePipelineData, PipelineData, RawStream, ShellError, - Signature, Spanned, SyntaxShape, Type, Value, + Category, Example, IntoInterruptiblePipelineData, IntoSpanned, PipelineData, RawStream, + ShellError, Signature, Spanned, SyntaxShape, Type, Value, }; #[derive(Clone)] @@ -128,8 +128,10 @@ use it in your pipeline."# if use_stderr { if let Some(stderr) = stderr { + let iter = tee(stderr.stream, with_stream) + .map_err(|e| e.into_spanned(call.head))?; let raw_stream = RawStream::new( - Box::new(tee(stderr.stream, with_stream).map(flatten_result)), + Box::new(iter.map(flatten_result)), stderr.ctrlc, stderr.span, stderr.known_size, @@ -158,14 +160,18 @@ use it in your pipeline."# }) } } else { - let stdout = stdout.map(|stdout| { - RawStream::new( - Box::new(tee(stdout.stream, with_stream).map(flatten_result)), - stdout.ctrlc, - stdout.span, - stdout.known_size, - ) - }); + let stdout = stdout + .map(|stdout| { + let iter = tee(stdout.stream, with_stream) + .map_err(|e| e.into_spanned(call.head))?; + Ok::<_, ShellError>(RawStream::new( + Box::new(iter.map(flatten_result)), + stdout.ctrlc, + stdout.span, + stdout.known_size, + )) + }) + .transpose()?; Ok(PipelineData::ExternalStream { stdout, stderr, @@ -201,6 +207,7 @@ use it in your pipeline."# // Make sure to drain any iterator produced to avoid unexpected behavior result.and_then(|data| data.drain()) }) + .map_err(|e| e.into_spanned(call.head))? .map(move |result| result.unwrap_or_else(|err| Value::error(err, closure_span))) .into_pipeline_data_with_metadata(metadata, engine_state.ctrlc.clone()); @@ -227,7 +234,7 @@ fn flatten_result(result: Result, E>) -> Result { fn tee( input: impl Iterator, with_cloned_stream: impl FnOnce(mpsc::Receiver) -> Result<(), ShellError> + Send + 'static, -) -> impl Iterator> +) -> Result>, std::io::Error> where T: Clone + Send + 'static, { @@ -237,14 +244,13 @@ where let mut thread = Some( thread::Builder::new() .name("stderr consumer".into()) - .spawn(move || with_cloned_stream(rx)) - .expect("could not create thread"), + .spawn(move || with_cloned_stream(rx))?, ); let mut iter = input.into_iter(); let mut tx = Some(tx); - std::iter::from_fn(move || { + Ok(std::iter::from_fn(move || { if thread.as_ref().is_some_and(|t| t.is_finished()) { // Check for an error from the other thread let result = thread @@ -274,7 +280,7 @@ where .map(Err) }) } - }) + })) } #[test] @@ -289,6 +295,7 @@ fn tee_copies_values_to_other_thread_and_passes_them_through() { } Ok(()) }) + .expect("io error") .collect::, ShellError>>() .expect("should not produce error"); @@ -305,7 +312,8 @@ fn tee_forwards_errors_back_immediately() { let slow_input = (0..100).inspect(|_| std::thread::sleep(Duration::from_millis(1))); let iter = tee(slow_input, |_| { Err(ShellError::IOError { msg: "test".into() }) - }); + }) + .expect("io error"); for result in iter { if let Ok(val) = result { // should not make it to the end @@ -331,7 +339,8 @@ fn tee_waits_for_the_other_thread() { std::thread::sleep(Duration::from_millis(10)); waited_clone.store(true, Ordering::Relaxed); Err(ShellError::IOError { msg: "test".into() }) - }); + }) + .expect("io error"); let last = iter.last(); assert!(waited.load(Ordering::Relaxed), "failed to wait"); assert!( diff --git a/crates/nu-command/src/network/http/client.rs b/crates/nu-command/src/network/http/client.rs index 1b89bc93d2..d6eebf8c87 100644 --- a/crates/nu-command/src/network/http/client.rs +++ b/crates/nu-command/src/network/http/client.rs @@ -283,7 +283,7 @@ fn send_cancellable_request( let ret = request_fn(); let _ = tx.send(ret); // may fail if the user has cancelled the operation }) - .expect("Failed to create thread"); + .map_err(ShellError::from)?; // ...and poll the channel for responses loop { diff --git a/crates/nu-command/src/system/complete.rs b/crates/nu-command/src/system/complete.rs index 39c4d71a09..86dad78859 100644 --- a/crates/nu-command/src/system/complete.rs +++ b/crates/nu-command/src/system/complete.rs @@ -1,7 +1,8 @@ use nu_protocol::{ ast::Call, engine::{Command, EngineState, Stack}, - Category, Example, IntoPipelineData, PipelineData, Record, ShellError, Signature, Type, Value, + Category, Example, IntoPipelineData, IntoSpanned, PipelineData, Record, ShellError, Signature, + Type, Value, }; use std::thread; @@ -52,9 +53,9 @@ impl Command for Complete { // consumes the first 65535 bytes // So we need a thread to receive stderr message, then the current thread can continue to consume // stdout messages. - let stderr_handler = stderr.map(|stderr| { - let stderr_span = stderr.span; - ( + let stderr_handler = stderr + .map(|stderr| { + let stderr_span = stderr.span; thread::Builder::new() .name("stderr consumer".to_string()) .spawn(move || { @@ -65,10 +66,10 @@ impl Command for Complete { Ok::<_, ShellError>(Value::binary(stderr.item, stderr.span)) } }) - .expect("failed to create thread"), - stderr_span, - ) - }); + .map(|handle| (handle, stderr_span)) + .map_err(|err| err.into_spanned(call.head)) + }) + .transpose()?; if let Some(stdout) = stdout { let stdout = stdout.into_bytes()?; diff --git a/crates/nu-command/src/system/run_external.rs b/crates/nu-command/src/system/run_external.rs index 94ea488322..fc2ef3c424 100644 --- a/crates/nu-command/src/system/run_external.rs +++ b/crates/nu-command/src/system/run_external.rs @@ -2,6 +2,7 @@ use nu_cmd_base::hook::eval_hook; use nu_engine::env_to_strings; use nu_engine::eval_expression; use nu_engine::CallExt; +use nu_protocol::IntoSpanned; use nu_protocol::NuGlob; use nu_protocol::{ ast::{Call, Expr}, @@ -438,7 +439,7 @@ impl ExternalCommand { Ok(()) }) - .expect("Failed to create thread"); + .map_err(|e| e.into_spanned(head))?; } } @@ -526,7 +527,7 @@ impl ExternalCommand { Ok(()) } } - }).expect("Failed to create thread"); + }).map_err(|e| e.into_spanned(head))?; let (stderr_tx, stderr_rx) = mpsc::sync_channel(OUTPUT_BUFFERS_IN_FLIGHT); if redirect_stderr { @@ -543,7 +544,7 @@ impl ExternalCommand { read_and_redirect_message(stderr, stderr_tx, stderr_ctrlc); Ok::<(), ShellError>(()) }) - .expect("Failed to create thread"); + .map_err(|e| e.into_spanned(head))?; } let stdout_receiver = ChannelReceiver::new(stdout_rx); diff --git a/crates/nu-engine/src/eval.rs b/crates/nu-engine/src/eval.rs index ae18de9e6f..c4007042e4 100644 --- a/crates/nu-engine/src/eval.rs +++ b/crates/nu-engine/src/eval.rs @@ -7,8 +7,8 @@ use nu_protocol::{ }, engine::{Closure, EngineState, Stack}, eval_base::Eval, - Config, DeclId, IntoPipelineData, PipelineData, RawStream, ShellError, Span, Spanned, Type, - Value, VarId, ENV_VARIABLE_ID, + Config, DeclId, IntoPipelineData, IntoSpanned, PipelineData, RawStream, ShellError, Span, + Spanned, Type, Value, VarId, ENV_VARIABLE_ID, }; use std::thread::{self, JoinHandle}; use std::{borrow::Cow, collections::HashMap}; @@ -542,7 +542,7 @@ fn eval_element_with_input( stderr_stack, save_call, input, - )); + )?); let (result_out_stream, result_err_stream) = if result_is_out { (result_out_stream, None) } else { @@ -1090,8 +1090,9 @@ impl DataSaveJob { mut stack: Stack, save_call: Call, input: PipelineData, - ) -> Self { - Self { + ) -> Result { + let span = save_call.head; + Ok(Self { inner: thread::Builder::new() .name("stderr saver".to_string()) .spawn(move || { @@ -1100,8 +1101,8 @@ impl DataSaveJob { eprintln!("WARNING: error occurred when redirect to stderr: {:?}", err); } }) - .expect("Failed to create thread"), - } + .map_err(|e| e.into_spanned(span))?, + }) } pub fn join(self) -> thread::Result<()> { diff --git a/crates/nu-plugin/src/plugin/interface.rs b/crates/nu-plugin/src/plugin/interface.rs index 4f130af958..3dcca89602 100644 --- a/crates/nu-plugin/src/plugin/interface.rs +++ b/crates/nu-plugin/src/plugin/interface.rs @@ -369,18 +369,22 @@ where exit_code, } => { thread::scope(|scope| { - let stderr_thread = stderr.map(|(mut writer, stream)| { - thread::Builder::new() - .name("plugin stderr writer".into()) - .spawn_scoped(scope, move || writer.write_all(raw_stream_iter(stream))) - .expect("failed to spawn thread") - }); - let exit_code_thread = exit_code.map(|(mut writer, stream)| { - thread::Builder::new() - .name("plugin exit_code writer".into()) - .spawn_scoped(scope, move || writer.write_all(stream)) - .expect("failed to spawn thread") - }); + let stderr_thread = stderr + .map(|(mut writer, stream)| { + thread::Builder::new() + .name("plugin stderr writer".into()) + .spawn_scoped(scope, move || { + writer.write_all(raw_stream_iter(stream)) + }) + }) + .transpose()?; + let exit_code_thread = exit_code + .map(|(mut writer, stream)| { + thread::Builder::new() + .name("plugin exit_code writer".into()) + .spawn_scoped(scope, move || writer.write_all(stream)) + }) + .transpose()?; // Optimize for stdout: if only stdout is present, don't spawn any other // threads. if let Some((mut writer, stream)) = stdout { @@ -407,10 +411,12 @@ where /// Write all of the data in each of the streams. This method returns immediately; any necessary /// write will happen in the background. If a thread was spawned, its handle is returned. - pub(crate) fn write_background(self) -> Option>> { + pub(crate) fn write_background( + self, + ) -> Result>>, ShellError> { match self { - PipelineDataWriter::None => None, - _ => Some( + PipelineDataWriter::None => Ok(None), + _ => Ok(Some( thread::Builder::new() .name("plugin stream background writer".into()) .spawn(move || { @@ -421,9 +427,8 @@ where log::warn!("Error while writing pipeline in background: {err}"); } result - }) - .expect("failed to spawn thread"), - ), + })?, + )), } } } diff --git a/crates/nu-plugin/src/plugin/interface/plugin.rs b/crates/nu-plugin/src/plugin/interface/plugin.rs index 2a2e83c740..6a4f854f27 100644 --- a/crates/nu-plugin/src/plugin/interface/plugin.rs +++ b/crates/nu-plugin/src/plugin/interface/plugin.rs @@ -419,7 +419,7 @@ impl PluginInterface { let (writer, rx) = self.write_plugin_call(call, context.clone())?; // Finish writing stream in the background - writer.write_background(); + writer.write_background()?; self.receive_plugin_call_response(rx) } diff --git a/crates/nu-plugin/src/plugin/mod.rs b/crates/nu-plugin/src/plugin/mod.rs index e7d4e69498..cc3ccb49d2 100644 --- a/crates/nu-plugin/src/plugin/mod.rs +++ b/crates/nu-plugin/src/plugin/mod.rs @@ -126,7 +126,7 @@ fn make_plugin_interface( .stdin .take() .ok_or_else(|| ShellError::PluginFailedToLoad { - msg: "plugin missing stdin writer".into(), + msg: "Plugin missing stdin writer".into(), })?; let mut stdout = child @@ -158,7 +158,9 @@ fn make_plugin_interface( drop(manager); let _ = child.wait(); }) - .expect("failed to spawn thread"); + .map_err(|err| ShellError::PluginFailedToLoad { + msg: format!("Failed to spawn thread for plugin: {err}"), + })?; Ok(interface) } @@ -422,6 +424,20 @@ pub fn serve_plugin(plugin: &mut impl StreamingPlugin, encoder: impl PluginEncod // We need to hold on to the interface to keep the manager alive. We can drop it at the end let interface = manager.get_interface(); + // Determine the plugin name, for errors + let exe = std::env::current_exe().ok(); + + let plugin_name: String = exe + .as_ref() + .and_then(|path| path.file_stem()) + .map(|stem| stem.to_string_lossy().into_owned()) + .map(|stem| { + stem.strip_prefix("nu_plugin_") + .map(|s| s.to_owned()) + .unwrap_or(stem) + }) + .unwrap_or_else(|| "(unknown)".into()); + // Try an operation that could result in ShellError. Exit if an I/O error is encountered. // Try to report the error to nushell otherwise, and failing that, panic. macro_rules! try_or_report { @@ -435,7 +451,7 @@ pub fn serve_plugin(plugin: &mut impl StreamingPlugin, encoder: impl PluginEncod Err(err) => { let _ = $interface.write_response(Err(err.clone())).unwrap_or_else(|_| { // If we can't send it to nushell, panic with it so at least we get the output - panic!("{}", err) + panic!("Plugin `{plugin_name}`: {}", err) }); std::process::exit(1) } @@ -445,6 +461,8 @@ pub fn serve_plugin(plugin: &mut impl StreamingPlugin, encoder: impl PluginEncod // Send Hello message try_or_report!(interface, interface.hello()); + let plugin_name_clone = plugin_name.clone(); + // Spawn the reader thread std::thread::Builder::new() .name("engine interface reader".into()) @@ -453,24 +471,16 @@ pub fn serve_plugin(plugin: &mut impl StreamingPlugin, encoder: impl PluginEncod // Do our best to report the read error. Most likely there is some kind of // incompatibility between the plugin and nushell, so it makes more sense to try to // report it on stderr than to send something. - let exe = std::env::current_exe().ok(); - let plugin_name: String = exe - .as_ref() - .and_then(|path| path.file_stem()) - .map(|stem| stem.to_string_lossy().into_owned()) - .map(|stem| { - stem.strip_prefix("nu_plugin_") - .map(|s| s.to_owned()) - .unwrap_or(stem) - }) - .unwrap_or_else(|| "(unknown)".into()); - - eprintln!("Plugin `{plugin_name}` read error: {err}"); + eprintln!("Plugin `{plugin_name_clone}` read error: {err}"); std::process::exit(1); } }) - .expect("failed to spawn thread"); + .unwrap_or_else(|err| { + // If we fail to spawn the reader thread, we should exit + eprintln!("Plugin `{plugin_name}` failed to launch: {err}"); + std::process::exit(1); + }); for plugin_call in call_receiver { match plugin_call { @@ -492,7 +502,7 @@ pub fn serve_plugin(plugin: &mut impl StreamingPlugin, encoder: impl PluginEncod let result = plugin.run(&name, &config, &call, input); let write_result = engine .write_response(result) - .map(|writer| writer.write_background()); + .and_then(|writer| writer.write_background()); try_or_report!(engine, write_result); } // Do an operation on a custom value @@ -514,7 +524,7 @@ pub fn serve_plugin(plugin: &mut impl StreamingPlugin, encoder: impl PluginEncod .map(|value| PipelineData::Value(value, None)); let write_result = engine .write_response(result) - .map(|writer| writer.write_background()); + .and_then(|writer| writer.write_background()); try_or_report!(engine, write_result); } } diff --git a/crates/nu-protocol/src/pipeline_data.rs b/crates/nu-protocol/src/pipeline_data.rs index b61973e016..c740badddd 100644 --- a/crates/nu-protocol/src/pipeline_data.rs +++ b/crates/nu-protocol/src/pipeline_data.rs @@ -868,8 +868,7 @@ pub fn print_if_stream( let _ = stderr.write_all(&bytes); } } - }) - .expect("could not create thread"); + })?; } if let Some(stream) = stream { diff --git a/crates/nu-protocol/src/shell_error.rs b/crates/nu-protocol/src/shell_error.rs index 070322ea17..c94613ecd2 100644 --- a/crates/nu-protocol/src/shell_error.rs +++ b/crates/nu-protocol/src/shell_error.rs @@ -2,7 +2,9 @@ use miette::Diagnostic; use serde::{Deserialize, Serialize}; use thiserror::Error; -use crate::{ast::Operator, engine::StateWorkingSet, format_error, ParseError, Span, Value}; +use crate::{ + ast::Operator, engine::StateWorkingSet, format_error, ParseError, Span, Spanned, Value, +}; /// The fundamental error type for the evaluation engine. These cases represent different kinds of errors /// the evaluator might face, along with helpful spans to label. An error renderer will take this error value @@ -1361,6 +1363,15 @@ impl From for ShellError { } } +impl From> for ShellError { + fn from(error: Spanned) -> Self { + ShellError::IOErrorSpanned { + msg: error.item.to_string(), + span: error.span, + } + } +} + impl std::convert::From> for ShellError { fn from(input: Box) -> ShellError { ShellError::IOError { diff --git a/crates/nu-protocol/src/span.rs b/crates/nu-protocol/src/span.rs index 2d49f88ab0..1184864b8b 100644 --- a/crates/nu-protocol/src/span.rs +++ b/crates/nu-protocol/src/span.rs @@ -3,14 +3,33 @@ use serde::{Deserialize, Serialize}; /// A spanned area of interest, generic over what kind of thing is of interest #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] -pub struct Spanned -where - T: Clone + std::fmt::Debug, -{ +pub struct Spanned { pub item: T, pub span: Span, } +/// Helper trait to create [`Spanned`] more ergonomically. +pub trait IntoSpanned: Sized { + /// Wrap items together with a span into [`Spanned`]. + /// + /// # Example + /// + /// ``` + /// # use nu_protocol::{Span, IntoSpanned}; + /// # let span = Span::test_data(); + /// let spanned = "Hello, world!".into_spanned(span); + /// assert_eq!("Hello, world!", spanned.item); + /// assert_eq!(span, spanned.span); + /// ``` + fn into_spanned(self, span: Span) -> Spanned; +} + +impl IntoSpanned for T { + fn into_spanned(self, span: Span) -> Spanned { + Spanned { item: self, span } + } +} + /// Spans are a global offset across all seen files, which are cached in the engine's state. The start and /// end offset together make the inclusive start/exclusive end pair for where to underline to highlight /// a given point of interest.