Move external closer to internal (#1611)

* Refactor InputStream and affected commands.

First, making `values` private and leaning on the `Stream` implementation makes
consumes of `InputStream` less likely to have to change in the future, if we
change what an `InputStream` is internally.

Second, we're dropping `Option<InputStream>` as the input to pipelines,
internals, and externals. Instead, `InputStream.is_empty` can be used to check
for "emptiness". Empty streams are typically only ever used as the first input
to a pipeline.

* Add run_external internal command.

We want to push external commands closer to internal commands, eventually
eliminating the concept of "external" completely. This means we can consolidate
a couple of things:

- Variable evaluation (for example, `$it`, `$nu`, alias vars)
- Behaviour of whole stream vs per-item external execution

It should also make it easier for us to start introducing argument signatures
for external commands,

* Update run_external.rs

* Update run_external.rs

* Update run_external.rs

* Update run_external.rs

Co-authored-by: Jonathan Turner <jonathandturner@users.noreply.github.com>
This commit is contained in:
Jason Gedge
2020-04-19 23:30:44 -04:00
committed by GitHub
parent 6b8c6dec0e
commit 522a828687
67 changed files with 441 additions and 262 deletions

View File

@@ -45,5 +45,5 @@ fn append(
after.push_back(row);
let after = futures::stream::iter(after);
Ok(OutputStream::from_input(input.values.chain(after)))
Ok(OutputStream::from_input(input.chain(after)))
}

View File

@@ -29,7 +29,7 @@ impl WholeStreamCommand for Autoview {
) -> Result<OutputStream, ShellError> {
autoview(RunnableContext {
input: args.input,
commands: registry.clone(),
registry: registry.clone(),
shell_manager: args.shell_manager,
host: args.host,
ctrl_c: args.ctrl_c,
@@ -42,7 +42,7 @@ pub struct RunnableContextWithoutInput {
pub shell_manager: ShellManager,
pub host: Arc<parking_lot::Mutex<Box<dyn Host>>>,
pub ctrl_c: Arc<AtomicBool>,
pub commands: CommandRegistry,
pub registry: CommandRegistry,
pub name: Tag,
}
@@ -52,7 +52,7 @@ impl RunnableContextWithoutInput {
shell_manager: context.shell_manager,
host: context.host,
ctrl_c: context.ctrl_c,
commands: context.commands,
registry: context.registry,
name: context.name,
};
(context.input, new_context)
@@ -92,7 +92,7 @@ pub fn autoview(context: RunnableContext) -> Result<OutputStream, ShellError> {
if let Some(table) = table {
let command_args = create_default_command_args(&context).with_input(stream);
let result = table.run(command_args, &context.commands);
let result = table.run(command_args, &context.registry);
result.collect::<Vec<_>>().await;
}
}
@@ -106,7 +106,7 @@ pub fn autoview(context: RunnableContext) -> Result<OutputStream, ShellError> {
let mut stream = VecDeque::new();
stream.push_back(UntaggedValue::string(s).into_value(Tag { anchor, span }));
let command_args = create_default_command_args(&context).with_input(stream);
let result = text.run(command_args, &context.commands);
let result = text.run(command_args, &context.registry);
result.collect::<Vec<_>>().await;
} else {
out!("{}", s);
@@ -126,7 +126,7 @@ pub fn autoview(context: RunnableContext) -> Result<OutputStream, ShellError> {
let mut stream = VecDeque::new();
stream.push_back(UntaggedValue::string(s).into_value(Tag { anchor, span }));
let command_args = create_default_command_args(&context).with_input(stream);
let result = text.run(command_args, &context.commands);
let result = text.run(command_args, &context.registry);
result.collect::<Vec<_>>().await;
} else {
out!("{}\n", s);
@@ -168,7 +168,7 @@ pub fn autoview(context: RunnableContext) -> Result<OutputStream, ShellError> {
let mut stream = VecDeque::new();
stream.push_back(x);
let command_args = create_default_command_args(&context).with_input(stream);
let result = binary.run(command_args, &context.commands);
let result = binary.run(command_args, &context.registry);
result.collect::<Vec<_>>().await;
} else {
use pretty_hex::*;
@@ -254,7 +254,7 @@ pub fn autoview(context: RunnableContext) -> Result<OutputStream, ShellError> {
let mut stream = VecDeque::new();
stream.push_back(x);
let command_args = create_default_command_args(&context).with_input(stream);
let result = table.run(command_args, &context.commands);
let result = table.run(command_args, &context.registry);
result.collect::<Vec<_>>().await;
} else {
out!("{:?}", item);
@@ -291,6 +291,7 @@ fn create_default_command_args(context: &RunnableContextWithoutInput) -> RawComm
positional: None,
named: None,
span,
is_last: true,
},
name_tag: context.name.clone(),
scope: Scope::empty(),

View File

@@ -1,39 +1,29 @@
use crate::evaluate::evaluate_baseline_expr;
use crate::prelude::*;
use log::{log_enabled, trace};
use nu_errors::ShellError;
use nu_protocol::hir::SpannedExpression;
use futures_util::pin_mut;
use nu_protocol::Scope;
pub(crate) fn run_expression_block(
expr: SpannedExpression,
context: &mut Context,
input: Option<InputStream>,
input: InputStream,
scope: &Scope,
) -> Result<Option<InputStream>, ShellError> {
let scope = scope.clone();
) -> Result<InputStream, ShellError> {
if log_enabled!(log::Level::Trace) {
trace!(target: "nu::run::expr", "->");
trace!(target: "nu::run::expr", "{:?}", expr);
}
let scope = scope.clone();
let registry = context.registry().clone();
let stream = input.map(move |row| {
let scope = scope.clone().set_it(row);
evaluate_baseline_expr(&expr, &registry, &scope)
});
let stream = async_stream! {
if let Some(input) = input {
let values = input.values;
pin_mut!(values);
while let Some(row) = values.next().await {
let scope = scope.clone().set_it(row);
yield evaluate_baseline_expr(&expr, &registry, &scope);
}
} else {
yield evaluate_baseline_expr(&expr, &registry, &scope);
}
};
Ok(Some(stream.to_input_stream()))
Ok(stream.to_input_stream())
}

View File

@@ -1,19 +1,22 @@
use crate::futures::ThreadedReceiver;
use crate::prelude::*;
use std::io::Write;
use std::ops::Deref;
use std::process::{Command, Stdio};
use std::sync::mpsc;
use bytes::{BufMut, Bytes, BytesMut};
use futures::executor::block_on_stream;
use futures::stream::StreamExt;
use futures_codec::FramedRead;
use log::trace;
use nu_errors::ShellError;
use nu_protocol::hir::{ExternalArg, ExternalCommand};
use nu_protocol::{ColumnPath, Primitive, Scope, ShellTypeName, UntaggedValue, Value};
use nu_source::{Tag, Tagged};
use nu_value_ext::as_column_path;
use std::io::Write;
use std::ops::Deref;
use std::process::{Command, Stdio};
use std::sync::mpsc;
pub enum StringOrBinary {
String(String),
@@ -96,10 +99,10 @@ pub fn nu_value_to_string(command: &ExternalCommand, from: &Value) -> Result<Str
pub(crate) async fn run_external_command(
command: ExternalCommand,
context: &mut Context,
input: Option<InputStream>,
input: InputStream,
_scope: &Scope,
is_last: bool,
) -> Result<Option<InputStream>, ShellError> {
) -> Result<InputStream, ShellError> {
trace!(target: "nu::run::external", "-> {}", command.name);
if !did_find_command(&command.name).await {
@@ -110,7 +113,7 @@ pub(crate) async fn run_external_command(
));
}
if command.has_it_argument() || command.has_nu_argument() {
if command.has_it_argument() {
run_with_iterator_arg(command, context, input, is_last)
} else {
run_with_stdin(command, context, input, is_last)
@@ -166,16 +169,13 @@ fn to_column_path(
fn run_with_iterator_arg(
command: ExternalCommand,
context: &mut Context,
input: Option<InputStream>,
input: InputStream,
is_last: bool,
) -> Result<Option<InputStream>, ShellError> {
) -> Result<InputStream, ShellError> {
let path = context.shell_manager.path();
let mut inputs: InputStream = if let Some(input) = input {
trace_stream!(target: "nu::trace_stream::external::it", "input" = input)
} else {
InputStream::empty()
};
let mut inputs: InputStream =
trace_stream!(target: "nu::trace_stream::external::it", "input" = input);
let stream = async_stream! {
while let Some(value) = inputs.next().await {
@@ -363,12 +363,10 @@ fn run_with_iterator_arg(
}
}).collect::<Vec<String>>();
match spawn(&command, &path, &process_args[..], None, is_last) {
Ok(res) => {
if let Some(mut res) = res {
while let Some(item) = res.next().await {
yield Ok(item)
}
match spawn(&command, &path, &process_args[..], InputStream::empty(), is_last) {
Ok(mut res) => {
while let Some(item) = res.next().await {
yield Ok(item)
}
}
Err(reason) => {
@@ -382,19 +380,18 @@ fn run_with_iterator_arg(
}
};
Ok(Some(stream.to_input_stream()))
Ok(stream.to_input_stream())
}
fn run_with_stdin(
command: ExternalCommand,
context: &mut Context,
input: Option<InputStream>,
input: InputStream,
is_last: bool,
) -> Result<Option<InputStream>, ShellError> {
) -> Result<InputStream, ShellError> {
let path = context.shell_manager.path();
let input = input
.map(|input| trace_stream!(target: "nu::trace_stream::external::stdin", "input" = input));
let input = trace_stream!(target: "nu::trace_stream::external::stdin", "input" = input);
let process_args = command
.args
@@ -432,9 +429,9 @@ fn spawn(
command: &ExternalCommand,
path: &str,
args: &[String],
input: Option<InputStream>,
input: InputStream,
is_last: bool,
) -> Result<Option<InputStream>, ShellError> {
) -> Result<InputStream, ShellError> {
let command = command.clone();
let mut process = {
@@ -471,7 +468,7 @@ fn spawn(
}
// open since we have some contents for stdin
if input.is_some() {
if !input.is_empty() {
process.stdin(Stdio::piped());
trace!(target: "nu::run::external", "set up stdin pipe");
}
@@ -490,7 +487,7 @@ fn spawn(
let stdout_name_tag = command.name_tag;
std::thread::spawn(move || {
if let Some(input) = input {
if !input.is_empty() {
let mut stdin_write = stdin
.take()
.expect("Internal error: could not get stdin pipe for external command");
@@ -632,7 +629,7 @@ fn spawn(
});
let stream = ThreadedReceiver::new(rx);
Ok(Some(stream.to_input_stream()))
Ok(stream.to_input_stream())
} else {
Err(ShellError::labeled_error(
"Failed to spawn process",
@@ -717,7 +714,7 @@ fn shell_os_paths() -> Vec<std::path::PathBuf> {
mod tests {
use super::{
add_quotes, argument_contains_whitespace, argument_is_quoted, expand_tilde, remove_quotes,
run_external_command, Context,
run_external_command, Context, InputStream,
};
use futures::executor::block_on;
use nu_errors::ShellError;
@@ -740,10 +737,11 @@ mod tests {
async fn non_existent_run() -> Result<(), ShellError> {
let cmd = ExternalBuilder::for_name("i_dont_exist.exe").build();
let input = InputStream::empty();
let mut ctx = Context::basic().expect("There was a problem creating a basic context.");
assert!(
run_external_command(cmd, &mut ctx, None, &Scope::empty(), false)
run_external_command(cmd, &mut ctx, input, &Scope::empty(), false)
.await
.is_err()
);

View File

@@ -10,20 +10,15 @@ use nu_protocol::{CommandAction, Primitive, ReturnSuccess, Scope, UntaggedValue,
pub(crate) fn run_internal_command(
command: InternalCommand,
context: &mut Context,
input: Option<InputStream>,
input: InputStream,
scope: &Scope,
) -> Result<Option<InputStream>, ShellError> {
) -> Result<InputStream, ShellError> {
if log_enabled!(log::Level::Trace) {
trace!(target: "nu::run::internal", "->");
trace!(target: "nu::run::internal", "{}", command.name);
}
let objects: InputStream = if let Some(input) = input {
trace_stream!(target: "nu::trace_stream::internal", "input" = input)
} else {
InputStream::empty()
};
let objects: InputStream = trace_stream!(target: "nu::trace_stream::internal", "input" = input);
let internal_command = context.expect_command(&command.name);
let result = {
@@ -36,8 +31,7 @@ pub(crate) fn run_internal_command(
)
};
let result = trace_out_stream!(target: "nu::trace_stream::internal", "output" = result);
let mut result = result.values;
let mut result = trace_out_stream!(target: "nu::trace_stream::internal", "output" = result);
let mut context = context.clone();
let stream = async_stream! {
@@ -69,7 +63,8 @@ pub(crate) fn run_internal_command(
head: command.args.head,
positional: None,
named: None,
span: Span::unknown()
span: Span::unknown(),
is_last: false,
},
name_tag: Tag::unknown_anchor(command.name_span),
scope: Scope::empty(),
@@ -186,5 +181,5 @@ pub(crate) fn run_internal_command(
}
};
Ok(Some(stream.to_input_stream()))
Ok(stream.to_input_stream())
}

View File

@@ -10,9 +10,9 @@ use nu_protocol::Scope;
pub(crate) async fn run_pipeline(
pipeline: ClassifiedPipeline,
ctx: &mut Context,
mut input: Option<InputStream>,
mut input: InputStream,
scope: &Scope,
) -> Result<Option<InputStream>, ShellError> {
) -> Result<InputStream, ShellError> {
let mut iter = pipeline.commands.list.into_iter().peekable();
loop {

View File

@@ -41,7 +41,7 @@ pub mod clipboard {
RunnableContext { input, name, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> {
let stream = async_stream! {
let values: Vec<Value> = input.values.collect().await;
let values: Vec<Value> = input.collect().await;
let mut clip_stream = inner_clip(values, name).await;
while let Some(value) = clip_stream.next().await {

View File

@@ -179,7 +179,7 @@ impl CommandArgs {
args: T::deserialize(&mut deserializer)?,
context: RunnableContext {
input,
commands: registry.clone(),
registry: registry.clone(),
shell_manager,
name: name_tag,
host,
@@ -215,7 +215,7 @@ impl CommandArgs {
args: T::deserialize(&mut deserializer)?,
context: RunnableContext {
input,
commands: registry.clone(),
registry: registry.clone(),
shell_manager,
name: name_tag,
host,
@@ -238,13 +238,13 @@ pub struct RunnableContext {
pub shell_manager: ShellManager,
pub host: Arc<parking_lot::Mutex<Box<dyn Host>>>,
pub ctrl_c: Arc<AtomicBool>,
pub commands: CommandRegistry,
pub registry: CommandRegistry,
pub name: Tag,
}
impl RunnableContext {
pub fn get_command(&self, name: &str) -> Option<Arc<Command>> {
self.commands.get_command(name)
self.registry.get_command(name)
}
}
@@ -530,7 +530,6 @@ impl Command {
let out = args
.input
.values
.map(move |x| {
let call_info = UnevaluatedCallInfo {
args: raw_args.call_info.args.clone(),
@@ -597,7 +596,7 @@ impl WholeStreamCommand for FnFilterCommand {
let registry: CommandRegistry = registry.clone();
let func = self.func;
let result = input.values.map(move |it| {
let result = input.map(move |it| {
let registry = registry.clone();
let call_info = match call_info.clone().evaluate_with_new_it(&registry, &it) {
Err(err) => return OutputStream::from(vec![Err(err)]).values,

View File

@@ -39,7 +39,7 @@ pub fn compact(
CompactArgs { rest: columns }: CompactArgs,
RunnableContext { input, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> {
let objects = input.values.filter(move |item| {
let objects = input.filter(move |item| {
let keep = if columns.is_empty() {
item.is_some()
} else {

View File

@@ -124,7 +124,7 @@ pub fn config(
yield ReturnSuccess::value(UntaggedValue::Row(result.into()).into_value(&value.tag));
}
else if let Some(v) = set_into {
let rows: Vec<Value> = input.values.collect().await;
let rows: Vec<Value> = input.collect().await;
let key = v.to_string();
if rows.len() == 0 {

View File

@@ -37,7 +37,7 @@ pub fn count(
RunnableContext { input, name, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> {
let stream = async_stream! {
let rows: Vec<Value> = input.values.collect().await;
let rows: Vec<Value> = input.collect().await;
yield ReturnSuccess::value(UntaggedValue::int(rows.len()).into_value(name))
};

View File

@@ -37,7 +37,6 @@ fn debug_value(
RunnableContext { input, .. }: RunnableContext,
) -> Result<impl ToOutputStream, ShellError> {
Ok(input
.values
.map(move |v| {
if raw {
ReturnSuccess::value(

View File

@@ -47,7 +47,6 @@ fn default(
RunnableContext { input, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> {
let stream = input
.values
.map(move |item| {
let mut result = VecDeque::new();

View File

@@ -1,8 +1,10 @@
use crate::commands::classified::pipeline::run_pipeline;
use crate::commands::PerItemCommand;
use crate::context::CommandRegistry;
use crate::prelude::*;
use futures::stream::once;
use nu_errors::ShellError;
use nu_protocol::{
hir::ClassifiedPipeline, CallInfo, ReturnSuccess, Scope, Signature, SyntaxShape, UntaggedValue,
@@ -46,37 +48,34 @@ impl PerItemCommand for Each {
} => {
let mut context = Context::from_raw(&raw_args, &registry);
let input_clone = input.clone();
let input_stream = async_stream! {
yield Ok(input.clone())
}.to_input_stream();
let input_stream = once(async { Ok(input) }).to_input_stream();
let result = run_pipeline(
ClassifiedPipeline::new(block.clone(), None),
&mut context,
Some(input_stream),
input_stream,
&Scope::new(input_clone),
).await;
match result {
Ok(Some(v)) => {
let results: Vec<Value> = v.collect().await;
Ok(stream) if stream.is_empty() => {
yield Err(ShellError::labeled_error(
"Expected a block",
"each needs a block",
tag,
));
}
Ok(mut stream) => {
let errors = context.get_errors();
if let Some(error) = errors.first() {
yield Err(error.clone());
return;
}
for result in results {
while let Some(result) = stream.next().await {
yield Ok(ReturnSuccess::Value(result));
}
}
Ok(None) => {
yield Err(ShellError::labeled_error(
"Expected a block",
"each needs a block",
tag,
));
}
Err(e) => {
yield Err(e);
}

View File

@@ -101,7 +101,8 @@ impl PerItemCommand for Enter {
head: raw_args.call_info.args.head,
positional: None,
named: None,
span: Span::unknown()
span: Span::unknown(),
is_last: false,
},
name_tag: raw_args.call_info.name_tag,
scope: raw_args.call_info.scope.clone()

View File

@@ -45,7 +45,7 @@ pub fn evaluate_by(
RunnableContext { input, name, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> {
let stream = async_stream! {
let values: Vec<Value> = input.values.collect().await;
let values: Vec<Value> = input.collect().await;
if values.is_empty() {
yield Err(ShellError::labeled_error(

View File

@@ -48,7 +48,5 @@ fn first(
1
};
Ok(OutputStream::from_input(
context.input.values.take(rows_desired),
))
Ok(OutputStream::from_input(context.input.take(rows_desired)))
}

View File

@@ -197,7 +197,6 @@ pub fn get(
let member = fields.remove(0);
trace!("get {:?} {:?}", member, fields);
let stream = input
.values
.map(move |item| {
let mut result = VecDeque::new();

View File

@@ -43,7 +43,7 @@ pub fn group_by(
RunnableContext { input, name, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> {
let stream = async_stream! {
let values: Vec<Value> = input.values.collect().await;
let values: Vec<Value> = input.collect().await;
if values.is_empty() {
yield Err(ShellError::labeled_error(

View File

@@ -35,7 +35,7 @@ pub fn headers(
RunnableContext { input, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> {
let stream = async_stream! {
let rows: Vec<Value> = input.values.collect().await;
let rows: Vec<Value> = input.collect().await;
if rows.len() < 1 {
yield Err(ShellError::untagged_runtime_error("Couldn't find headers, was the input a properly formatted, non-empty table?"));

View File

@@ -53,7 +53,7 @@ pub fn histogram(
RunnableContext { input, name, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> {
let stream = async_stream! {
let values: Vec<Value> = input.values.collect().await;
let values: Vec<Value> = input.collect().await;
let Tagged { item: group_by, .. } = column_name.clone();

View File

@@ -47,7 +47,7 @@ fn lines(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream,
let mut leftover_string = String::new();
let stream = async_stream! {
loop {
match input.values.next().await {
match input.next().await {
Some(Value { value: UntaggedValue::Primitive(Primitive::String(st)), ..}) => {
let mut st = leftover_string.clone() + &st;
leftover.clear();

View File

@@ -46,7 +46,7 @@ pub fn map_max_by(
RunnableContext { input, name, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> {
let stream = async_stream! {
let values: Vec<Value> = input.values.collect().await;
let values: Vec<Value> = input.collect().await;
if values.is_empty() {

View File

@@ -49,7 +49,6 @@ fn nth(
RunnableContext { input, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> {
let stream = input
.values
.enumerate()
.map(move |(idx, item)| {
let row_number = vec![row_number.clone()];

View File

@@ -1,7 +1,6 @@
use crate::commands::WholeStreamCommand;
use crate::context::CommandRegistry;
use crate::prelude::*;
use futures_util::pin_mut;
use nu_errors::ShellError;
use nu_protocol::{
ColumnPath, PathMember, Primitive, ReturnSuccess, ReturnValue, Signature, SyntaxShape,
@@ -44,7 +43,9 @@ impl WholeStreamCommand for Pick {
fn pick(
PickArgs { rest: mut fields }: PickArgs,
RunnableContext { input, name, .. }: RunnableContext,
RunnableContext {
mut input, name, ..
}: RunnableContext,
) -> Result<OutputStream, ShellError> {
if fields.is_empty() {
return Err(ShellError::labeled_error(
@@ -64,13 +65,10 @@ fn pick(
.collect::<Vec<ColumnPath>>();
let stream = async_stream! {
let values = input.values;
pin_mut!(values);
let mut empty = true;
let mut bring_back: indexmap::IndexMap<String, Vec<Value>> = indexmap::IndexMap::new();
while let Some(value) = values.next().await {
while let Some(value) = input.next().await {
for path in &column_paths {
let path_members_span = span_for_spanned_list(path.members().iter().map(|p| p.span));

View File

@@ -98,7 +98,7 @@ pub fn filter_plugin(
trace!("filtering :: {:?}", call_info);
let stream = bos
.chain(args.input.values)
.chain(args.input)
.chain(eos)
.map(move |v| match v {
Value {
@@ -343,7 +343,7 @@ pub fn sink_plugin(
let call_info = args.call_info.clone();
let stream = async_stream! {
let input: Vec<Value> = args.input.values.collect().await;
let input: Vec<Value> = args.input.collect().await;
let request = JsonRpc::new("sink", (call_info.clone(), input));
let request_raw = serde_json::to_string(&request);

View File

@@ -43,5 +43,5 @@ fn prepend(
) -> Result<OutputStream, ShellError> {
let prepend = futures::stream::iter(vec![row]);
Ok(OutputStream::from_input(prepend.chain(input.values)))
Ok(prepend.chain(input).to_output_stream())
}

View File

@@ -50,7 +50,5 @@ fn range(
let from = *from as usize;
let to = *to as usize;
Ok(OutputStream::from_input(
input.values.skip(from).take(to - from + 1),
))
Ok(input.skip(from).take(to - from + 1).to_output_stream())
}

View File

@@ -45,7 +45,7 @@ pub fn reduce_by(
RunnableContext { input, name, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> {
let stream = async_stream! {
let values: Vec<Value> = input.values.collect().await;
let values: Vec<Value> = input.collect().await;
if values.is_empty() {
yield Err(ShellError::labeled_error(

View File

@@ -48,9 +48,7 @@ fn reject(
let fields: Vec<_> = fields.iter().map(|f| f.item.clone()).collect();
let stream = input
.values
.map(move |item| reject_fields(&item, &fields, &item.tag));
let stream = input.map(move |item| reject_fields(&item, &fields, &item.tag));
Ok(stream.from_input_stream())
}

View File

@@ -54,7 +54,6 @@ pub fn rename(
let new_column_names = new_column_names.into_iter().flatten().collect::<Vec<_>>();
let stream = input
.values
.map(move |item| {
let mut result = VecDeque::new();

View File

@@ -32,7 +32,7 @@ fn reverse(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream
let args = args.evaluate_once(registry)?;
let (input, _args) = args.parts();
let input = input.values.collect::<Vec<_>>();
let input = input.collect::<Vec<_>>();
let output = input.map(move |mut vec| {
vec.reverse();

View File

@@ -56,37 +56,36 @@ impl PerItemCommand for AliasCommand {
let stream = async_stream! {
let mut context = Context::from_raw(&raw_args, &registry);
let input_stream = async_stream! {
yield Ok(input.clone())
}.to_input_stream();
let input_clone = Ok(input.clone());
let input_stream = futures::stream::once(async { input_clone }).boxed().to_input_stream();
let result = run_pipeline(
ClassifiedPipeline::new(block.clone(), None),
&mut context,
Some(input_stream),
input_stream,
&scope
).await;
match result {
Ok(Some(v)) => {
let results: Vec<Value> = v.collect().await;
let errors = context.get_errors();
if let Some(error) = errors.first() {
yield Err(error.clone());
return;
}
for result in results {
yield Ok(ReturnSuccess::Value(result));
}
}
Ok(None) => {
Ok(stream) if stream.is_empty() => {
yield Err(ShellError::labeled_error(
"Expected a block",
"each needs a block",
tag,
));
}
Ok(mut stream) => {
// We collect first to ensure errors are put into the context
while let Some(result) = stream.next().await {
yield Ok(ReturnSuccess::Value(result));
}
let errors = context.get_errors();
if let Some(error) = errors.first() {
yield Err(error.clone());
}
}
Err(e) => {
yield Err(e);
}

View File

@@ -0,0 +1,128 @@
use crate::commands::classified::external;
use crate::commands::WholeStreamCommand;
use crate::prelude::*;
use derive_new::new;
use parking_lot::Mutex;
use nu_errors::ShellError;
use nu_protocol::hir::{
Expression, ExternalArg, ExternalArgs, ExternalCommand, Literal, SpannedExpression,
};
use nu_protocol::{ReturnSuccess, Scope, Signature, SyntaxShape};
#[derive(Deserialize)]
pub struct RunExternalArgs {}
#[derive(new)]
pub struct RunExternalCommand;
fn spanned_expression_to_string(expr: &SpannedExpression) -> String {
if let SpannedExpression {
expr: Expression::Literal(Literal::String(s)),
..
} = expr
{
s.clone()
} else {
"notacommand!!!".to_string()
}
}
impl WholeStreamCommand for RunExternalCommand {
fn name(&self) -> &str {
"run_external"
}
fn signature(&self) -> Signature {
Signature::build(self.name()).rest(SyntaxShape::Any, "external command arguments")
}
fn usage(&self) -> &str {
""
}
fn run(
&self,
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let positionals = args.call_info.args.positional.ok_or_else(|| {
ShellError::untagged_runtime_error("positional arguments unexpectedly empty")
})?;
let mut command_args = positionals.iter();
let name = command_args
.next()
.map(spanned_expression_to_string)
.ok_or_else(|| {
ShellError::untagged_runtime_error(
"run_external unexpectedly missing external name positional arg",
)
})?;
let command = ExternalCommand {
name,
name_tag: Tag::unknown(),
args: ExternalArgs {
list: command_args
.map(|arg| ExternalArg {
arg: spanned_expression_to_string(arg),
tag: Tag::unknown(),
})
.collect(),
span: Default::default(),
},
};
let mut external_context;
#[cfg(windows)]
{
external_context = Context {
registry: registry.clone(),
host: args.host.clone(),
shell_manager: args.shell_manager.clone(),
ctrl_c: args.ctrl_c.clone(),
current_errors: Arc::new(Mutex::new(vec![])),
windows_drives_previous_cwd: Arc::new(Mutex::new(std::collections::HashMap::new())),
};
}
#[cfg(not(windows))]
{
external_context = Context {
registry: registry.clone(),
host: args.host.clone(),
shell_manager: args.shell_manager.clone(),
ctrl_c: args.ctrl_c.clone(),
current_errors: Arc::new(Mutex::new(vec![])),
};
}
let is_last = args.call_info.args.is_last;
let input = args.input;
let stream = async_stream! {
let scope = Scope::empty();
let result = external::run_external_command(
command,
&mut external_context,
input,
&scope,
is_last,
).await;
match result {
Ok(mut stream) => {
while let Some(value) = stream.next().await {
yield Ok(ReturnSuccess::Value(value));
}
},
Err(e) => {
yield Err(e);
},
_ => {}
}
};
Ok(stream.to_output_stream())
}
}

View File

@@ -168,7 +168,7 @@ fn save(
shell_manager,
host,
ctrl_c,
commands: registry,
registry,
..
}: RunnableContext,
raw_args: RawCommandArgs,
@@ -177,7 +177,7 @@ fn save(
let name_tag = name.clone();
let stream = async_stream! {
let input: Vec<Value> = input.values.collect().await;
let input: Vec<Value> = input.collect().await;
if path.is_none() {
// If there is no filename, check the metadata for the anchor filename
if input.len() > 0 {
@@ -232,7 +232,8 @@ fn save(
head: raw_args.call_info.args.head,
positional: None,
named: None,
span: Span::unknown()
span: Span::unknown(),
is_last: false,
},
name_tag: raw_args.call_info.name_tag,
scope: Scope::empty(), // FIXME?

View File

@@ -48,7 +48,7 @@ fn shuffle(
RunnableContext { input, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> {
let stream = async_stream! {
let mut values: Vec<Value> = input.values.collect().await;
let mut values: Vec<Value> = input.collect().await;
let out = if let Some(n) = limit {
let (shuffled, _) = values.partial_shuffle(&mut thread_rng(), *n as usize);

View File

@@ -33,7 +33,6 @@ fn size(args: CommandArgs, _registry: &CommandRegistry) -> Result<OutputStream,
let name_span = tag.span;
Ok(input
.values
.map(move |v| {
if let Ok(s) = v.as_string() {
ReturnSuccess::value(count(&s, &v.tag))

View File

@@ -41,7 +41,5 @@ fn skip(SkipArgs { rows }: SkipArgs, context: RunnableContext) -> Result<OutputS
1
};
Ok(OutputStream::from_input(
context.input.values.skip(rows_desired),
))
Ok(OutputStream::from_input(context.input.skip(rows_desired)))
}

View File

@@ -68,7 +68,7 @@ impl WholeStreamCommand for SkipWhile {
}
};
let objects = call_info.input.values.skip_while(move |item| {
let objects = call_info.input.skip_while(move |item| {
let condition = condition.clone();
trace!("ITEM = {:?}", item);
let result = evaluate_baseline_expr(&*condition, &registry, &Scope::new(item.clone()));

View File

@@ -44,7 +44,7 @@ pub fn split_by(
RunnableContext { input, name, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> {
let stream = async_stream! {
let values: Vec<Value> = input.values.collect().await;
let values: Vec<Value> = input.collect().await;
if values.len() > 1 || values.is_empty() {
yield Err(ShellError::labeled_error(

View File

@@ -57,7 +57,6 @@ fn split_column(
let name_span = name.span;
Ok(input
.values
.map(move |v| {
if let Ok(s) = v.as_string() {
let splitter = separator.replace("\\n", "\n");

View File

@@ -43,7 +43,6 @@ fn split_row(
RunnableContext { input, name, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> {
let stream = input
.values
.map(move |v| {
if let Ok(s) = v.as_string() {
let splitter = separator.item.replace("\\n", "\n");

View File

@@ -27,7 +27,7 @@ impl WholeStreamCommand for Sum {
) -> Result<OutputStream, ShellError> {
sum(RunnableContext {
input: args.input,
commands: registry.clone(),
registry: registry.clone(),
shell_manager: args.shell_manager,
host: args.host,
ctrl_c: args.ctrl_c,

View File

@@ -69,7 +69,7 @@ fn t_sort_by(
RunnableContext { input, name, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> {
Ok(OutputStream::new(async_stream! {
let values: Vec<Value> = input.values.collect().await;
let values: Vec<Value> = input.collect().await;
let column_grouped_by_name = if let Some(grouped_by) = group_by {
Some(grouped_by.item().clone())

View File

@@ -30,7 +30,6 @@ impl WholeStreamCommand for Tags {
fn tags(args: CommandArgs, _registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
Ok(args
.input
.values
.map(move |v| {
let mut tags = TaggedDictBuilder::new(v.tag());
{

View File

@@ -266,7 +266,7 @@ fn to_bson(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream
let name_span = name_tag.span;
let stream = async_stream! {
let input: Vec<Value> = args.input.values.collect().await;
let input: Vec<Value> = args.input.collect().await;
let to_process_input = if input.len() > 1 {
let tag = input[0].tag.clone();

View File

@@ -175,7 +175,7 @@ pub fn to_delimited_data(
let name_span = name_tag.span;
let stream = async_stream! {
let input: Vec<Value> = input.values.collect().await;
let input: Vec<Value> = input.collect().await;
let to_process_input = if input.len() > 1 {
let tag = input[0].tag.clone();

View File

@@ -35,7 +35,7 @@ fn to_html(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream
let name_tag = args.name_tag();
//let name_span = name_tag.span;
let stream = async_stream! {
let input: Vec<Value> = args.input.values.collect().await;
let input: Vec<Value> = args.input.collect().await;
let headers = nu_protocol::merge_descriptors(&input);
let mut output_string = "<html><body>".to_string();

View File

@@ -136,7 +136,7 @@ fn to_json(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream
let name_tag = args.name_tag();
let name_span = name_tag.span;
let stream = async_stream! {
let input: Vec<Value> = args.input.values.collect().await;
let input: Vec<Value> = args.input.collect().await;
let to_process_input = if input.len() > 1 {
let tag = input[0].tag.clone();

View File

@@ -34,7 +34,7 @@ fn to_html(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream
let name_tag = args.name_tag();
//let name_span = name_tag.span;
let stream = async_stream! {
let input: Vec<Value> = args.input.values.collect().await;
let input: Vec<Value> = args.input.collect().await;
let headers = nu_protocol::merge_descriptors(&input);
let mut output_string = String::new();

View File

@@ -205,7 +205,7 @@ fn to_sqlite(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStre
let args = args.evaluate_once(registry)?;
let name_tag = args.name_tag();
let stream = async_stream! {
let input: Vec<Value> = args.input.values.collect().await;
let input: Vec<Value> = args.input.collect().await;
match sqlite_input_stream_to_bytes(input) {
Ok(out) => yield ReturnSuccess::value(out),

View File

@@ -98,7 +98,7 @@ fn to_toml(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream
let name_tag = args.name_tag();
let name_span = name_tag.span;
let stream = async_stream! {
let input: Vec<Value> = args.input.values.collect().await;
let input: Vec<Value> = args.input.collect().await;
let to_process_input = if input.len() > 1 {
let tag = input[0].tag.clone();

View File

@@ -33,7 +33,7 @@ fn to_url(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream,
let input = args.input;
let stream = async_stream! {
let input: Vec<Value> = input.values.collect().await;
let input: Vec<Value> = input.collect().await;
for value in input {
match value {

View File

@@ -130,7 +130,7 @@ fn to_yaml(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream
let name_span = name_tag.span;
let stream = async_stream! {
let input: Vec<Value> = args.input.values.collect().await;
let input: Vec<Value> = args.input.collect().await;
let to_process_input = if input.len() > 1 {
let tag = input[0].tag.clone();

View File

@@ -29,10 +29,8 @@ impl WholeStreamCommand for Trim {
}
fn trim(args: CommandArgs, _registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let input = args.input;
Ok(input
.values
Ok(args
.input
.map(move |v| {
let string = String::extract(&v)?;
ReturnSuccess::value(UntaggedValue::string(string.trim()).into_value(v.tag()))

View File

@@ -37,7 +37,7 @@ fn uniq(
RunnableContext { input, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> {
let stream = async_stream! {
let uniq_values: IndexSet<_> = input.values.collect().await;
let uniq_values: IndexSet<_> = input.collect().await;
for item in uniq_values.iter().map(|row| ReturnSuccess::value(row.clone())) {
yield item;

View File

@@ -1,8 +1,6 @@
use crate::commands::WholeStreamCommand;
use crate::prelude::*;
use futures::StreamExt;
use futures_util::pin_mut;
use nu_errors::ShellError;
use nu_protocol::{ReturnSuccess, ReturnValue, Signature, UntaggedValue};
@@ -34,14 +32,11 @@ impl WholeStreamCommand for What {
}
pub fn what(
WhatArgs {}: WhatArgs,
RunnableContext { input, .. }: RunnableContext,
_: WhatArgs,
RunnableContext { mut input, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> {
let stream = async_stream! {
let values = input.values;
pin_mut!(values);
while let Some(row) = values.next().await {
while let Some(row) = input.next().await {
let name = value::format_type(&row, 100);
yield ReturnSuccess::value(UntaggedValue::string(name).into_value(Tag::unknown_anchor(row.tag.span)));
}

View File

@@ -78,7 +78,7 @@ struct WhichArgs {
fn which(
WhichArgs { application, all }: WhichArgs,
RunnableContext { commands, .. }: RunnableContext,
RunnableContext { registry, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> {
let external = application.starts_with('^');
let item = if external {
@@ -89,7 +89,7 @@ fn which(
let stream = async_stream! {
if !external {
let builtin = commands.has(&item);
let builtin = registry.has(&item);
if builtin {
yield ReturnSuccess::value(entry_builtin!(item, application.tag.clone()));
}