forked from extern/nushell
Merge pull request #1008 from thegedge/move-pipeline-to-classified
Move pipeline code from cli to classified
This commit is contained in:
commit
70d63e34e9
108
src/cli.rs
108
src/cli.rs
@ -1,6 +1,5 @@
|
|||||||
use crate::commands::classified::{
|
use crate::commands::classified::{
|
||||||
ClassifiedCommand, ClassifiedInputStream, ClassifiedPipeline, ExternalCommand, InternalCommand,
|
ClassifiedCommand, ClassifiedInputStream, ClassifiedPipeline, ExternalCommand, InternalCommand,
|
||||||
StreamNext,
|
|
||||||
};
|
};
|
||||||
use crate::commands::plugin::JsonRpc;
|
use crate::commands::plugin::JsonRpc;
|
||||||
use crate::commands::plugin::{PluginCommand, PluginSink};
|
use crate::commands::plugin::{PluginCommand, PluginSink};
|
||||||
@ -600,114 +599,17 @@ async fn process_line(readline: Result<String, ReadlineError>, ctx: &mut Context
|
|||||||
})),
|
})),
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut input = ClassifiedInputStream::new();
|
|
||||||
let mut iter = pipeline.commands.item.into_iter().peekable();
|
|
||||||
|
|
||||||
// Check the config to see if we need to update the path
|
// Check the config to see if we need to update the path
|
||||||
// TODO: make sure config is cached so we don't path this load every call
|
// TODO: make sure config is cached so we don't path this load every call
|
||||||
set_env_from_config();
|
set_env_from_config();
|
||||||
|
|
||||||
loop {
|
let input = ClassifiedInputStream::new();
|
||||||
let item: Option<ClassifiedCommand> = iter.next();
|
match pipeline.run(ctx, input, line).await {
|
||||||
let next: Option<&ClassifiedCommand> = iter.peek();
|
Ok(_) => LineResult::Success(line.to_string()),
|
||||||
|
Err(err) => LineResult::Error(line.to_string(), err),
|
||||||
input = match (item, next) {
|
|
||||||
(None, _) => break,
|
|
||||||
|
|
||||||
(Some(ClassifiedCommand::Dynamic(_)), _)
|
|
||||||
| (_, Some(ClassifiedCommand::Dynamic(_))) => {
|
|
||||||
return LineResult::Error(
|
|
||||||
line.to_string(),
|
|
||||||
ShellError::unimplemented("Dynamic commands"),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
(Some(ClassifiedCommand::Expr(_)), _) => {
|
|
||||||
return LineResult::Error(
|
|
||||||
line.to_string(),
|
|
||||||
ShellError::unimplemented("Expression-only commands"),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
(_, Some(ClassifiedCommand::Expr(_))) => {
|
|
||||||
return LineResult::Error(
|
|
||||||
line.to_string(),
|
|
||||||
ShellError::unimplemented("Expression-only commands"),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
(
|
|
||||||
Some(ClassifiedCommand::Internal(left)),
|
|
||||||
Some(ClassifiedCommand::External(_)),
|
|
||||||
) => match left.run(ctx, input, Text::from(line)) {
|
|
||||||
Ok(val) => ClassifiedInputStream::from_input_stream(val),
|
|
||||||
Err(err) => return LineResult::Error(line.to_string(), err),
|
|
||||||
},
|
|
||||||
|
|
||||||
(Some(ClassifiedCommand::Internal(left)), Some(_)) => {
|
|
||||||
match left.run(ctx, input, Text::from(line)) {
|
|
||||||
Ok(val) => ClassifiedInputStream::from_input_stream(val),
|
|
||||||
Err(err) => return LineResult::Error(line.to_string(), err),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
(Some(ClassifiedCommand::Internal(left)), None) => {
|
|
||||||
match left.run(ctx, input, Text::from(line)) {
|
|
||||||
Ok(val) => {
|
|
||||||
use futures::stream::TryStreamExt;
|
|
||||||
|
|
||||||
let mut output_stream: OutputStream = val.into();
|
|
||||||
loop {
|
|
||||||
match output_stream.try_next().await {
|
|
||||||
Ok(Some(ReturnSuccess::Value(Tagged {
|
|
||||||
item: Value::Error(e),
|
|
||||||
..
|
|
||||||
}))) => {
|
|
||||||
return LineResult::Error(line.to_string(), e);
|
|
||||||
}
|
|
||||||
Ok(Some(_item)) => {
|
|
||||||
if ctx.ctrl_c.load(Ordering::SeqCst) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_ => {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return LineResult::Success(line.to_string());
|
|
||||||
}
|
|
||||||
Err(err) => return LineResult::Error(line.to_string(), err),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
(
|
|
||||||
Some(ClassifiedCommand::External(left)),
|
|
||||||
Some(ClassifiedCommand::External(_)),
|
|
||||||
) => match left.run(ctx, input, StreamNext::External).await {
|
|
||||||
Ok(val) => val,
|
|
||||||
Err(err) => return LineResult::Error(line.to_string(), err),
|
|
||||||
},
|
|
||||||
|
|
||||||
(Some(ClassifiedCommand::External(left)), Some(_)) => {
|
|
||||||
match left.run(ctx, input, StreamNext::Internal).await {
|
|
||||||
Ok(val) => val,
|
|
||||||
Err(err) => return LineResult::Error(line.to_string(), err),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
(Some(ClassifiedCommand::External(left)), None) => {
|
|
||||||
match left.run(ctx, input, StreamNext::Last).await {
|
|
||||||
Ok(val) => val,
|
|
||||||
Err(err) => return LineResult::Error(line.to_string(), err),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
|
||||||
LineResult::Success(line.to_string())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Err(ReadlineError::Interrupted) => LineResult::CtrlC,
|
Err(ReadlineError::Interrupted) => LineResult::CtrlC,
|
||||||
Err(ReadlineError::Eof) => LineResult::Break,
|
Err(ReadlineError::Eof) => LineResult::Break,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
|
@ -1,453 +0,0 @@
|
|||||||
use crate::parser::{hir, TokenNode};
|
|
||||||
use crate::prelude::*;
|
|
||||||
use bytes::{BufMut, BytesMut};
|
|
||||||
use derive_new::new;
|
|
||||||
use futures::stream::StreamExt;
|
|
||||||
use futures_codec::{Decoder, Encoder, Framed};
|
|
||||||
use itertools::Itertools;
|
|
||||||
use log::{log_enabled, trace};
|
|
||||||
use std::fmt;
|
|
||||||
use std::io::{Error, ErrorKind};
|
|
||||||
use subprocess::Exec;
|
|
||||||
|
|
||||||
/// A simple `Codec` implementation that splits up data into lines.
|
|
||||||
pub struct LinesCodec {}
|
|
||||||
|
|
||||||
impl Encoder for LinesCodec {
|
|
||||||
type Item = String;
|
|
||||||
type Error = Error;
|
|
||||||
|
|
||||||
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
|
|
||||||
dst.put(item);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Decoder for LinesCodec {
|
|
||||||
type Item = String;
|
|
||||||
type Error = Error;
|
|
||||||
|
|
||||||
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
|
|
||||||
match src.iter().position(|b| b == &b'\n') {
|
|
||||||
Some(pos) if !src.is_empty() => {
|
|
||||||
let buf = src.split_to(pos + 1);
|
|
||||||
String::from_utf8(buf.to_vec())
|
|
||||||
.map(Some)
|
|
||||||
.map_err(|e| Error::new(ErrorKind::InvalidData, e))
|
|
||||||
}
|
|
||||||
_ if !src.is_empty() => {
|
|
||||||
let drained = src.take();
|
|
||||||
String::from_utf8(drained.to_vec())
|
|
||||||
.map(Some)
|
|
||||||
.map_err(|e| Error::new(ErrorKind::InvalidData, e))
|
|
||||||
}
|
|
||||||
_ => Ok(None),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) struct ClassifiedInputStream {
|
|
||||||
pub(crate) objects: InputStream,
|
|
||||||
pub(crate) stdin: Option<std::fs::File>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ClassifiedInputStream {
|
|
||||||
pub(crate) fn new() -> ClassifiedInputStream {
|
|
||||||
ClassifiedInputStream {
|
|
||||||
objects: vec![Value::nothing().tagged(Tag::unknown())].into(),
|
|
||||||
stdin: None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn from_input_stream(stream: impl Into<InputStream>) -> ClassifiedInputStream {
|
|
||||||
ClassifiedInputStream {
|
|
||||||
objects: stream.into(),
|
|
||||||
stdin: None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn from_stdout(stdout: std::fs::File) -> ClassifiedInputStream {
|
|
||||||
ClassifiedInputStream {
|
|
||||||
objects: VecDeque::new().into(),
|
|
||||||
stdin: Some(stdout),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
pub(crate) struct ClassifiedPipeline {
|
|
||||||
pub(crate) commands: Spanned<Vec<ClassifiedCommand>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl FormatDebug for ClassifiedPipeline {
|
|
||||||
fn fmt_debug(&self, f: &mut DebugFormatter, source: &str) -> fmt::Result {
|
|
||||||
f.say_str(
|
|
||||||
"classified pipeline",
|
|
||||||
self.commands.iter().map(|c| c.debug(source)).join(" | "),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl HasSpan for ClassifiedPipeline {
|
|
||||||
fn span(&self) -> Span {
|
|
||||||
self.commands.span
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Eq, PartialEq)]
|
|
||||||
pub(crate) enum ClassifiedCommand {
|
|
||||||
#[allow(unused)]
|
|
||||||
Expr(TokenNode),
|
|
||||||
Internal(InternalCommand),
|
|
||||||
#[allow(unused)]
|
|
||||||
Dynamic(Spanned<hir::Call>),
|
|
||||||
External(ExternalCommand),
|
|
||||||
}
|
|
||||||
|
|
||||||
impl FormatDebug for ClassifiedCommand {
|
|
||||||
fn fmt_debug(&self, f: &mut DebugFormatter, source: &str) -> fmt::Result {
|
|
||||||
match self {
|
|
||||||
ClassifiedCommand::Expr(expr) => expr.fmt_debug(f, source),
|
|
||||||
ClassifiedCommand::Internal(internal) => internal.fmt_debug(f, source),
|
|
||||||
ClassifiedCommand::Dynamic(dynamic) => dynamic.fmt_debug(f, source),
|
|
||||||
ClassifiedCommand::External(external) => external.fmt_debug(f, source),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl HasSpan for ClassifiedCommand {
|
|
||||||
fn span(&self) -> Span {
|
|
||||||
match self {
|
|
||||||
ClassifiedCommand::Expr(node) => node.span(),
|
|
||||||
ClassifiedCommand::Internal(command) => command.span(),
|
|
||||||
ClassifiedCommand::Dynamic(call) => call.span,
|
|
||||||
ClassifiedCommand::External(command) => command.span(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(new, Debug, Clone, Eq, PartialEq)]
|
|
||||||
pub(crate) struct InternalCommand {
|
|
||||||
pub(crate) name: String,
|
|
||||||
pub(crate) name_tag: Tag,
|
|
||||||
pub(crate) args: Spanned<hir::Call>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl HasSpan for InternalCommand {
|
|
||||||
fn span(&self) -> Span {
|
|
||||||
let start = self.name_tag.span;
|
|
||||||
|
|
||||||
start.until(self.args.span)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl FormatDebug for InternalCommand {
|
|
||||||
fn fmt_debug(&self, f: &mut DebugFormatter, source: &str) -> fmt::Result {
|
|
||||||
f.say("internal", self.args.debug(source))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(new, Debug, Eq, PartialEq)]
|
|
||||||
pub(crate) struct DynamicCommand {
|
|
||||||
pub(crate) args: hir::Call,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl InternalCommand {
|
|
||||||
pub(crate) fn run(
|
|
||||||
self,
|
|
||||||
context: &mut Context,
|
|
||||||
input: ClassifiedInputStream,
|
|
||||||
source: Text,
|
|
||||||
) -> Result<InputStream, ShellError> {
|
|
||||||
if log_enabled!(log::Level::Trace) {
|
|
||||||
trace!(target: "nu::run::internal", "->");
|
|
||||||
trace!(target: "nu::run::internal", "{}", self.name);
|
|
||||||
trace!(target: "nu::run::internal", "{}", self.args.debug(&source));
|
|
||||||
}
|
|
||||||
|
|
||||||
let objects: InputStream = trace_stream!(target: "nu::trace_stream::internal", source: source, "input" = input.objects);
|
|
||||||
|
|
||||||
let command = context.expect_command(&self.name);
|
|
||||||
|
|
||||||
let result = {
|
|
||||||
context.run_command(
|
|
||||||
command,
|
|
||||||
self.name_tag.clone(),
|
|
||||||
self.args.item,
|
|
||||||
&source,
|
|
||||||
objects,
|
|
||||||
)
|
|
||||||
};
|
|
||||||
|
|
||||||
let result = trace_out_stream!(target: "nu::trace_stream::internal", source: source, "output" = result);
|
|
||||||
let mut result = result.values;
|
|
||||||
let mut context = context.clone();
|
|
||||||
|
|
||||||
let stream = async_stream! {
|
|
||||||
let mut soft_errs: Vec<ShellError> = vec![];
|
|
||||||
let mut yielded = false;
|
|
||||||
|
|
||||||
while let Some(item) = result.next().await {
|
|
||||||
match item {
|
|
||||||
Ok(ReturnSuccess::Action(action)) => match action {
|
|
||||||
CommandAction::ChangePath(path) => {
|
|
||||||
context.shell_manager.set_path(path);
|
|
||||||
}
|
|
||||||
CommandAction::Exit => std::process::exit(0), // TODO: save history.txt
|
|
||||||
CommandAction::Error(err) => {
|
|
||||||
context.error(err);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
CommandAction::EnterHelpShell(value) => {
|
|
||||||
match value {
|
|
||||||
Tagged {
|
|
||||||
item: Value::Primitive(Primitive::String(cmd)),
|
|
||||||
tag,
|
|
||||||
} => {
|
|
||||||
context.shell_manager.insert_at_current(Box::new(
|
|
||||||
HelpShell::for_command(
|
|
||||||
Value::string(cmd).tagged(tag),
|
|
||||||
&context.registry(),
|
|
||||||
).unwrap(),
|
|
||||||
));
|
|
||||||
}
|
|
||||||
_ => {
|
|
||||||
context.shell_manager.insert_at_current(Box::new(
|
|
||||||
HelpShell::index(&context.registry()).unwrap(),
|
|
||||||
));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
CommandAction::EnterValueShell(value) => {
|
|
||||||
context
|
|
||||||
.shell_manager
|
|
||||||
.insert_at_current(Box::new(ValueShell::new(value)));
|
|
||||||
}
|
|
||||||
CommandAction::EnterShell(location) => {
|
|
||||||
context.shell_manager.insert_at_current(Box::new(
|
|
||||||
FilesystemShell::with_location(location, context.registry().clone()).unwrap(),
|
|
||||||
));
|
|
||||||
}
|
|
||||||
CommandAction::PreviousShell => {
|
|
||||||
context.shell_manager.prev();
|
|
||||||
}
|
|
||||||
CommandAction::NextShell => {
|
|
||||||
context.shell_manager.next();
|
|
||||||
}
|
|
||||||
CommandAction::LeaveShell => {
|
|
||||||
context.shell_manager.remove_at_current();
|
|
||||||
if context.shell_manager.is_empty() {
|
|
||||||
std::process::exit(0); // TODO: save history.txt
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
|
|
||||||
Ok(ReturnSuccess::Value(v)) => {
|
|
||||||
yielded = true;
|
|
||||||
yield Ok(v);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(ReturnSuccess::DebugValue(v)) => {
|
|
||||||
yielded = true;
|
|
||||||
|
|
||||||
let doc = v.item.pretty_doc();
|
|
||||||
let mut buffer = termcolor::Buffer::ansi();
|
|
||||||
|
|
||||||
doc.render_raw(
|
|
||||||
context.with_host(|host| host.width() - 5),
|
|
||||||
&mut crate::parser::debug::TermColored::new(&mut buffer),
|
|
||||||
).unwrap();
|
|
||||||
|
|
||||||
let value = String::from_utf8_lossy(buffer.as_slice());
|
|
||||||
|
|
||||||
yield Ok(Value::string(value).tagged_unknown())
|
|
||||||
}
|
|
||||||
|
|
||||||
Err(err) => {
|
|
||||||
context.error(err);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(stream.to_input_stream())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Eq, PartialEq)]
|
|
||||||
pub(crate) struct ExternalCommand {
|
|
||||||
pub(crate) name: String,
|
|
||||||
|
|
||||||
pub(crate) name_tag: Tag,
|
|
||||||
pub(crate) args: Spanned<Vec<Tagged<String>>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl FormatDebug for ExternalCommand {
|
|
||||||
fn fmt_debug(&self, f: &mut DebugFormatter, source: &str) -> fmt::Result {
|
|
||||||
write!(f, "{}", self.name)?;
|
|
||||||
|
|
||||||
if self.args.item.len() > 0 {
|
|
||||||
write!(f, " ")?;
|
|
||||||
write!(f, "{}", self.args.iter().map(|i| i.debug(source)).join(" "))?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl HasSpan for ExternalCommand {
|
|
||||||
fn span(&self) -> Span {
|
|
||||||
self.name_tag.span.until(self.args.span)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub(crate) enum StreamNext {
|
|
||||||
Last,
|
|
||||||
External,
|
|
||||||
Internal,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ExternalCommand {
|
|
||||||
pub(crate) async fn run(
|
|
||||||
self,
|
|
||||||
context: &mut Context,
|
|
||||||
input: ClassifiedInputStream,
|
|
||||||
stream_next: StreamNext,
|
|
||||||
) -> Result<ClassifiedInputStream, ShellError> {
|
|
||||||
let stdin = input.stdin;
|
|
||||||
let inputs: Vec<Tagged<Value>> = input.objects.into_vec().await;
|
|
||||||
|
|
||||||
trace!(target: "nu::run::external", "-> {}", self.name);
|
|
||||||
trace!(target: "nu::run::external", "inputs = {:?}", inputs);
|
|
||||||
|
|
||||||
let mut arg_string = format!("{}", self.name);
|
|
||||||
for arg in &self.args.item {
|
|
||||||
arg_string.push_str(&arg);
|
|
||||||
}
|
|
||||||
|
|
||||||
trace!(target: "nu::run::external", "command = {:?}", self.name);
|
|
||||||
|
|
||||||
let mut process;
|
|
||||||
if arg_string.contains("$it") {
|
|
||||||
let input_strings = inputs
|
|
||||||
.iter()
|
|
||||||
.map(|i| {
|
|
||||||
i.as_string().map_err(|_| {
|
|
||||||
let arg = self.args.iter().find(|arg| arg.item.contains("$it"));
|
|
||||||
if let Some(arg) = arg {
|
|
||||||
ShellError::labeled_error(
|
|
||||||
"External $it needs string data",
|
|
||||||
"given row instead of string data",
|
|
||||||
arg.tag(),
|
|
||||||
)
|
|
||||||
} else {
|
|
||||||
ShellError::labeled_error(
|
|
||||||
"$it needs string data",
|
|
||||||
"given something else",
|
|
||||||
self.name_tag.clone(),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
})
|
|
||||||
.collect::<Result<Vec<String>, ShellError>>()?;
|
|
||||||
|
|
||||||
let commands = input_strings.iter().map(|i| {
|
|
||||||
let args = self.args.iter().filter_map(|arg| {
|
|
||||||
if arg.chars().all(|c| c.is_whitespace()) {
|
|
||||||
None
|
|
||||||
} else {
|
|
||||||
Some(arg.replace("$it", &i))
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
format!("{} {}", self.name, itertools::join(args, " "))
|
|
||||||
});
|
|
||||||
|
|
||||||
process = Exec::shell(itertools::join(commands, " && "))
|
|
||||||
} else {
|
|
||||||
process = Exec::cmd(&self.name);
|
|
||||||
for arg in &self.args.item {
|
|
||||||
let arg_chars: Vec<_> = arg.chars().collect();
|
|
||||||
if arg_chars.len() > 1
|
|
||||||
&& arg_chars[0] == '"'
|
|
||||||
&& arg_chars[arg_chars.len() - 1] == '"'
|
|
||||||
{
|
|
||||||
// quoted string
|
|
||||||
let new_arg: String = arg_chars[1..arg_chars.len() - 1].iter().collect();
|
|
||||||
process = process.arg(new_arg);
|
|
||||||
} else {
|
|
||||||
process = process.arg(arg.item.clone());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
process = process.cwd(context.shell_manager.path());
|
|
||||||
|
|
||||||
trace!(target: "nu::run::external", "cwd = {:?}", context.shell_manager.path());
|
|
||||||
|
|
||||||
let mut process = match stream_next {
|
|
||||||
StreamNext::Last => process,
|
|
||||||
StreamNext::External | StreamNext::Internal => {
|
|
||||||
process.stdout(subprocess::Redirection::Pipe)
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
trace!(target: "nu::run::external", "set up stdout pipe");
|
|
||||||
|
|
||||||
if let Some(stdin) = stdin {
|
|
||||||
process = process.stdin(stdin);
|
|
||||||
}
|
|
||||||
|
|
||||||
trace!(target: "nu::run::external", "set up stdin pipe");
|
|
||||||
trace!(target: "nu::run::external", "built process {:?}", process);
|
|
||||||
|
|
||||||
let popen = process.popen();
|
|
||||||
|
|
||||||
trace!(target: "nu::run::external", "next = {:?}", stream_next);
|
|
||||||
|
|
||||||
let name_tag = self.name_tag.clone();
|
|
||||||
if let Ok(mut popen) = popen {
|
|
||||||
match stream_next {
|
|
||||||
StreamNext::Last => {
|
|
||||||
let _ = popen.detach();
|
|
||||||
loop {
|
|
||||||
match popen.poll() {
|
|
||||||
None => {
|
|
||||||
let _ = std::thread::sleep(std::time::Duration::new(0, 100000000));
|
|
||||||
}
|
|
||||||
_ => {
|
|
||||||
let _ = popen.terminate();
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(ClassifiedInputStream::new())
|
|
||||||
}
|
|
||||||
StreamNext::External => {
|
|
||||||
let _ = popen.detach();
|
|
||||||
let stdout = popen.stdout.take().unwrap();
|
|
||||||
Ok(ClassifiedInputStream::from_stdout(stdout))
|
|
||||||
}
|
|
||||||
StreamNext::Internal => {
|
|
||||||
let _ = popen.detach();
|
|
||||||
let stdout = popen.stdout.take().unwrap();
|
|
||||||
let file = futures::io::AllowStdIo::new(stdout);
|
|
||||||
let stream = Framed::new(file, LinesCodec {});
|
|
||||||
let stream =
|
|
||||||
stream.map(move |line| Value::string(line.unwrap()).tagged(&name_tag));
|
|
||||||
Ok(ClassifiedInputStream::from_input_stream(
|
|
||||||
stream.boxed() as BoxStream<'static, Tagged<Value>>
|
|
||||||
))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
return Err(ShellError::labeled_error(
|
|
||||||
"Command not found",
|
|
||||||
"command not found",
|
|
||||||
name_tag,
|
|
||||||
));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
7
src/commands/classified/dynamic.rs
Normal file
7
src/commands/classified/dynamic.rs
Normal file
@ -0,0 +1,7 @@
|
|||||||
|
use crate::parser::hir;
|
||||||
|
use derive_new::new;
|
||||||
|
|
||||||
|
#[derive(new, Debug, Eq, PartialEq)]
|
||||||
|
pub(crate) struct Command {
|
||||||
|
pub(crate) args: hir::Call,
|
||||||
|
}
|
223
src/commands/classified/external.rs
Normal file
223
src/commands/classified/external.rs
Normal file
@ -0,0 +1,223 @@
|
|||||||
|
use super::ClassifiedInputStream;
|
||||||
|
use crate::prelude::*;
|
||||||
|
use bytes::{BufMut, BytesMut};
|
||||||
|
use futures::stream::StreamExt;
|
||||||
|
use futures_codec::{Decoder, Encoder, Framed};
|
||||||
|
use itertools::Itertools;
|
||||||
|
use log::trace;
|
||||||
|
use std::fmt;
|
||||||
|
use std::io::{Error, ErrorKind};
|
||||||
|
use subprocess::Exec;
|
||||||
|
|
||||||
|
/// A simple `Codec` implementation that splits up data into lines.
|
||||||
|
pub struct LinesCodec {}
|
||||||
|
|
||||||
|
impl Encoder for LinesCodec {
|
||||||
|
type Item = String;
|
||||||
|
type Error = Error;
|
||||||
|
|
||||||
|
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
|
||||||
|
dst.put(item);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Decoder for LinesCodec {
|
||||||
|
type Item = String;
|
||||||
|
type Error = Error;
|
||||||
|
|
||||||
|
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
|
||||||
|
match src.iter().position(|b| b == &b'\n') {
|
||||||
|
Some(pos) if !src.is_empty() => {
|
||||||
|
let buf = src.split_to(pos + 1);
|
||||||
|
String::from_utf8(buf.to_vec())
|
||||||
|
.map(Some)
|
||||||
|
.map_err(|e| Error::new(ErrorKind::InvalidData, e))
|
||||||
|
}
|
||||||
|
_ if !src.is_empty() => {
|
||||||
|
let drained = src.take();
|
||||||
|
String::from_utf8(drained.to_vec())
|
||||||
|
.map(Some)
|
||||||
|
.map_err(|e| Error::new(ErrorKind::InvalidData, e))
|
||||||
|
}
|
||||||
|
_ => Ok(None),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Eq, PartialEq)]
|
||||||
|
pub(crate) struct Command {
|
||||||
|
pub(crate) name: String,
|
||||||
|
|
||||||
|
pub(crate) name_tag: Tag,
|
||||||
|
pub(crate) args: Spanned<Vec<Tagged<String>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FormatDebug for Command {
|
||||||
|
fn fmt_debug(&self, f: &mut DebugFormatter, source: &str) -> fmt::Result {
|
||||||
|
write!(f, "{}", self.name)?;
|
||||||
|
|
||||||
|
if self.args.item.len() > 0 {
|
||||||
|
write!(f, " ")?;
|
||||||
|
write!(f, "{}", self.args.iter().map(|i| i.debug(source)).join(" "))?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl HasSpan for Command {
|
||||||
|
fn span(&self) -> Span {
|
||||||
|
self.name_tag.span.until(self.args.span)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub(crate) enum StreamNext {
|
||||||
|
Last,
|
||||||
|
External,
|
||||||
|
Internal,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Command {
|
||||||
|
pub(crate) async fn run(
|
||||||
|
self,
|
||||||
|
context: &mut Context,
|
||||||
|
input: ClassifiedInputStream,
|
||||||
|
stream_next: StreamNext,
|
||||||
|
) -> Result<ClassifiedInputStream, ShellError> {
|
||||||
|
let stdin = input.stdin;
|
||||||
|
let inputs: Vec<Tagged<Value>> = input.objects.into_vec().await;
|
||||||
|
|
||||||
|
trace!(target: "nu::run::external", "-> {}", self.name);
|
||||||
|
trace!(target: "nu::run::external", "inputs = {:?}", inputs);
|
||||||
|
|
||||||
|
let mut arg_string = format!("{}", self.name);
|
||||||
|
for arg in &self.args.item {
|
||||||
|
arg_string.push_str(&arg);
|
||||||
|
}
|
||||||
|
|
||||||
|
trace!(target: "nu::run::external", "command = {:?}", self.name);
|
||||||
|
|
||||||
|
let mut process;
|
||||||
|
if arg_string.contains("$it") {
|
||||||
|
let input_strings = inputs
|
||||||
|
.iter()
|
||||||
|
.map(|i| {
|
||||||
|
i.as_string().map_err(|_| {
|
||||||
|
let arg = self.args.iter().find(|arg| arg.item.contains("$it"));
|
||||||
|
if let Some(arg) = arg {
|
||||||
|
ShellError::labeled_error(
|
||||||
|
"External $it needs string data",
|
||||||
|
"given row instead of string data",
|
||||||
|
arg.tag(),
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
ShellError::labeled_error(
|
||||||
|
"$it needs string data",
|
||||||
|
"given something else",
|
||||||
|
self.name_tag.clone(),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.collect::<Result<Vec<String>, ShellError>>()?;
|
||||||
|
|
||||||
|
let commands = input_strings.iter().map(|i| {
|
||||||
|
let args = self.args.iter().filter_map(|arg| {
|
||||||
|
if arg.chars().all(|c| c.is_whitespace()) {
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
Some(arg.replace("$it", &i))
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
format!("{} {}", self.name, itertools::join(args, " "))
|
||||||
|
});
|
||||||
|
|
||||||
|
process = Exec::shell(itertools::join(commands, " && "))
|
||||||
|
} else {
|
||||||
|
process = Exec::cmd(&self.name);
|
||||||
|
for arg in &self.args.item {
|
||||||
|
let arg_chars: Vec<_> = arg.chars().collect();
|
||||||
|
if arg_chars.len() > 1
|
||||||
|
&& arg_chars[0] == '"'
|
||||||
|
&& arg_chars[arg_chars.len() - 1] == '"'
|
||||||
|
{
|
||||||
|
// quoted string
|
||||||
|
let new_arg: String = arg_chars[1..arg_chars.len() - 1].iter().collect();
|
||||||
|
process = process.arg(new_arg);
|
||||||
|
} else {
|
||||||
|
process = process.arg(arg.item.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
process = process.cwd(context.shell_manager.path());
|
||||||
|
|
||||||
|
trace!(target: "nu::run::external", "cwd = {:?}", context.shell_manager.path());
|
||||||
|
|
||||||
|
let mut process = match stream_next {
|
||||||
|
StreamNext::Last => process,
|
||||||
|
StreamNext::External | StreamNext::Internal => {
|
||||||
|
process.stdout(subprocess::Redirection::Pipe)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
trace!(target: "nu::run::external", "set up stdout pipe");
|
||||||
|
|
||||||
|
if let Some(stdin) = stdin {
|
||||||
|
process = process.stdin(stdin);
|
||||||
|
}
|
||||||
|
|
||||||
|
trace!(target: "nu::run::external", "set up stdin pipe");
|
||||||
|
trace!(target: "nu::run::external", "built process {:?}", process);
|
||||||
|
|
||||||
|
let popen = process.popen();
|
||||||
|
|
||||||
|
trace!(target: "nu::run::external", "next = {:?}", stream_next);
|
||||||
|
|
||||||
|
let name_tag = self.name_tag.clone();
|
||||||
|
if let Ok(mut popen) = popen {
|
||||||
|
match stream_next {
|
||||||
|
StreamNext::Last => {
|
||||||
|
let _ = popen.detach();
|
||||||
|
loop {
|
||||||
|
match popen.poll() {
|
||||||
|
None => {
|
||||||
|
let _ = std::thread::sleep(std::time::Duration::new(0, 100000000));
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
let _ = popen.terminate();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(ClassifiedInputStream::new())
|
||||||
|
}
|
||||||
|
StreamNext::External => {
|
||||||
|
let _ = popen.detach();
|
||||||
|
let stdout = popen.stdout.take().unwrap();
|
||||||
|
Ok(ClassifiedInputStream::from_stdout(stdout))
|
||||||
|
}
|
||||||
|
StreamNext::Internal => {
|
||||||
|
let _ = popen.detach();
|
||||||
|
let stdout = popen.stdout.take().unwrap();
|
||||||
|
let file = futures::io::AllowStdIo::new(stdout);
|
||||||
|
let stream = Framed::new(file, LinesCodec {});
|
||||||
|
let stream =
|
||||||
|
stream.map(move |line| Value::string(line.unwrap()).tagged(&name_tag));
|
||||||
|
Ok(ClassifiedInputStream::from_input_stream(
|
||||||
|
stream.boxed() as BoxStream<'static, Tagged<Value>>
|
||||||
|
))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return Err(ShellError::labeled_error(
|
||||||
|
"Command not found",
|
||||||
|
"command not found",
|
||||||
|
name_tag,
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
151
src/commands/classified/internal.rs
Normal file
151
src/commands/classified/internal.rs
Normal file
@ -0,0 +1,151 @@
|
|||||||
|
use crate::parser::hir;
|
||||||
|
use crate::prelude::*;
|
||||||
|
use derive_new::new;
|
||||||
|
use log::{log_enabled, trace};
|
||||||
|
use std::fmt;
|
||||||
|
|
||||||
|
use super::ClassifiedInputStream;
|
||||||
|
|
||||||
|
#[derive(new, Debug, Clone, Eq, PartialEq)]
|
||||||
|
pub(crate) struct Command {
|
||||||
|
pub(crate) name: String,
|
||||||
|
pub(crate) name_tag: Tag,
|
||||||
|
pub(crate) args: Spanned<hir::Call>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl HasSpan for Command {
|
||||||
|
fn span(&self) -> Span {
|
||||||
|
let start = self.name_tag.span;
|
||||||
|
|
||||||
|
start.until(self.args.span)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FormatDebug for Command {
|
||||||
|
fn fmt_debug(&self, f: &mut DebugFormatter, source: &str) -> fmt::Result {
|
||||||
|
f.say("internal", self.args.debug(source))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Command {
|
||||||
|
pub(crate) fn run(
|
||||||
|
self,
|
||||||
|
context: &mut Context,
|
||||||
|
input: ClassifiedInputStream,
|
||||||
|
source: Text,
|
||||||
|
) -> Result<InputStream, ShellError> {
|
||||||
|
if log_enabled!(log::Level::Trace) {
|
||||||
|
trace!(target: "nu::run::internal", "->");
|
||||||
|
trace!(target: "nu::run::internal", "{}", self.name);
|
||||||
|
trace!(target: "nu::run::internal", "{}", self.args.debug(&source));
|
||||||
|
}
|
||||||
|
|
||||||
|
let objects: InputStream = trace_stream!(target: "nu::trace_stream::internal", source: source, "input" = input.objects);
|
||||||
|
|
||||||
|
let command = context.expect_command(&self.name);
|
||||||
|
|
||||||
|
let result = {
|
||||||
|
context.run_command(
|
||||||
|
command,
|
||||||
|
self.name_tag.clone(),
|
||||||
|
self.args.item,
|
||||||
|
&source,
|
||||||
|
objects,
|
||||||
|
)
|
||||||
|
};
|
||||||
|
|
||||||
|
let result = trace_out_stream!(target: "nu::trace_stream::internal", source: source, "output" = result);
|
||||||
|
let mut result = result.values;
|
||||||
|
let mut context = context.clone();
|
||||||
|
|
||||||
|
let stream = async_stream! {
|
||||||
|
let mut soft_errs: Vec<ShellError> = vec![];
|
||||||
|
let mut yielded = false;
|
||||||
|
|
||||||
|
while let Some(item) = result.next().await {
|
||||||
|
match item {
|
||||||
|
Ok(ReturnSuccess::Action(action)) => match action {
|
||||||
|
CommandAction::ChangePath(path) => {
|
||||||
|
context.shell_manager.set_path(path);
|
||||||
|
}
|
||||||
|
CommandAction::Exit => std::process::exit(0), // TODO: save history.txt
|
||||||
|
CommandAction::Error(err) => {
|
||||||
|
context.error(err);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
CommandAction::EnterHelpShell(value) => {
|
||||||
|
match value {
|
||||||
|
Tagged {
|
||||||
|
item: Value::Primitive(Primitive::String(cmd)),
|
||||||
|
tag,
|
||||||
|
} => {
|
||||||
|
context.shell_manager.insert_at_current(Box::new(
|
||||||
|
HelpShell::for_command(
|
||||||
|
Value::string(cmd).tagged(tag),
|
||||||
|
&context.registry(),
|
||||||
|
).unwrap(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
context.shell_manager.insert_at_current(Box::new(
|
||||||
|
HelpShell::index(&context.registry()).unwrap(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
CommandAction::EnterValueShell(value) => {
|
||||||
|
context
|
||||||
|
.shell_manager
|
||||||
|
.insert_at_current(Box::new(ValueShell::new(value)));
|
||||||
|
}
|
||||||
|
CommandAction::EnterShell(location) => {
|
||||||
|
context.shell_manager.insert_at_current(Box::new(
|
||||||
|
FilesystemShell::with_location(location, context.registry().clone()).unwrap(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
CommandAction::PreviousShell => {
|
||||||
|
context.shell_manager.prev();
|
||||||
|
}
|
||||||
|
CommandAction::NextShell => {
|
||||||
|
context.shell_manager.next();
|
||||||
|
}
|
||||||
|
CommandAction::LeaveShell => {
|
||||||
|
context.shell_manager.remove_at_current();
|
||||||
|
if context.shell_manager.is_empty() {
|
||||||
|
std::process::exit(0); // TODO: save history.txt
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
Ok(ReturnSuccess::Value(v)) => {
|
||||||
|
yielded = true;
|
||||||
|
yield Ok(v);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(ReturnSuccess::DebugValue(v)) => {
|
||||||
|
yielded = true;
|
||||||
|
|
||||||
|
let doc = v.item.pretty_doc();
|
||||||
|
let mut buffer = termcolor::Buffer::ansi();
|
||||||
|
|
||||||
|
doc.render_raw(
|
||||||
|
context.with_host(|host| host.width() - 5),
|
||||||
|
&mut crate::parser::debug::TermColored::new(&mut buffer),
|
||||||
|
).unwrap();
|
||||||
|
|
||||||
|
let value = String::from_utf8_lossy(buffer.as_slice());
|
||||||
|
|
||||||
|
yield Ok(Value::string(value).tagged_unknown())
|
||||||
|
}
|
||||||
|
|
||||||
|
Err(err) => {
|
||||||
|
context.error(err);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(stream.to_input_stream())
|
||||||
|
}
|
||||||
|
}
|
75
src/commands/classified/mod.rs
Normal file
75
src/commands/classified/mod.rs
Normal file
@ -0,0 +1,75 @@
|
|||||||
|
use crate::parser::{hir, TokenNode};
|
||||||
|
use crate::prelude::*;
|
||||||
|
use std::fmt;
|
||||||
|
|
||||||
|
mod dynamic;
|
||||||
|
mod external;
|
||||||
|
mod internal;
|
||||||
|
mod pipeline;
|
||||||
|
|
||||||
|
#[allow(unused_imports)]
|
||||||
|
pub(crate) use dynamic::Command as DynamicCommand;
|
||||||
|
#[allow(unused_imports)]
|
||||||
|
pub(crate) use external::{Command as ExternalCommand, StreamNext};
|
||||||
|
pub(crate) use internal::Command as InternalCommand;
|
||||||
|
pub(crate) use pipeline::Pipeline as ClassifiedPipeline;
|
||||||
|
|
||||||
|
pub(crate) struct ClassifiedInputStream {
|
||||||
|
pub(crate) objects: InputStream,
|
||||||
|
pub(crate) stdin: Option<std::fs::File>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ClassifiedInputStream {
|
||||||
|
pub(crate) fn new() -> ClassifiedInputStream {
|
||||||
|
ClassifiedInputStream {
|
||||||
|
objects: vec![Value::nothing().tagged(Tag::unknown())].into(),
|
||||||
|
stdin: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn from_input_stream(stream: impl Into<InputStream>) -> ClassifiedInputStream {
|
||||||
|
ClassifiedInputStream {
|
||||||
|
objects: stream.into(),
|
||||||
|
stdin: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn from_stdout(stdout: std::fs::File) -> ClassifiedInputStream {
|
||||||
|
ClassifiedInputStream {
|
||||||
|
objects: VecDeque::new().into(),
|
||||||
|
stdin: Some(stdout),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Eq, PartialEq)]
|
||||||
|
pub(crate) enum ClassifiedCommand {
|
||||||
|
#[allow(unused)]
|
||||||
|
Expr(TokenNode),
|
||||||
|
Internal(InternalCommand),
|
||||||
|
#[allow(unused)]
|
||||||
|
Dynamic(Spanned<hir::Call>),
|
||||||
|
External(ExternalCommand),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FormatDebug for ClassifiedCommand {
|
||||||
|
fn fmt_debug(&self, f: &mut DebugFormatter, source: &str) -> fmt::Result {
|
||||||
|
match self {
|
||||||
|
ClassifiedCommand::Expr(expr) => expr.fmt_debug(f, source),
|
||||||
|
ClassifiedCommand::Internal(internal) => internal.fmt_debug(f, source),
|
||||||
|
ClassifiedCommand::Dynamic(dynamic) => dynamic.fmt_debug(f, source),
|
||||||
|
ClassifiedCommand::External(external) => external.fmt_debug(f, source),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl HasSpan for ClassifiedCommand {
|
||||||
|
fn span(&self) -> Span {
|
||||||
|
match self {
|
||||||
|
ClassifiedCommand::Expr(node) => node.span(),
|
||||||
|
ClassifiedCommand::Internal(command) => command.span(),
|
||||||
|
ClassifiedCommand::Dynamic(call) => call.span,
|
||||||
|
ClassifiedCommand::External(command) => command.span(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
91
src/commands/classified/pipeline.rs
Normal file
91
src/commands/classified/pipeline.rs
Normal file
@ -0,0 +1,91 @@
|
|||||||
|
use super::{ClassifiedCommand, ClassifiedInputStream, StreamNext};
|
||||||
|
use crate::prelude::*;
|
||||||
|
use std::fmt;
|
||||||
|
use std::sync::atomic::Ordering;
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub(crate) struct Pipeline {
|
||||||
|
pub(crate) commands: Spanned<Vec<ClassifiedCommand>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FormatDebug for Pipeline {
|
||||||
|
fn fmt_debug(&self, f: &mut DebugFormatter, source: &str) -> fmt::Result {
|
||||||
|
f.say_str(
|
||||||
|
"classified pipeline",
|
||||||
|
self.commands.iter().map(|c| c.debug(source)).join(" | "),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl HasSpan for Pipeline {
|
||||||
|
fn span(&self) -> Span {
|
||||||
|
self.commands.span
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Pipeline {
|
||||||
|
pub(crate) async fn run(
|
||||||
|
self,
|
||||||
|
ctx: &mut Context,
|
||||||
|
mut input: ClassifiedInputStream,
|
||||||
|
line: &str,
|
||||||
|
) -> Result<(), ShellError> {
|
||||||
|
let mut iter = self.commands.item.into_iter().peekable();
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let item: Option<ClassifiedCommand> = iter.next();
|
||||||
|
let next: Option<&ClassifiedCommand> = iter.peek();
|
||||||
|
|
||||||
|
input = match (item, next) {
|
||||||
|
(Some(ClassifiedCommand::Dynamic(_)), _)
|
||||||
|
| (_, Some(ClassifiedCommand::Dynamic(_))) => {
|
||||||
|
return Err(ShellError::unimplemented("Dynamic commands"))
|
||||||
|
}
|
||||||
|
|
||||||
|
(Some(ClassifiedCommand::Expr(_)), _) | (_, Some(ClassifiedCommand::Expr(_))) => {
|
||||||
|
return Err(ShellError::unimplemented("Expression-only commands"))
|
||||||
|
}
|
||||||
|
|
||||||
|
(Some(ClassifiedCommand::Internal(left)), _) => {
|
||||||
|
let stream = left.run(ctx, input, Text::from(line))?;
|
||||||
|
ClassifiedInputStream::from_input_stream(stream)
|
||||||
|
}
|
||||||
|
|
||||||
|
(Some(ClassifiedCommand::External(left)), Some(ClassifiedCommand::External(_))) => {
|
||||||
|
left.run(ctx, input, StreamNext::External).await?
|
||||||
|
}
|
||||||
|
|
||||||
|
(Some(ClassifiedCommand::External(left)), Some(_)) => {
|
||||||
|
left.run(ctx, input, StreamNext::Internal).await?
|
||||||
|
}
|
||||||
|
|
||||||
|
(Some(ClassifiedCommand::External(left)), None) => {
|
||||||
|
left.run(ctx, input, StreamNext::Last).await?
|
||||||
|
}
|
||||||
|
|
||||||
|
(None, _) => break,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
use futures::stream::TryStreamExt;
|
||||||
|
let mut output_stream: OutputStream = input.objects.into();
|
||||||
|
loop {
|
||||||
|
match output_stream.try_next().await {
|
||||||
|
Ok(Some(ReturnSuccess::Value(Tagged {
|
||||||
|
item: Value::Error(e),
|
||||||
|
..
|
||||||
|
}))) => return Err(e),
|
||||||
|
Ok(Some(_item)) => {
|
||||||
|
if ctx.ctrl_c.load(Ordering::SeqCst) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user