add threading

This commit is contained in:
JT 2021-10-26 11:56:29 +13:00
parent c18f0dcc84
commit 962adf5a12

View File

@ -2,7 +2,7 @@ use std::borrow::Cow;
use std::collections::HashMap; use std::collections::HashMap;
use std::env; use std::env;
use std::io::{BufRead, BufReader, Write}; use std::io::{BufRead, BufReader, Write};
use std::process::{ChildStdin, Command as CommandSys, Stdio}; use std::process::{Command as CommandSys, Stdio};
use std::sync::mpsc; use std::sync::mpsc;
use nu_protocol::engine::{EngineState, Stack}; use nu_protocol::engine::{EngineState, Stack};
@ -79,11 +79,7 @@ impl ExternalCommand {
// If there is an input from the pipeline. The stdin from the process // If there is an input from the pipeline. The stdin from the process
// is piped so it can be used to send the input information // is piped so it can be used to send the input information
if let PipelineData::Value(Value::String { .. }) = input { if !matches!(input, PipelineData::Value(Value::Nothing { .. })) {
process.stdin(Stdio::piped());
}
if let PipelineData::Stream { .. } = input {
process.stdin(Stdio::piped()); process.stdin(Stdio::piped());
} }
@ -95,17 +91,30 @@ impl ExternalCommand {
Ok(mut child) => { Ok(mut child) => {
// if there is a string or a stream, that is sent to the pipe std // if there is a string or a stream, that is sent to the pipe std
if let Some(mut stdin_write) = child.stdin.take() { if let Some(mut stdin_write) = child.stdin.take() {
for value in input { std::thread::spawn(move || {
match value { for value in input.into_iter() {
Value::String { val, span: _ } => { match value {
self.write_to_stdin(&mut stdin_write, val.as_bytes())? Value::String { val, span: _ } => {
if stdin_write.write(val.as_bytes()).is_err() {
return Ok(());
}
}
Value::Binary { val, span: _ } => {
if stdin_write.write(&val).is_err() {
return Ok(());
}
}
x => {
if stdin_write.write(x.into_string().as_bytes()).is_err() {
return Err(());
}
}
} }
Value::Binary { val, span: _ } => {
self.write_to_stdin(&mut stdin_write, &val)?
}
_ => continue,
} }
} Ok(())
});
} else {
println!("Couldn't take stdin");
} }
// If this external is not the last expression, then its output is piped to a channel // If this external is not the last expression, then its output is piped to a channel
@ -188,17 +197,6 @@ impl ExternalCommand {
process process
} }
} }
fn write_to_stdin(&self, stdin_write: &mut ChildStdin, val: &[u8]) -> Result<(), ShellError> {
if stdin_write.write(val).is_err() {
Err(ShellError::ExternalCommand(
"Error writing input to stdin".to_string(),
self.name.span,
))
} else {
Ok(())
}
}
} }
// The piped data from stdout from the external command can be either String // The piped data from stdout from the external command can be either String