Pipeline sink refactor (#1359)

* Refactor pipeline ahead of block changes. Add '-c' commandline option

* Update pipelining an error value

* Fmt

* Clippy

* Add stdin redirect for -c flag

* Add stdin redirect for -c flag
This commit is contained in:
Jonathan Turner 2020-02-08 18:24:33 -08:00 committed by GitHub
parent 3687603799
commit 5f4fae5b06
11 changed files with 221 additions and 81 deletions

1
Cargo.lock generated
View File

@ -2147,6 +2147,7 @@ dependencies = [
"ansi_term 0.12.1",
"app_dirs",
"async-stream",
"atty",
"base64 0.11.0",
"bigdecimal",
"bson",

View File

@ -124,6 +124,7 @@ termcolor = "1.1.0"
natural = "0.3.0"
parking_lot = "0.10.0"
meval = "0.2"
atty = "0.2"
clipboard = {version = "0.5", optional = true }
ptree = {version = "0.2" }

View File

@ -204,6 +204,12 @@ pub struct SpannedExpression {
pub span: Span,
}
impl SpannedExpression {
pub fn new(expr: Expression, span: Span) -> SpannedExpression {
SpannedExpression { expr, span }
}
}
impl std::ops::Deref for SpannedExpression {
type Target = Expression;

View File

@ -102,7 +102,7 @@ impl Plugin for Average {
}
UntaggedValue::Primitive(Primitive::Bytes(bytes)) => {
let avg = *bytes as f64 / self.count as f64;
let primitive_value: UntaggedValue = Primitive::from(avg).into();
let primitive_value: UntaggedValue = UntaggedValue::bytes(avg as u64);
let tagged_value = primitive_value.into_value(inner.tag.clone());
Ok(vec![ReturnSuccess::value(tagged_value)])
}

View File

@ -6,13 +6,10 @@ use crate::context::Context;
#[cfg(not(feature = "starship-prompt"))]
use crate::git::current_branch;
use crate::prelude::*;
use futures_codec::{FramedRead, LinesCodec};
use nu_errors::ShellError;
use nu_parser::hir::Expression;
use nu_parser::{
hir, ClassifiedCommand, ClassifiedPipeline, InternalCommand, PipelineShape, SpannedToken,
TokensIterator,
};
use nu_protocol::{Signature, Value};
use nu_parser::{ClassifiedPipeline, PipelineShape, SpannedToken, TokensIterator};
use nu_protocol::{Primitive, ReturnSuccess, Signature, UntaggedValue, Value};
use log::{debug, log_enabled, trace};
use rustyline::error::ReadlineError;
@ -240,10 +237,9 @@ fn create_default_starship_config() -> Option<toml::Value> {
Some(toml::Value::Table(map))
}
/// The entry point for the CLI. Will register all known internal commands, load experimental commands, load plugins, then prepare the prompt and line reader for input.
pub async fn cli() -> Result<(), Box<dyn Error>> {
let mut syncer = crate::env::environment_syncer::EnvironmentSyncer::new();
pub fn create_default_context(
syncer: &mut crate::env::environment_syncer::EnvironmentSyncer,
) -> Result<Context, Box<dyn Error>> {
syncer.load_environment();
let mut context = Context::basic()?;
@ -374,6 +370,52 @@ pub async fn cli() -> Result<(), Box<dyn Error>> {
}
}
Ok(context)
}
pub async fn run_pipeline_standalone(pipeline: String) -> Result<(), Box<dyn Error>> {
let mut syncer = crate::env::environment_syncer::EnvironmentSyncer::new();
let mut context = create_default_context(&mut syncer)?;
let _ = load_plugins(&mut context);
let cc = context.ctrl_c.clone();
ctrlc::set_handler(move || {
cc.store(true, Ordering::SeqCst);
})
.expect("Error setting Ctrl-C handler");
if context.ctrl_c.load(Ordering::SeqCst) {
context.ctrl_c.store(false, Ordering::SeqCst);
}
let line = process_line(Ok(pipeline), &mut context, true).await;
match line {
LineResult::Success(line) => {
context.maybe_print_errors(Text::from(line));
}
LineResult::Error(line, err) => {
context.with_host(|host| {
print_err(err, host, &Text::from(line.clone()));
});
context.maybe_print_errors(Text::from(line));
}
_ => {}
}
Ok(())
}
/// The entry point for the CLI. Will register all known internal commands, load experimental commands, load plugins, then prepare the prompt and line reader for input.
pub async fn cli() -> Result<(), Box<dyn Error>> {
let mut syncer = crate::env::environment_syncer::EnvironmentSyncer::new();
let mut context = create_default_context(&mut syncer)?;
let _ = load_plugins(&mut context);
let config = Config::builder().color_mode(ColorMode::Forced).build();
@ -482,7 +524,7 @@ pub async fn cli() -> Result<(), Box<dyn Error>> {
initial_command = None;
}
let line = process_line(readline, &mut context).await;
let line = process_line(readline, &mut context, false).await;
// Check the config to see if we need to update the path
// TODO: make sure config is cached so we don't path this load every call
@ -561,7 +603,11 @@ enum LineResult {
}
/// Process the line by parsing the text to turn it into commands, classify those commands so that we understand what is being called in the pipeline, and then run this pipeline
async fn process_line(readline: Result<String, ReadlineError>, ctx: &mut Context) -> LineResult {
async fn process_line(
readline: Result<String, ReadlineError>,
ctx: &mut Context,
redirect_stdin: bool,
) -> LineResult {
match &readline {
Ok(line) if line.trim() == "" => LineResult::Success(line.clone()),
@ -579,37 +625,68 @@ async fn process_line(readline: Result<String, ReadlineError>, ctx: &mut Context
debug!("=== Parsed ===");
debug!("{:#?}", result);
let mut pipeline = classify_pipeline(&result, ctx, &Text::from(line));
let pipeline = classify_pipeline(&result, ctx, &Text::from(line));
if let Some(failure) = pipeline.failed {
return LineResult::Error(line.to_string(), failure.into());
}
let should_push = match pipeline.commands.list.last() {
Some(ClassifiedCommand::External(_)) => false,
_ => true,
let input_stream = if redirect_stdin && !atty::is(atty::Stream::Stdin) {
let file = futures::io::AllowStdIo::new(std::io::stdin());
let stream = FramedRead::new(file, LinesCodec).map(|line| {
if let Ok(line) = line {
Ok(Value {
value: UntaggedValue::Primitive(Primitive::String(line)),
tag: Tag::unknown(),
})
} else {
panic!("Internal error: could not read lines of text from stdin")
}
});
Some(stream.to_input_stream())
} else {
None
};
if should_push {
pipeline
.commands
.list
.push(ClassifiedCommand::Internal(InternalCommand {
name: "autoview".to_string(),
name_tag: Tag::unknown(),
args: hir::Call::new(
Box::new(
Expression::synthetic_string("autoview").into_expr(Span::unknown()),
),
None,
None,
Span::unknown(),
),
}));
}
match run_pipeline(pipeline, ctx, input_stream, line).await {
Ok(Some(input)) => {
// Running a pipeline gives us back a stream that we can then
// work through. At the top level, we just want to pull on the
// values to compute them.
use futures::stream::TryStreamExt;
match run_pipeline(pipeline, ctx, None, line).await {
Ok(_) => LineResult::Success(line.to_string()),
let context = RunnableContext {
input,
shell_manager: ctx.shell_manager.clone(),
host: ctx.host.clone(),
ctrl_c: ctx.ctrl_c.clone(),
commands: ctx.registry.clone(),
name: Tag::unknown(),
source: Text::from(String::new()),
};
if let Ok(mut output_stream) = crate::commands::autoview::autoview(context) {
loop {
match output_stream.try_next().await {
Ok(Some(ReturnSuccess::Value(Value {
value: UntaggedValue::Error(e),
..
}))) => return LineResult::Error(line.to_string(), e),
Ok(Some(_item)) => {
if ctx.ctrl_c.load(Ordering::SeqCst) {
break;
}
}
_ => {
break;
}
}
}
}
LineResult::Success(line.to_string())
}
Ok(None) => LineResult::Success(line.to_string()),
Err(err) => LineResult::Error(line.to_string(), err),
}
}

View File

@ -103,8 +103,8 @@ pub(crate) mod wrap;
pub(crate) use autoview::Autoview;
pub(crate) use cd::Cd;
pub(crate) use command::{
per_item_command, whole_stream_command, Command, PerItemCommand, RawCommandArgs,
UnevaluatedCallInfo, WholeStreamCommand,
per_item_command, whole_stream_command, Command, PerItemCommand, UnevaluatedCallInfo,
WholeStreamCommand,
};
pub(crate) use append::Append;

View File

@ -1,14 +1,14 @@
use crate::commands::{RawCommandArgs, WholeStreamCommand};
use crate::commands::UnevaluatedCallInfo;
use crate::commands::WholeStreamCommand;
use crate::prelude::*;
use nu_errors::ShellError;
use nu_parser::{hir, hir::Expression, hir::Literal, hir::SpannedExpression};
use nu_protocol::{Primitive, ReturnSuccess, Signature, UntaggedValue, Value};
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
pub struct Autoview;
#[derive(Deserialize)]
pub struct AutoviewArgs {}
impl WholeStreamCommand for Autoview {
fn name(&self) -> &str {
"autoview"
@ -27,21 +27,48 @@ impl WholeStreamCommand for Autoview {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
Ok(args.process_raw(registry, autoview)?.run())
autoview(RunnableContext {
input: args.input,
commands: registry.clone(),
shell_manager: args.shell_manager,
host: args.host,
source: args.call_info.source,
ctrl_c: args.ctrl_c,
name: args.call_info.name_tag,
})
}
}
pub fn autoview(
AutoviewArgs {}: AutoviewArgs,
context: RunnableContext,
raw: RawCommandArgs,
) -> Result<OutputStream, ShellError> {
pub struct RunnableContextWithoutInput {
pub shell_manager: ShellManager,
pub host: Arc<parking_lot::Mutex<Box<dyn Host>>>,
pub source: Text,
pub ctrl_c: Arc<AtomicBool>,
pub commands: CommandRegistry,
pub name: Tag,
}
impl RunnableContextWithoutInput {
pub fn convert(context: RunnableContext) -> (InputStream, RunnableContextWithoutInput) {
let new_context = RunnableContextWithoutInput {
shell_manager: context.shell_manager,
host: context.host,
source: context.source,
ctrl_c: context.ctrl_c,
commands: context.commands,
name: context.name,
};
(context.input, new_context)
}
}
pub fn autoview(context: RunnableContext) -> Result<OutputStream, ShellError> {
let binary = context.get_command("binaryview");
let text = context.get_command("textview");
let table = context.get_command("table");
Ok(OutputStream::new(async_stream! {
let mut input_stream = context.input;
let (mut input_stream, context) = RunnableContextWithoutInput::convert(context);
match input_stream.next().await {
Some(x) => {
@ -66,7 +93,7 @@ pub fn autoview(
};
let stream = stream.to_input_stream();
if let Some(table) = table {
let mut command_args = raw.with_input(stream);
let command_args = create_default_command_args(&context).with_input(stream);
let result = table.run(command_args, &context.commands);
result.collect::<Vec<_>>().await;
}
@ -80,7 +107,8 @@ pub fn autoview(
if let Some(text) = text {
let mut stream = VecDeque::new();
stream.push_back(UntaggedValue::string(s).into_value(Tag { anchor, span }));
let result = text.run(raw.with_input(stream), &context.commands);
let command_args = create_default_command_args(&context).with_input(stream);
let result = text.run(command_args, &context.commands);
result.collect::<Vec<_>>().await;
} else {
outln!("{}", s);
@ -99,7 +127,8 @@ pub fn autoview(
if let Some(text) = text {
let mut stream = VecDeque::new();
stream.push_back(UntaggedValue::string(s).into_value(Tag { anchor, span }));
let result = text.run(raw.with_input(stream), &context.commands);
let command_args = create_default_command_args(&context).with_input(stream);
let result = text.run(command_args, &context.commands);
result.collect::<Vec<_>>().await;
} else {
outln!("{}\n", s);
@ -134,7 +163,8 @@ pub fn autoview(
if let Some(binary) = binary {
let mut stream = VecDeque::new();
stream.push_back(x);
let result = binary.run(raw.with_input(stream), &context.commands);
let command_args = create_default_command_args(&context).with_input(stream);
let result = binary.run(command_args, &context.commands);
result.collect::<Vec<_>>().await;
} else {
use pretty_hex::*;
@ -149,7 +179,8 @@ pub fn autoview(
if let Some(table) = table {
let mut stream = VecDeque::new();
stream.push_back(x);
let result = table.run(raw.with_input(stream), &context.commands);
let command_args = create_default_command_args(&context).with_input(stream);
let result = table.run(command_args, &context.commands);
result.collect::<Vec<_>>().await;
} else {
outln!("{:?}", item);
@ -170,3 +201,25 @@ pub fn autoview(
}
}))
}
fn create_default_command_args(context: &RunnableContextWithoutInput) -> RawCommandArgs {
let span = context.name.span;
RawCommandArgs {
host: context.host.clone(),
ctrl_c: context.ctrl_c.clone(),
shell_manager: context.shell_manager.clone(),
call_info: UnevaluatedCallInfo {
args: hir::Call {
head: Box::new(SpannedExpression::new(
Expression::Literal(Literal::String(span)),
span,
)),
positional: None,
named: None,
span,
},
source: context.source.clone(),
name_tag: context.name.clone(),
},
}
}

View File

@ -138,6 +138,14 @@ pub(crate) async fn run_internal_command(
}
},
Ok(ReturnSuccess::Value(Value {
value: UntaggedValue::Error(err),
..
})) => {
context.error(err);
break;
}
Ok(ReturnSuccess::Value(v)) => {
yielded = true;
yield Ok(v);

View File

@ -1,19 +1,17 @@
use crate::commands::classified::external::run_external_command;
use crate::commands::classified::internal::run_internal_command;
use crate::context::Context;
use crate::stream::{InputStream, OutputStream};
use crate::stream::InputStream;
use nu_errors::ShellError;
use nu_parser::{ClassifiedCommand, ClassifiedPipeline};
use nu_protocol::{ReturnSuccess, UntaggedValue, Value};
use nu_source::Text;
use std::sync::atomic::Ordering;
pub(crate) async fn run_pipeline(
pipeline: ClassifiedPipeline,
ctx: &mut Context,
mut input: Option<InputStream>,
line: &str,
) -> Result<(), ShellError> {
) -> Result<Option<InputStream>, ShellError> {
let mut iter = pipeline.commands.list.into_iter().peekable();
loop {
@ -48,26 +46,5 @@ pub(crate) async fn run_pipeline(
};
}
use futures::stream::TryStreamExt;
if let Some(input) = input {
let mut output_stream: OutputStream = input.into();
loop {
match output_stream.try_next().await {
Ok(Some(ReturnSuccess::Value(Value {
value: UntaggedValue::Error(e),
..
}))) => return Err(e),
Ok(Some(_item)) => {
if ctx.ctrl_c.load(Ordering::SeqCst) {
break;
}
}
_ => {
break;
}
}
}
}
Ok(())
Ok(input)
}

View File

@ -20,7 +20,7 @@ mod shell;
mod stream;
mod utils;
pub use crate::cli::cli;
pub use crate::cli::{cli, run_pipeline_standalone};
pub use crate::data::dict::TaggedListBuilder;
pub use crate::data::primitive;
pub use crate::data::value;

View File

@ -13,6 +13,13 @@ fn main() -> Result<(), Box<dyn Error>> {
.possible_values(&["error", "warn", "info", "debug", "trace"])
.takes_value(true),
)
.arg(
Arg::with_name("commands")
.short("c")
.long("commands")
.multiple(false)
.takes_value(true),
)
.arg(
Arg::with_name("develop")
.long("develop")
@ -63,6 +70,16 @@ fn main() -> Result<(), Box<dyn Error>> {
}
}
match matches.values_of("commands") {
None => {}
Some(values) => {
for item in values {
futures::executor::block_on(nu::run_pipeline_standalone(item.into()))?;
}
return Ok(());
}
}
builder.try_init()?;
println!(