forked from extern/nushell
Replace panics with errors in thread spawning (#12040)
# Description Replace panics with errors in thread spawning. Also adds `IntoSpanned` trait for easily constructing `Spanned`, and an implementation of `From<Spanned<std::io::Error>>` for `ShellError`, which is used to provide context for the error wherever there was a span conveniently available. In general this should make it more convenient to do the right thing with `std::io::Error` and always add a span to it when it's possible to do so. # User-Facing Changes Fewer panics! # Tests + Formatting - 🟢 `toolkit fmt` - 🟢 `toolkit clippy` - 🟢 `toolkit test` - 🟢 `toolkit test stdlib`
This commit is contained in:
@ -3,6 +3,7 @@ use nu_engine::CallExt;
|
||||
use nu_path::expand_path_with;
|
||||
use nu_protocol::ast::{Call, Expr, Expression};
|
||||
use nu_protocol::engine::{Command, EngineState, Stack};
|
||||
use nu_protocol::IntoSpanned;
|
||||
use nu_protocol::{
|
||||
Category, DataSource, Example, PipelineData, PipelineMetadata, RawStream, ShellError,
|
||||
Signature, Span, Spanned, SyntaxShape, Type, Value,
|
||||
@ -123,19 +124,22 @@ impl Command for Save {
|
||||
)?;
|
||||
|
||||
// delegate a thread to redirect stderr to result.
|
||||
let handler = stderr.map(|stderr_stream| match stderr_file {
|
||||
Some(stderr_file) => thread::Builder::new()
|
||||
.name("stderr redirector".to_string())
|
||||
.spawn(move || stream_to_file(stderr_stream, stderr_file, span, progress))
|
||||
.expect("Failed to create thread"),
|
||||
None => thread::Builder::new()
|
||||
.name("stderr redirector".to_string())
|
||||
.spawn(move || {
|
||||
let _ = stderr_stream.into_bytes();
|
||||
Ok(PipelineData::empty())
|
||||
})
|
||||
.expect("Failed to create thread"),
|
||||
});
|
||||
let handler = stderr
|
||||
.map(|stderr_stream| match stderr_file {
|
||||
Some(stderr_file) => thread::Builder::new()
|
||||
.name("stderr redirector".to_string())
|
||||
.spawn(move || {
|
||||
stream_to_file(stderr_stream, stderr_file, span, progress)
|
||||
}),
|
||||
None => thread::Builder::new()
|
||||
.name("stderr redirector".to_string())
|
||||
.spawn(move || {
|
||||
let _ = stderr_stream.into_bytes();
|
||||
Ok(PipelineData::empty())
|
||||
}),
|
||||
})
|
||||
.transpose()
|
||||
.map_err(|e| e.into_spanned(span))?;
|
||||
|
||||
let res = stream_to_file(stream, file, span, progress);
|
||||
if let Some(h) = handler {
|
||||
|
@ -4,8 +4,8 @@ use nu_engine::{eval_block_with_early_return, CallExt};
|
||||
use nu_protocol::{
|
||||
ast::Call,
|
||||
engine::{Closure, Command, EngineState, Stack},
|
||||
Category, Example, IntoInterruptiblePipelineData, PipelineData, RawStream, ShellError,
|
||||
Signature, Spanned, SyntaxShape, Type, Value,
|
||||
Category, Example, IntoInterruptiblePipelineData, IntoSpanned, PipelineData, RawStream,
|
||||
ShellError, Signature, Spanned, SyntaxShape, Type, Value,
|
||||
};
|
||||
|
||||
#[derive(Clone)]
|
||||
@ -128,8 +128,10 @@ use it in your pipeline."#
|
||||
|
||||
if use_stderr {
|
||||
if let Some(stderr) = stderr {
|
||||
let iter = tee(stderr.stream, with_stream)
|
||||
.map_err(|e| e.into_spanned(call.head))?;
|
||||
let raw_stream = RawStream::new(
|
||||
Box::new(tee(stderr.stream, with_stream).map(flatten_result)),
|
||||
Box::new(iter.map(flatten_result)),
|
||||
stderr.ctrlc,
|
||||
stderr.span,
|
||||
stderr.known_size,
|
||||
@ -158,14 +160,18 @@ use it in your pipeline."#
|
||||
})
|
||||
}
|
||||
} else {
|
||||
let stdout = stdout.map(|stdout| {
|
||||
RawStream::new(
|
||||
Box::new(tee(stdout.stream, with_stream).map(flatten_result)),
|
||||
stdout.ctrlc,
|
||||
stdout.span,
|
||||
stdout.known_size,
|
||||
)
|
||||
});
|
||||
let stdout = stdout
|
||||
.map(|stdout| {
|
||||
let iter = tee(stdout.stream, with_stream)
|
||||
.map_err(|e| e.into_spanned(call.head))?;
|
||||
Ok::<_, ShellError>(RawStream::new(
|
||||
Box::new(iter.map(flatten_result)),
|
||||
stdout.ctrlc,
|
||||
stdout.span,
|
||||
stdout.known_size,
|
||||
))
|
||||
})
|
||||
.transpose()?;
|
||||
Ok(PipelineData::ExternalStream {
|
||||
stdout,
|
||||
stderr,
|
||||
@ -201,6 +207,7 @@ use it in your pipeline."#
|
||||
// Make sure to drain any iterator produced to avoid unexpected behavior
|
||||
result.and_then(|data| data.drain())
|
||||
})
|
||||
.map_err(|e| e.into_spanned(call.head))?
|
||||
.map(move |result| result.unwrap_or_else(|err| Value::error(err, closure_span)))
|
||||
.into_pipeline_data_with_metadata(metadata, engine_state.ctrlc.clone());
|
||||
|
||||
@ -227,7 +234,7 @@ fn flatten_result<T, E>(result: Result<Result<T, E>, E>) -> Result<T, E> {
|
||||
fn tee<T>(
|
||||
input: impl Iterator<Item = T>,
|
||||
with_cloned_stream: impl FnOnce(mpsc::Receiver<T>) -> Result<(), ShellError> + Send + 'static,
|
||||
) -> impl Iterator<Item = Result<T, ShellError>>
|
||||
) -> Result<impl Iterator<Item = Result<T, ShellError>>, std::io::Error>
|
||||
where
|
||||
T: Clone + Send + 'static,
|
||||
{
|
||||
@ -237,14 +244,13 @@ where
|
||||
let mut thread = Some(
|
||||
thread::Builder::new()
|
||||
.name("stderr consumer".into())
|
||||
.spawn(move || with_cloned_stream(rx))
|
||||
.expect("could not create thread"),
|
||||
.spawn(move || with_cloned_stream(rx))?,
|
||||
);
|
||||
|
||||
let mut iter = input.into_iter();
|
||||
let mut tx = Some(tx);
|
||||
|
||||
std::iter::from_fn(move || {
|
||||
Ok(std::iter::from_fn(move || {
|
||||
if thread.as_ref().is_some_and(|t| t.is_finished()) {
|
||||
// Check for an error from the other thread
|
||||
let result = thread
|
||||
@ -274,7 +280,7 @@ where
|
||||
.map(Err)
|
||||
})
|
||||
}
|
||||
})
|
||||
}))
|
||||
}
|
||||
|
||||
#[test]
|
||||
@ -289,6 +295,7 @@ fn tee_copies_values_to_other_thread_and_passes_them_through() {
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
.expect("io error")
|
||||
.collect::<Result<Vec<i32>, ShellError>>()
|
||||
.expect("should not produce error");
|
||||
|
||||
@ -305,7 +312,8 @@ fn tee_forwards_errors_back_immediately() {
|
||||
let slow_input = (0..100).inspect(|_| std::thread::sleep(Duration::from_millis(1)));
|
||||
let iter = tee(slow_input, |_| {
|
||||
Err(ShellError::IOError { msg: "test".into() })
|
||||
});
|
||||
})
|
||||
.expect("io error");
|
||||
for result in iter {
|
||||
if let Ok(val) = result {
|
||||
// should not make it to the end
|
||||
@ -331,7 +339,8 @@ fn tee_waits_for_the_other_thread() {
|
||||
std::thread::sleep(Duration::from_millis(10));
|
||||
waited_clone.store(true, Ordering::Relaxed);
|
||||
Err(ShellError::IOError { msg: "test".into() })
|
||||
});
|
||||
})
|
||||
.expect("io error");
|
||||
let last = iter.last();
|
||||
assert!(waited.load(Ordering::Relaxed), "failed to wait");
|
||||
assert!(
|
||||
|
@ -283,7 +283,7 @@ fn send_cancellable_request(
|
||||
let ret = request_fn();
|
||||
let _ = tx.send(ret); // may fail if the user has cancelled the operation
|
||||
})
|
||||
.expect("Failed to create thread");
|
||||
.map_err(ShellError::from)?;
|
||||
|
||||
// ...and poll the channel for responses
|
||||
loop {
|
||||
|
@ -1,7 +1,8 @@
|
||||
use nu_protocol::{
|
||||
ast::Call,
|
||||
engine::{Command, EngineState, Stack},
|
||||
Category, Example, IntoPipelineData, PipelineData, Record, ShellError, Signature, Type, Value,
|
||||
Category, Example, IntoPipelineData, IntoSpanned, PipelineData, Record, ShellError, Signature,
|
||||
Type, Value,
|
||||
};
|
||||
|
||||
use std::thread;
|
||||
@ -52,9 +53,9 @@ impl Command for Complete {
|
||||
// consumes the first 65535 bytes
|
||||
// So we need a thread to receive stderr message, then the current thread can continue to consume
|
||||
// stdout messages.
|
||||
let stderr_handler = stderr.map(|stderr| {
|
||||
let stderr_span = stderr.span;
|
||||
(
|
||||
let stderr_handler = stderr
|
||||
.map(|stderr| {
|
||||
let stderr_span = stderr.span;
|
||||
thread::Builder::new()
|
||||
.name("stderr consumer".to_string())
|
||||
.spawn(move || {
|
||||
@ -65,10 +66,10 @@ impl Command for Complete {
|
||||
Ok::<_, ShellError>(Value::binary(stderr.item, stderr.span))
|
||||
}
|
||||
})
|
||||
.expect("failed to create thread"),
|
||||
stderr_span,
|
||||
)
|
||||
});
|
||||
.map(|handle| (handle, stderr_span))
|
||||
.map_err(|err| err.into_spanned(call.head))
|
||||
})
|
||||
.transpose()?;
|
||||
|
||||
if let Some(stdout) = stdout {
|
||||
let stdout = stdout.into_bytes()?;
|
||||
|
@ -2,6 +2,7 @@ use nu_cmd_base::hook::eval_hook;
|
||||
use nu_engine::env_to_strings;
|
||||
use nu_engine::eval_expression;
|
||||
use nu_engine::CallExt;
|
||||
use nu_protocol::IntoSpanned;
|
||||
use nu_protocol::NuGlob;
|
||||
use nu_protocol::{
|
||||
ast::{Call, Expr},
|
||||
@ -438,7 +439,7 @@ impl ExternalCommand {
|
||||
|
||||
Ok(())
|
||||
})
|
||||
.expect("Failed to create thread");
|
||||
.map_err(|e| e.into_spanned(head))?;
|
||||
}
|
||||
}
|
||||
|
||||
@ -526,7 +527,7 @@ impl ExternalCommand {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}).expect("Failed to create thread");
|
||||
}).map_err(|e| e.into_spanned(head))?;
|
||||
|
||||
let (stderr_tx, stderr_rx) = mpsc::sync_channel(OUTPUT_BUFFERS_IN_FLIGHT);
|
||||
if redirect_stderr {
|
||||
@ -543,7 +544,7 @@ impl ExternalCommand {
|
||||
read_and_redirect_message(stderr, stderr_tx, stderr_ctrlc);
|
||||
Ok::<(), ShellError>(())
|
||||
})
|
||||
.expect("Failed to create thread");
|
||||
.map_err(|e| e.into_spanned(head))?;
|
||||
}
|
||||
|
||||
let stdout_receiver = ChannelReceiver::new(stdout_rx);
|
||||
|
Reference in New Issue
Block a user