From dfffd45bcd4d0a2cfb71a757f72f1b71d4eeb00d Mon Sep 17 00:00:00 2001 From: Andrew Barnes Date: Sun, 13 Mar 2022 22:52:55 +1100 Subject: [PATCH] Streaming support for lines with raw streams (#4832) --- crates/nu-command/src/filters/lines.rs | 169 +++++++++++++++++----- tests/shell/pipeline/commands/external.rs | 2 - 2 files changed, 132 insertions(+), 39 deletions(-) diff --git a/crates/nu-command/src/filters/lines.rs b/crates/nu-command/src/filters/lines.rs index 90c32c718..1ec062d18 100644 --- a/crates/nu-command/src/filters/lines.rs +++ b/crates/nu-command/src/filters/lines.rs @@ -1,8 +1,8 @@ use nu_protocol::ast::Call; use nu_protocol::engine::{Command, EngineState, Stack}; use nu_protocol::{ - Category, Example, IntoInterruptiblePipelineData, PipelineData, ShellError, Signature, Span, - Value, + Category, Example, IntoInterruptiblePipelineData, PipelineData, RawStream, ShellError, + Signature, Span, Value, }; #[derive(Clone)] @@ -26,11 +26,12 @@ impl Command for Lines { fn run( &self, engine_state: &EngineState, - stack: &mut Stack, + _stack: &mut Stack, call: &Call, input: PipelineData, ) -> Result { let head = call.head; + let ctrlc = engine_state.ctrlc.clone(); let skip_empty = call.has_flag("skip-empty"); match input { #[allow(clippy::needless_collect)] @@ -108,41 +109,20 @@ impl Command for Lines { } PipelineData::Value(val, ..) => Err(ShellError::UnsupportedInput( format!("Not supported input: {}", val.as_string()?), - call.head, + head, )), - PipelineData::ExternalStream { .. } => { - let config = stack.get_config()?; - - //FIXME: Make sure this can fail in the future to let the user - //know to use a different encoding - let s = input.collect_string("", &config)?; - - let split_char = if s.contains("\r\n") { "\r\n" } else { "\n" }; - - #[allow(clippy::needless_collect)] - let mut lines = s - .split(split_char) - .map(|s| s.to_string()) - .collect::>(); - - // if the last one is empty, remove it, as it was just - // a newline at the end of the input we got - if let Some(last) = lines.last() { - if last.is_empty() { - lines.pop(); - } - } - - let iter = lines.into_iter().filter_map(move |s| { - if skip_empty && s.trim().is_empty() { - None - } else { - Some(Value::string(s, head)) - } - }); - - Ok(iter.into_pipeline_data(engine_state.ctrlc.clone())) - } + PipelineData::ExternalStream { stdout: None, .. } => Ok(PipelineData::new(head)), + PipelineData::ExternalStream { + stdout: Some(stream), + .. + } => Ok(RawStreamLinesAdapter::new(stream, head, skip_empty) + .into_iter() + .enumerate() + .map(move |(_idx, x)| match x { + Ok(x) => x, + Err(err) => Value::Error { error: err }, + }) + .into_pipeline_data(ctrlc)), } } @@ -157,3 +137,118 @@ impl Command for Lines { }] } } + +#[derive(Debug)] +struct RawStreamLinesAdapter { + inner: RawStream, + inner_complete: bool, + skip_empty: bool, + span: Span, + incomplete_line: String, + queue: Vec, +} + +impl Iterator for RawStreamLinesAdapter { + type Item = Result; + + fn next(&mut self) -> Option { + loop { + if !self.queue.is_empty() { + let s = self.queue.remove(0usize); + + if self.skip_empty && s.trim().is_empty() { + continue; + } + + return Some(Ok(Value::String { + val: s, + span: self.span, + })); + } else { + // inner is complete, feed out remaining state + if self.inner_complete { + if !self.incomplete_line.is_empty() { + let r = Some(Ok(Value::String { + val: self.incomplete_line.to_string(), + span: self.span, + })); + self.incomplete_line = String::new(); + return r; + } + + return None; + } + + // pull more data from inner + if let Some(result) = self.inner.next() { + match result { + Ok(v) => { + match v { + Value::String { val, span } => { + self.span = span; + + let split_char = + if val.contains("\r\n") { "\r\n" } else { "\n" }; + + let mut lines = val + .split(split_char) + .map(|s| s.to_string()) + .collect::>(); + + // handle incomplete line from previous + if !self.incomplete_line.is_empty() { + if let Some(first) = lines.first() { + let new_incomplete_line = + self.incomplete_line.to_string() + first; + lines.splice(0..1, vec![new_incomplete_line]); + self.incomplete_line = String::new(); + } + } + + // store incomplete line from current + if let Some(last) = lines.last() { + if last.is_empty() { + // we ended on a line ending + lines.pop(); + } else { + // incomplete line, save for next time + if let Some(s) = lines.pop() { + self.incomplete_line = s; + } + } + } + + // save completed lines + self.queue.append(&mut lines); + } + // TODO: Value::Binary support required? + _ => { + return Some(Err(ShellError::UnsupportedInput( + "Unsupport type from raw stream".to_string(), + self.span, + ))) + } + } + } + Err(_) => todo!(), + } + } else { + self.inner_complete = true; + } + } + } + } +} + +impl RawStreamLinesAdapter { + pub fn new(inner: RawStream, span: Span, skip_empty: bool) -> Self { + Self { + inner, + span, + skip_empty, + incomplete_line: String::new(), + queue: Vec::::new(), + inner_complete: false, + } + } +} diff --git a/tests/shell/pipeline/commands/external.rs b/tests/shell/pipeline/commands/external.rs index 489e810b0..b8791cc96 100644 --- a/tests/shell/pipeline/commands/external.rs +++ b/tests/shell/pipeline/commands/external.rs @@ -195,8 +195,6 @@ mod stdin_evaluation { assert_eq!(actual.err, ""); } - // FIXME: JT: `lines` doesn't currently support this kind of streaming - #[ignore] #[test] fn does_not_block_indefinitely() { let stdout = nu!(