Fix external output threading and ctrlc (#313)

This commit is contained in:
JT 2021-11-09 19:14:00 +13:00 committed by GitHub
parent 34617fabd9
commit 0a20052799
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 50 additions and 27 deletions

View File

@ -3,6 +3,7 @@ use std::collections::HashMap;
use std::env; use std::env;
use std::io::{BufRead, BufReader, Write}; use std::io::{BufRead, BufReader, Write};
use std::process::{Command as CommandSys, Stdio}; use std::process::{Command as CommandSys, Stdio};
use std::sync::atomic::Ordering;
use std::sync::mpsc; use std::sync::mpsc;
use nu_protocol::engine::{EngineState, Stack}; use nu_protocol::engine::{EngineState, Stack};
@ -121,18 +122,26 @@ impl ExternalCommand {
}); });
} }
// If this external is not the last expression, then its output is piped to a channel let last_expression = self.last_expression;
// and we create a ValueStream that can be consumed let span = self.name.span;
let value = if !self.last_expression { let output_ctrlc = ctrlc.clone();
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
let stdout = child.stdout.take().ok_or_else(|| {
ShellError::ExternalCommand( std::thread::spawn(move || {
"Error taking stdout from external".to_string(), // If this external is not the last expression, then its output is piped to a channel
self.name.span, // and we create a ValueStream that can be consumed
) if !last_expression {
})?; let stdout = child
.stdout
.take()
.ok_or_else(|| {
ShellError::ExternalCommand(
"Error taking stdout from external".to_string(),
span,
)
})
.unwrap();
std::thread::spawn(move || {
// Stdout is read using the Buffer reader. It will do so until there is an // Stdout is read using the Buffer reader. It will do so until there is an
// error or there are no more bytes to read // error or there are no more bytes to read
let mut buf_read = BufReader::with_capacity(OUTPUT_BUFFER_SIZE, stdout); let mut buf_read = BufReader::with_capacity(OUTPUT_BUFFER_SIZE, stdout);
@ -153,26 +162,29 @@ impl ExternalCommand {
let length = bytes.len(); let length = bytes.len();
buf_read.consume(length); buf_read.consume(length);
if let Some(ctrlc) = &ctrlc {
if ctrlc.load(Ordering::SeqCst) {
break;
}
}
match tx.send(data) { match tx.send(data) {
Ok(_) => continue, Ok(_) => continue,
Err(_) => break, Err(_) => break,
} }
} }
}); }
// The ValueStream is consumed by the next expression in the pipeline match child.wait() {
ChannelReceiver::new(rx, self.name.span).into_pipeline_data(ctrlc) Err(err) => Err(ShellError::ExternalCommand(format!("{}", err), span)),
} else { Ok(_) => Ok(()),
PipelineData::new(self.name.span) }
}; });
// The ValueStream is consumed by the next expression in the pipeline
let value =
ChannelReceiver::new(rx, self.name.span).into_pipeline_data(output_ctrlc);
match child.wait() { Ok(value)
Err(err) => Err(ShellError::ExternalCommand(
format!("{}", err),
self.name.span,
)),
Ok(_) => Ok(value),
}
} }
} }
} }
@ -205,6 +217,7 @@ impl ExternalCommand {
// The piped data from stdout from the external command can be either String // The piped data from stdout from the external command can be either String
// or binary. We use this enum to pass the data from the spawned process // or binary. We use this enum to pass the data from the spawned process
#[derive(Debug)]
enum Data { enum Data {
String(String), String(String),
Bytes(Vec<u8>), Bytes(Vec<u8>),

View File

@ -3,6 +3,8 @@ use nu_protocol::engine::{Command, EngineState, Stack};
use nu_protocol::{IntoPipelineData, PipelineData, ShellError, Signature, Span, Value}; use nu_protocol::{IntoPipelineData, PipelineData, ShellError, Signature, Span, Value};
use nu_table::StyledString; use nu_table::StyledString;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use terminal_size::{Height, Width}; use terminal_size::{Height, Width};
#[derive(Clone)] #[derive(Clone)]
@ -24,11 +26,13 @@ impl Command for Table {
fn run( fn run(
&self, &self,
_engine_state: &EngineState, engine_state: &EngineState,
_stack: &mut Stack, _stack: &mut Stack,
call: &Call, call: &Call,
input: PipelineData, input: PipelineData,
) -> Result<nu_protocol::PipelineData, nu_protocol::ShellError> { ) -> Result<nu_protocol::PipelineData, nu_protocol::ShellError> {
let ctrlc = engine_state.ctrlc.clone();
let term_width = if let Some((Width(w), Height(_h))) = terminal_size::terminal_size() { let term_width = if let Some((Width(w), Height(_h))) = terminal_size::terminal_size() {
w as usize w as usize
} else { } else {
@ -37,7 +41,7 @@ impl Command for Table {
match input { match input {
PipelineData::Value(Value::List { vals, .. }) => { PipelineData::Value(Value::List { vals, .. }) => {
let table = convert_to_table(vals)?; let table = convert_to_table(vals, ctrlc)?;
if let Some(table) = table { if let Some(table) = table {
let result = nu_table::draw_table(&table, term_width, &HashMap::new()); let result = nu_table::draw_table(&table, term_width, &HashMap::new());
@ -52,7 +56,7 @@ impl Command for Table {
} }
} }
PipelineData::Stream(stream) => { PipelineData::Stream(stream) => {
let table = convert_to_table(stream)?; let table = convert_to_table(stream, ctrlc)?;
if let Some(table) = table { if let Some(table) = table {
let result = nu_table::draw_table(&table, term_width, &HashMap::new()); let result = nu_table::draw_table(&table, term_width, &HashMap::new());
@ -104,6 +108,7 @@ impl Command for Table {
fn convert_to_table( fn convert_to_table(
iter: impl IntoIterator<Item = Value>, iter: impl IntoIterator<Item = Value>,
ctrlc: Option<Arc<AtomicBool>>,
) -> Result<Option<nu_table::Table>, ShellError> { ) -> Result<Option<nu_table::Table>, ShellError> {
let mut iter = iter.into_iter().peekable(); let mut iter = iter.into_iter().peekable();
@ -117,6 +122,11 @@ fn convert_to_table(
let mut data = vec![]; let mut data = vec![];
for (row_num, item) in iter.enumerate() { for (row_num, item) in iter.enumerate() {
if let Some(ctrlc) = &ctrlc {
if ctrlc.load(Ordering::SeqCst) {
return Ok(None);
}
}
if let Value::Error { error } = item { if let Value::Error { error } = item {
return Err(error); return Err(error);
} }