Split OutputStream into ActionStream/OutputStream (#3304)

* Split OutputStream into ActionStream/OutputStream

* Fmt

* Missed update

* Cleanup helper names

* Fmt
This commit is contained in:
Jonathan Turner
2021-04-12 14:35:01 +12:00
committed by GitHub
parent dbecbdccd4
commit 5f550a355b
250 changed files with 1006 additions and 926 deletions

View File

@ -8,16 +8,16 @@ use nu_protocol::hir::{
};
use nu_protocol::{ReturnSuccess, UntaggedValue, Value};
use nu_source::{Span, Tag};
use nu_stream::InputStream;
use nu_stream::ToOutputStream;
use nu_stream::ToActionStream;
use nu_stream::{InputStream, OutputStream};
use std::sync::atomic::Ordering;
pub fn run_block(
block: &Block,
ctx: &EvaluationContext,
mut input: InputStream,
) -> Result<InputStream, ShellError> {
let mut output: Result<InputStream, ShellError> = Ok(InputStream::empty());
) -> Result<OutputStream, ShellError> {
let mut output: Result<InputStream, ShellError> = Ok(OutputStream::empty());
for (_, definition) in block.definitions.iter() {
ctx.scope.add_definition(definition.clone());
}
@ -47,13 +47,13 @@ pub fn run_block(
}
};
match output_stream.next() {
Some(Ok(ReturnSuccess::Value(Value {
Some(Value {
value: UntaggedValue::Error(e),
..
}))) => {
}) => {
return Err(e);
}
Some(Ok(_item)) => {
Some(_item) => {
if let Some(err) = ctx.get_errors().get(0) {
ctx.clear_errors();
return Err(err.clone());
@ -68,9 +68,6 @@ pub fn run_block(
return Err(err.clone());
}
}
Some(Err(e)) => {
return Err(e);
}
}
}
}
@ -78,12 +75,12 @@ pub fn run_block(
return Err(e);
}
}
output = Ok(InputStream::empty());
output = Ok(OutputStream::empty());
for pipeline in &group.pipelines {
match output {
Ok(inp) if inp.is_empty() => {}
Ok(inp) => {
let mut output_stream = inp.to_output_stream();
let mut output_stream = inp.to_action_stream();
match output_stream.next() {
Some(Ok(ReturnSuccess::Value(Value {
@ -123,7 +120,7 @@ pub fn run_block(
}
output = run_pipeline(pipeline, ctx, input);
input = InputStream::empty();
input = OutputStream::empty();
}
}
@ -134,7 +131,7 @@ fn run_pipeline(
commands: &Pipeline,
ctx: &EvaluationContext,
mut input: InputStream,
) -> Result<InputStream, ShellError> {
) -> Result<OutputStream, ShellError> {
for item in commands.list.clone() {
input = match item {
ClassifiedCommand::Dynamic(call) => {

View File

@ -5,10 +5,12 @@ use crate::filesystem::filesystem_shell::{FilesystemShell, FilesystemShellMode};
use crate::shell::value_shell::ValueShell;
use log::{log_enabled, trace};
use nu_errors::ShellError;
use nu_protocol::hir::{ExternalRedirection, InternalCommand};
use nu_protocol::hir::{
Expression, ExternalRedirection, InternalCommand, SpannedExpression, Synthetic,
};
use nu_protocol::{CommandAction, ReturnSuccess, UntaggedValue, Value};
use nu_source::{PrettyDebug, Span, Tag};
use nu_stream::{InputStream, OutputStream};
use nu_stream::{ActionStream, InputStream};
pub(crate) fn run_internal_command(
command: InternalCommand,
@ -23,39 +25,52 @@ pub(crate) fn run_internal_command(
let objects: InputStream = input;
let internal_command = context.scope.expect_command(&command.name);
let internal_command = internal_command?;
let result = {
context.run_command(
internal_command?,
Tag::unknown_anchor(command.name_span),
command.args.clone(),
objects,
)?
};
Ok(InputStream::from_stream(InternalIterator {
command,
let result = context.run_command(
internal_command,
Tag::unknown_anchor(command.name_span),
command.args,
objects,
)?;
Ok(InputStream::from_stream(InternalIteratorSimple {
context: context.clone(),
leftovers: vec![],
input: result,
}))
}
struct InternalIterator {
struct InternalIteratorSimple {
context: EvaluationContext,
command: InternalCommand,
leftovers: Vec<Value>,
input: OutputStream,
input: InputStream,
}
impl Iterator for InternalIteratorSimple {
type Item = Value;
fn next(&mut self) -> Option<Self::Item> {
match self.input.next() {
Some(Value {
value: UntaggedValue::Error(err),
..
}) => {
self.context.error(err);
None
}
x => x,
}
}
}
pub struct InternalIterator {
pub context: EvaluationContext,
pub leftovers: Vec<Value>,
pub input: ActionStream,
}
impl Iterator for InternalIterator {
type Item = Value;
fn next(&mut self) -> Option<Self::Item> {
// let head = Arc::new(command.args.head.clone());
// let context = context.clone();
// let command = Arc::new(command);
if !self.leftovers.is_empty() {
let output = self.leftovers.remove(0);
return Some(output);
@ -84,17 +99,23 @@ impl Iterator for InternalIterator {
shell_manager: self.context.shell_manager.clone(),
call_info: UnevaluatedCallInfo {
args: nu_protocol::hir::Call {
head: self.command.args.head.clone(),
head: Box::new(SpannedExpression {
expr: Expression::Synthetic(Synthetic::String(
command_name.clone(),
)),
span: tagged_contents.tag().span,
}),
positional: None,
named: None,
span: Span::unknown(),
external_redirection: ExternalRedirection::Stdout,
},
name_tag: Tag::unknown_anchor(self.command.name_span),
name_tag: tagged_contents.tag(),
},
scope: self.context.scope.clone(),
};
let result = converter.run(new_args.with_input(vec![tagged_contents]));
let result = converter
.run_with_actions(new_args.with_input(vec![tagged_contents]));
match result {
Ok(mut result) => {

View File

@ -9,7 +9,7 @@ use nu_data::config::{self, Conf, NuConfig};
use nu_errors::ShellError;
use nu_protocol::{hir, ConfigPath};
use nu_source::{Span, Tag};
use nu_stream::{InputStream, OutputStream};
use nu_stream::InputStream;
use parking_lot::Mutex;
use std::sync::atomic::AtomicBool;
use std::{path::Path, sync::Arc};
@ -113,7 +113,7 @@ impl EvaluationContext {
name_tag: Tag,
args: hir::Call,
input: InputStream,
) -> Result<OutputStream, ShellError> {
) -> Result<InputStream, ShellError> {
let command_args = self.command_args(args, input, name_tag);
command.run(command_args)
}

View File

@ -9,7 +9,7 @@ use encoding_rs::Encoding;
use nu_data::config::LocalConfigDiff;
use nu_protocol::{CommandAction, ConfigPath, TaggedDictBuilder, Value};
use nu_source::{Span, Tag};
use nu_stream::{Interruptible, OutputStream, ToOutputStream};
use nu_stream::{ActionStream, Interruptible, ToActionStream};
use std::collections::VecDeque;
use std::io::{Error, ErrorKind};
use std::path::{Path, PathBuf};
@ -118,7 +118,7 @@ impl Shell for FilesystemShell {
}: LsArgs,
name_tag: Tag,
ctrl_c: Arc<AtomicBool>,
) -> Result<OutputStream, ShellError> {
) -> Result<ActionStream, ShellError> {
let ctrl_c_copy = ctrl_c.clone();
let (path, p_tag) = match path {
Some(p) => {
@ -146,7 +146,7 @@ impl Shell for FilesystemShell {
));
}
if is_empty_dir(&p) {
return Ok(OutputStream::empty());
return Ok(ActionStream::empty());
}
p.push("*");
}
@ -154,7 +154,7 @@ impl Shell for FilesystemShell {
}
None => {
if is_empty_dir(&self.path()) {
return Ok(OutputStream::empty());
return Ok(ActionStream::empty());
} else {
(PathBuf::from("./*"), name_tag.clone())
}
@ -222,10 +222,10 @@ impl Shell for FilesystemShell {
Some(entry)
})
.interruptible(ctrl_c_copy)
.to_output_stream())
.to_action_stream())
}
fn cd(&self, args: CdArgs, name: Tag) -> Result<OutputStream, ShellError> {
fn cd(&self, args: CdArgs, name: Tag) -> Result<ActionStream, ShellError> {
let path = match args.path {
None => match homedir_if_possible() {
Some(o) => o,
@ -342,7 +342,7 @@ impl Shell for FilesystemShell {
}: CopyArgs,
name: Tag,
path: &str,
) -> Result<OutputStream, ShellError> {
) -> Result<ActionStream, ShellError> {
let name_tag = name;
let path = Path::new(path);
@ -467,7 +467,7 @@ impl Shell for FilesystemShell {
}
}
Ok(OutputStream::empty())
Ok(ActionStream::empty())
}
fn mkdir(
@ -478,7 +478,7 @@ impl Shell for FilesystemShell {
}: MkdirArgs,
name: Tag,
path: &str,
) -> Result<OutputStream, ShellError> {
) -> Result<ActionStream, ShellError> {
let path = Path::new(path);
let mut stream = VecDeque::new();
@ -515,7 +515,7 @@ impl Shell for FilesystemShell {
MvArgs { src, dst }: MvArgs,
_name: Tag,
path: &str,
) -> Result<OutputStream, ShellError> {
) -> Result<ActionStream, ShellError> {
let path = Path::new(path);
let source = path.join(&src.item);
let destination = path.join(&dst.item);
@ -583,7 +583,7 @@ impl Shell for FilesystemShell {
}
}
Ok(OutputStream::empty())
Ok(ActionStream::empty())
}
fn rm(
@ -597,7 +597,7 @@ impl Shell for FilesystemShell {
}: RemoveArgs,
name: Tag,
path: &str,
) -> Result<OutputStream, ShellError> {
) -> Result<ActionStream, ShellError> {
let rm_always_trash = nu_data::config::config(Tag::unknown())?
.get("rm_always_trash")
.map(|val| val.is_true())
@ -758,14 +758,14 @@ impl Shell for FilesystemShell {
))
}
})
.to_output_stream())
.to_action_stream())
}
fn path(&self) -> String {
self.path.clone()
}
fn pwd(&self, args: EvaluatedWholeStreamCommandArgs) -> Result<OutputStream, ShellError> {
fn pwd(&self, args: EvaluatedWholeStreamCommandArgs) -> Result<ActionStream, ShellError> {
let path = PathBuf::from(self.path());
let p = match dunce::canonicalize(path.as_path()) {
Ok(p) => p,
@ -778,7 +778,7 @@ impl Shell for FilesystemShell {
}
};
Ok(OutputStream::one(ReturnSuccess::value(
Ok(ActionStream::one(ReturnSuccess::value(
UntaggedValue::Primitive(Primitive::String(p.to_string_lossy().to_string()))
.into_value(&args.call_info.name_tag),
)))
@ -860,9 +860,9 @@ impl Shell for FilesystemShell {
full_path: &Path,
save_data: &[u8],
name: Span,
) -> Result<OutputStream, ShellError> {
) -> Result<ActionStream, ShellError> {
match std::fs::write(full_path, save_data) {
Ok(_) => Ok(OutputStream::empty()),
Ok(_) => Ok(ActionStream::empty()),
Err(e) => Err(ShellError::labeled_error(
e.to_string(),
"IO error while saving",

View File

@ -6,7 +6,7 @@ use log::trace;
use nu_errors::ShellError;
use nu_plugin::jsonrpc::JsonRpc;
use nu_protocol::{Primitive, ReturnValue, Signature, UntaggedValue, Value};
use nu_stream::{OutputStream, ToOutputStream};
use nu_stream::{ActionStream, ToActionStream};
use serde::{self, Deserialize, Serialize};
use std::collections::VecDeque;
use std::io::prelude::*;
@ -106,12 +106,12 @@ impl WholeStreamCommand for PluginFilter {
&self.config.usage
}
fn run(&self, args: CommandArgs) -> Result<OutputStream, ShellError> {
fn run_with_actions(&self, args: CommandArgs) -> Result<ActionStream, ShellError> {
run_filter(self.path.clone(), args)
}
}
fn run_filter(path: String, args: CommandArgs) -> Result<OutputStream, ShellError> {
fn run_filter(path: String, args: CommandArgs) -> Result<ActionStream, ShellError> {
trace!("filter_plugin :: {}", path);
let bos = vec![UntaggedValue::Primitive(Primitive::BeginningOfStream).into_untagged_value()]
@ -175,7 +175,7 @@ fn run_filter(path: String, args: CommandArgs) -> Result<OutputStream, ShellErro
match request_raw {
Err(_) => {
return OutputStream::one(Err(ShellError::labeled_error(
return ActionStream::one(Err(ShellError::labeled_error(
"Could not load json from plugin",
"could not load json from plugin",
&call_info.name_tag,
@ -185,7 +185,7 @@ fn run_filter(path: String, args: CommandArgs) -> Result<OutputStream, ShellErro
match stdin.write(format!("{}\n", request_raw).as_bytes()) {
Ok(_) => {}
Err(err) => {
return OutputStream::one(Err(ShellError::unexpected(
return ActionStream::one(Err(ShellError::unexpected(
format!("{}", err),
)));
}
@ -201,13 +201,13 @@ fn run_filter(path: String, args: CommandArgs) -> Result<OutputStream, ShellErro
match response {
Ok(NuResult::response { params }) => match params {
Ok(params) => params.into_iter().to_output_stream(),
Ok(params) => params.into_iter().to_action_stream(),
Err(e) => {
vec![ReturnValue::Err(e)].into_iter().to_output_stream()
vec![ReturnValue::Err(e)].into_iter().to_action_stream()
}
},
Err(e) => OutputStream::one(Err(
Err(e) => ActionStream::one(Err(
ShellError::untagged_runtime_error(format!(
"Error while processing begin_filter response: {:?} {}",
e, input
@ -215,7 +215,7 @@ fn run_filter(path: String, args: CommandArgs) -> Result<OutputStream, ShellErro
)),
}
}
Err(e) => OutputStream::one(Err(ShellError::untagged_runtime_error(
Err(e) => ActionStream::one(Err(ShellError::untagged_runtime_error(
format!("Error while reading begin_filter response: {:?}", e),
))),
}
@ -236,7 +236,7 @@ fn run_filter(path: String, args: CommandArgs) -> Result<OutputStream, ShellErro
match request_raw {
Err(_) => {
return OutputStream::one(Err(ShellError::labeled_error(
return ActionStream::one(Err(ShellError::labeled_error(
"Could not load json from plugin",
"could not load json from plugin",
&call_info.name_tag,
@ -246,7 +246,7 @@ fn run_filter(path: String, args: CommandArgs) -> Result<OutputStream, ShellErro
match stdin.write(format!("{}\n", request_raw).as_bytes()) {
Ok(_) => {}
Err(err) => {
return OutputStream::one(Err(ShellError::unexpected(
return ActionStream::one(Err(ShellError::unexpected(
format!("{}", err),
)));
}
@ -262,9 +262,9 @@ fn run_filter(path: String, args: CommandArgs) -> Result<OutputStream, ShellErro
match response {
Ok(NuResult::response { params }) => match params {
Ok(params) => params.into_iter().to_output_stream(),
Ok(params) => params.into_iter().to_action_stream(),
Err(e) => {
vec![ReturnValue::Err(e)].into_iter().to_output_stream()
vec![ReturnValue::Err(e)].into_iter().to_action_stream()
}
},
Err(e) => vec![Err(ShellError::untagged_runtime_error(format!(
@ -272,7 +272,7 @@ fn run_filter(path: String, args: CommandArgs) -> Result<OutputStream, ShellErro
e, input
)))]
.into_iter()
.to_output_stream(),
.to_action_stream(),
}
}
Err(e) => vec![Err(ShellError::untagged_runtime_error(format!(
@ -280,7 +280,7 @@ fn run_filter(path: String, args: CommandArgs) -> Result<OutputStream, ShellErro
e
)))]
.into_iter()
.to_output_stream(),
.to_action_stream(),
};
let stdin = child.stdin.as_mut().expect("Failed to open stdin");
@ -295,7 +295,7 @@ fn run_filter(path: String, args: CommandArgs) -> Result<OutputStream, ShellErro
// TODO: Handle error
}
Err(e) => {
return OutputStream::one(Err(ShellError::untagged_runtime_error(
return ActionStream::one(Err(ShellError::untagged_runtime_error(
format!("Error while processing quit response: {:?}", e),
)));
}
@ -322,7 +322,7 @@ fn run_filter(path: String, args: CommandArgs) -> Result<OutputStream, ShellErro
// TODO: Handle error
}
Err(e) => {
return OutputStream::one(Err(ShellError::untagged_runtime_error(
return ActionStream::one(Err(ShellError::untagged_runtime_error(
format!("Error while processing filter response: {:?}", e),
)));
}
@ -336,12 +336,12 @@ fn run_filter(path: String, args: CommandArgs) -> Result<OutputStream, ShellErro
match response {
Ok(NuResult::response { params }) => match params {
Ok(params) => params.into_iter().to_output_stream(),
Ok(params) => params.into_iter().to_action_stream(),
Err(e) => {
vec![ReturnValue::Err(e)].into_iter().to_output_stream()
vec![ReturnValue::Err(e)].into_iter().to_action_stream()
}
},
Err(e) => OutputStream::one(Err(
Err(e) => ActionStream::one(Err(
ShellError::untagged_runtime_error(format!(
"Error while processing filter response: {:?}\n== input ==\n{}",
e, input
@ -349,7 +349,7 @@ fn run_filter(path: String, args: CommandArgs) -> Result<OutputStream, ShellErro
)),
}
}
Err(e) => OutputStream::one(Err(ShellError::untagged_runtime_error(
Err(e) => ActionStream::one(Err(ShellError::untagged_runtime_error(
format!("Error while reading filter response: {:?}", e),
))),
}
@ -357,7 +357,7 @@ fn run_filter(path: String, args: CommandArgs) -> Result<OutputStream, ShellErro
}
})
.flatten()
.to_output_stream())
.to_action_stream())
}
#[derive(new)]
@ -380,12 +380,12 @@ impl WholeStreamCommand for PluginSink {
&self.config.usage
}
fn run(&self, args: CommandArgs) -> Result<OutputStream, ShellError> {
fn run_with_actions(&self, args: CommandArgs) -> Result<ActionStream, ShellError> {
run_sink(self.path.clone(), args)
}
}
fn run_sink(path: String, args: CommandArgs) -> Result<OutputStream, ShellError> {
fn run_sink(path: String, args: CommandArgs) -> Result<ActionStream, ShellError> {
let args = args.evaluate_once()?;
let call_info = args.call_info.clone();
@ -429,7 +429,7 @@ fn run_sink(path: String, args: CommandArgs) -> Result<OutputStream, ShellError>
if let Ok(mut child) = child {
let _ = child.wait();
Ok(OutputStream::empty())
Ok(ActionStream::empty())
} else {
Err(ShellError::untagged_runtime_error(
"Could not create process for sink command",

View File

@ -5,7 +5,7 @@ use nu_protocol::hir::{
Call, ClassifiedCommand, Expression, InternalCommand, Literal, NamedArguments,
SpannedExpression,
};
use nu_protocol::{Primitive, ReturnSuccess, UntaggedValue, Value};
use nu_protocol::{Primitive, UntaggedValue, Value};
use nu_stream::{InputStream, ToInputStream};
use crate::EvaluationContext;
@ -210,17 +210,16 @@ pub fn process_script(
) {
loop {
match output_stream.next() {
Some(Ok(ReturnSuccess::Value(Value {
Some(Value {
value: UntaggedValue::Error(e),
..
}))) => return LineResult::Error(line.to_string(), e),
Some(Ok(_item)) => {
}) => return LineResult::Error(line.to_string(), e),
Some(_item) => {
if ctx.ctrl_c.load(Ordering::SeqCst) {
break;
}
}
None => break,
Some(Err(e)) => return LineResult::Error(line.to_string(), e),
}
}
}

View File

@ -1,4 +1,4 @@
use nu_stream::OutputStream;
use nu_stream::ActionStream;
use crate::command_args::EvaluatedWholeStreamCommandArgs;
use crate::maybe_text_codec::StringOrBinary;
@ -26,14 +26,14 @@ pub trait Shell: std::fmt::Debug {
args: LsArgs,
name: Tag,
ctrl_c: Arc<AtomicBool>,
) -> Result<OutputStream, ShellError>;
fn cd(&self, args: CdArgs, name: Tag) -> Result<OutputStream, ShellError>;
fn cp(&self, args: CopyArgs, name: Tag, path: &str) -> Result<OutputStream, ShellError>;
fn mkdir(&self, args: MkdirArgs, name: Tag, path: &str) -> Result<OutputStream, ShellError>;
fn mv(&self, args: MvArgs, name: Tag, path: &str) -> Result<OutputStream, ShellError>;
fn rm(&self, args: RemoveArgs, name: Tag, path: &str) -> Result<OutputStream, ShellError>;
) -> Result<ActionStream, ShellError>;
fn cd(&self, args: CdArgs, name: Tag) -> Result<ActionStream, ShellError>;
fn cp(&self, args: CopyArgs, name: Tag, path: &str) -> Result<ActionStream, ShellError>;
fn mkdir(&self, args: MkdirArgs, name: Tag, path: &str) -> Result<ActionStream, ShellError>;
fn mv(&self, args: MvArgs, name: Tag, path: &str) -> Result<ActionStream, ShellError>;
fn rm(&self, args: RemoveArgs, name: Tag, path: &str) -> Result<ActionStream, ShellError>;
fn path(&self) -> String;
fn pwd(&self, args: EvaluatedWholeStreamCommandArgs) -> Result<OutputStream, ShellError>;
fn pwd(&self, args: EvaluatedWholeStreamCommandArgs) -> Result<ActionStream, ShellError>;
fn set_path(&mut self, path: String);
fn open(
&self,
@ -49,5 +49,5 @@ pub trait Shell: std::fmt::Debug {
path: &Path,
contents: &[u8],
name: Span,
) -> Result<OutputStream, ShellError>;
) -> Result<ActionStream, ShellError>;
}

View File

@ -1,7 +1,7 @@
use crate::shell::Shell;
use crate::{command_args::EvaluatedWholeStreamCommandArgs, FilesystemShell};
use crate::{filesystem::filesystem_shell::FilesystemShellMode, maybe_text_codec::StringOrBinary};
use nu_stream::OutputStream;
use nu_stream::ActionStream;
use crate::shell::shell_args::{CdArgs, CopyArgs, LsArgs, MkdirArgs, MvArgs, RemoveArgs};
use encoding_rs::Encoding;
@ -69,7 +69,7 @@ impl ShellManager {
self.shells.lock()[self.current_shell()].path()
}
pub fn pwd(&self, args: EvaluatedWholeStreamCommandArgs) -> Result<OutputStream, ShellError> {
pub fn pwd(&self, args: EvaluatedWholeStreamCommandArgs) -> Result<ActionStream, ShellError> {
let env = self.shells.lock();
env[self.current_shell()].pwd(args)
@ -96,7 +96,7 @@ impl ShellManager {
full_path: &Path,
save_data: &[u8],
name: Span,
) -> Result<OutputStream, ShellError> {
) -> Result<ActionStream, ShellError> {
self.shells.lock()[self.current_shell()].save(full_path, save_data, name)
}
@ -137,40 +137,40 @@ impl ShellManager {
args: LsArgs,
name: Tag,
ctrl_c: Arc<AtomicBool>,
) -> Result<OutputStream, ShellError> {
) -> Result<ActionStream, ShellError> {
let env = self.shells.lock();
env[self.current_shell()].ls(args, name, ctrl_c)
}
pub fn cd(&self, args: CdArgs, name: Tag) -> Result<OutputStream, ShellError> {
pub fn cd(&self, args: CdArgs, name: Tag) -> Result<ActionStream, ShellError> {
let env = self.shells.lock();
env[self.current_shell()].cd(args, name)
}
pub fn cp(&self, args: CopyArgs, name: Tag) -> Result<OutputStream, ShellError> {
pub fn cp(&self, args: CopyArgs, name: Tag) -> Result<ActionStream, ShellError> {
let shells = self.shells.lock();
let path = shells[self.current_shell()].path();
shells[self.current_shell()].cp(args, name, &path)
}
pub fn rm(&self, args: RemoveArgs, name: Tag) -> Result<OutputStream, ShellError> {
pub fn rm(&self, args: RemoveArgs, name: Tag) -> Result<ActionStream, ShellError> {
let shells = self.shells.lock();
let path = shells[self.current_shell()].path();
shells[self.current_shell()].rm(args, name, &path)
}
pub fn mkdir(&self, args: MkdirArgs, name: Tag) -> Result<OutputStream, ShellError> {
pub fn mkdir(&self, args: MkdirArgs, name: Tag) -> Result<ActionStream, ShellError> {
let shells = self.shells.lock();
let path = shells[self.current_shell()].path();
shells[self.current_shell()].mkdir(args, name, &path)
}
pub fn mv(&self, args: MvArgs, name: Tag) -> Result<OutputStream, ShellError> {
pub fn mv(&self, args: MvArgs, name: Tag) -> Result<ActionStream, ShellError> {
let shells = self.shells.lock();
let path = shells[self.current_shell()].path();

View File

@ -8,7 +8,7 @@ use nu_protocol::ValueStructure;
use nu_protocol::{ReturnSuccess, ShellTypeName, UntaggedValue, Value};
use nu_source::SpannedItem;
use nu_source::{Span, Tag, Tagged};
use nu_stream::OutputStream;
use nu_stream::ActionStream;
use nu_value_ext::ValueExt;
use std::collections::VecDeque;
use std::ffi::OsStr;
@ -99,7 +99,7 @@ impl Shell for ValueShell {
LsArgs { path, .. }: LsArgs,
name_tag: Tag,
_ctrl_c: Arc<AtomicBool>,
) -> Result<OutputStream, ShellError> {
) -> Result<ActionStream, ShellError> {
let mut full_path = PathBuf::from(self.path());
if let Some(value) = &path {
@ -133,7 +133,7 @@ impl Shell for ValueShell {
Ok(output.into())
}
fn cd(&self, args: CdArgs, name: Tag) -> Result<OutputStream, ShellError> {
fn cd(&self, args: CdArgs, name: Tag) -> Result<ActionStream, ShellError> {
let destination = args.path;
let path = match destination {
@ -178,10 +178,10 @@ impl Shell for ValueShell {
));
}
Ok(OutputStream::one(ReturnSuccess::change_cwd(path)))
Ok(ActionStream::one(ReturnSuccess::change_cwd(path)))
}
fn cp(&self, _args: CopyArgs, name: Tag, _path: &str) -> Result<OutputStream, ShellError> {
fn cp(&self, _args: CopyArgs, name: Tag, _path: &str) -> Result<ActionStream, ShellError> {
Err(ShellError::labeled_error(
"cp not currently supported on values",
"not currently supported",
@ -189,7 +189,7 @@ impl Shell for ValueShell {
))
}
fn mv(&self, _args: MvArgs, name: Tag, _path: &str) -> Result<OutputStream, ShellError> {
fn mv(&self, _args: MvArgs, name: Tag, _path: &str) -> Result<ActionStream, ShellError> {
Err(ShellError::labeled_error(
"mv not currently supported on values",
"not currently supported",
@ -197,7 +197,7 @@ impl Shell for ValueShell {
))
}
fn mkdir(&self, _args: MkdirArgs, name: Tag, _path: &str) -> Result<OutputStream, ShellError> {
fn mkdir(&self, _args: MkdirArgs, name: Tag, _path: &str) -> Result<ActionStream, ShellError> {
Err(ShellError::labeled_error(
"mkdir not currently supported on values",
"not currently supported",
@ -205,7 +205,7 @@ impl Shell for ValueShell {
))
}
fn rm(&self, _args: RemoveArgs, name: Tag, _path: &str) -> Result<OutputStream, ShellError> {
fn rm(&self, _args: RemoveArgs, name: Tag, _path: &str) -> Result<ActionStream, ShellError> {
Err(ShellError::labeled_error(
"rm not currently supported on values",
"not currently supported",
@ -217,8 +217,8 @@ impl Shell for ValueShell {
self.path.clone()
}
fn pwd(&self, args: EvaluatedWholeStreamCommandArgs) -> Result<OutputStream, ShellError> {
Ok(OutputStream::one(
fn pwd(&self, args: EvaluatedWholeStreamCommandArgs) -> Result<ActionStream, ShellError> {
Ok(ActionStream::one(
UntaggedValue::string(self.path()).into_value(&args.call_info.name_tag),
))
}
@ -247,7 +247,7 @@ impl Shell for ValueShell {
_path: &Path,
_contents: &[u8],
_name: Span,
) -> Result<OutputStream, ShellError> {
) -> Result<ActionStream, ShellError> {
Err(ShellError::unimplemented(
"save on help shell is not supported",
))

View File

@ -8,7 +8,7 @@ use nu_parser::ParserScope;
use nu_protocol::hir::Block;
use nu_protocol::{ReturnSuccess, Signature, UntaggedValue};
use nu_source::{DbgDocBldr, DebugDocBuilder, PrettyDebugWithSource, Span, Tag};
use nu_stream::{OutputStream, ToOutputStream};
use nu_stream::{ActionStream, InputStream, OutputStream, ToOutputStream};
use std::sync::Arc;
pub trait WholeStreamCommand: Send + Sync {
@ -24,7 +24,24 @@ pub trait WholeStreamCommand: Send + Sync {
""
}
fn run(&self, args: CommandArgs) -> Result<OutputStream, ShellError>;
fn run_with_actions(&self, _args: CommandArgs) -> Result<ActionStream, ShellError> {
return Err(ShellError::unimplemented(&format!(
"{} does not implement run or run_with_actions",
self.name()
)));
}
fn run(&self, args: CommandArgs) -> Result<InputStream, ShellError> {
let context = EvaluationContext::from_args(&args);
let stream = self.run_with_actions(args)?;
Ok(Box::new(crate::evaluate::internal::InternalIterator {
context,
input: stream,
leftovers: vec![],
})
.to_output_stream())
}
fn is_binary(&self) -> bool {
false
@ -156,7 +173,7 @@ impl WholeStreamCommand for Arc<Block> {
}
let result = run_block(&block, &ctx, input);
ctx.scope.exit_scope();
result.map(|x| x.to_output_stream())
result
}
fn is_binary(&self) -> bool {
@ -211,12 +228,23 @@ impl Command {
self.0.examples()
}
pub fn run(&self, args: CommandArgs) -> Result<OutputStream, ShellError> {
pub fn run_with_actions(&self, args: CommandArgs) -> Result<ActionStream, ShellError> {
if args.call_info.switch_present("help") {
let cl = self.0.clone();
Ok(OutputStream::one(Ok(ReturnSuccess::Value(
Ok(ActionStream::one(Ok(ReturnSuccess::Value(
UntaggedValue::string(get_full_help(&*cl, &args.scope)).into_value(Tag::unknown()),
))))
} else {
self.0.run_with_actions(args)
}
}
pub fn run(&self, args: CommandArgs) -> Result<InputStream, ShellError> {
if args.call_info.switch_present("help") {
let cl = self.0.clone();
Ok(InputStream::one(
UntaggedValue::string(get_full_help(&*cl, &args.scope)).into_value(Tag::unknown()),
))
} else {
self.0.run(args)
}