diff --git a/crates/nu-command/src/strings/parse.rs b/crates/nu-command/src/strings/parse.rs index 07aae3efd..74eeda4ae 100644 --- a/crates/nu-command/src/strings/parse.rs +++ b/crates/nu-command/src/strings/parse.rs @@ -160,59 +160,97 @@ fn operate( })?; let columns = column_names(&regex_pattern); - let mut parsed: Vec<Value> = Vec::new(); - for v in input { - match v.as_string() { - Ok(s) => { - let results = regex_pattern.captures_iter(&s); + match input { + PipelineData::Empty => Ok(PipelineData::Empty), + PipelineData::Value(..) => { + let mut parsed: Vec<Value> = Vec::new(); - for c in results { - let mut cols = Vec::with_capacity(columns.len()); - let captures = match c { - Ok(c) => c, - Err(e) => { - return Err(ShellError::GenericError( - "Error with regular expression captures".into(), - e.to_string(), - None, - None, - Vec::new(), - )) + for v in input { + match v.as_string() { + Ok(s) => { + let results = regex_pattern.captures_iter(&s); + + for c in results { + let mut cols = Vec::with_capacity(columns.len()); + let captures = match c { + Ok(c) => c, + Err(e) => { + return Err(ShellError::GenericError( + "Error with regular expression captures".into(), + e.to_string(), + None, + None, + Vec::new(), + )) + } + }; + let mut vals = Vec::with_capacity(captures.len()); + + for (column_name, cap) in columns.iter().zip(captures.iter().skip(1)) { + let cap_string = cap.map(|v| v.as_str()).unwrap_or("").to_string(); + cols.push(column_name.clone()); + vals.push(Value::String { + val: cap_string, + span: v.span()?, + }); + } + + parsed.push(Value::Record { + cols, + vals, + span: head, + }); } - }; - let mut vals = Vec::with_capacity(captures.len()); - - for (column_name, cap) in columns.iter().zip(captures.iter().skip(1)) { - let cap_string = cap.map(|v| v.as_str()).unwrap_or("").to_string(); - cols.push(column_name.clone()); - vals.push(Value::String { - val: cap_string, - span: v.span()?, - }); } - - parsed.push(Value::Record { - cols, - vals, - span: head, - }); + Err(_) => { 
+ return Err(ShellError::PipelineMismatch( + "string".into(), + head, + v.span()?, + )) + } } } - Err(_) => { - return Err(ShellError::PipelineMismatch( - "string".into(), - head, - v.span()?, - )) - } - } - } - Ok(PipelineData::ListStream( - ListStream::from_stream(parsed.into_iter(), ctrlc), - None, - )) + Ok(PipelineData::ListStream( + ListStream::from_stream(parsed.into_iter(), ctrlc), + None, + )) + } + PipelineData::ListStream(stream, ..) => Ok(PipelineData::ListStream( + ListStream::from_stream( + ParseStreamer { + span: head, + excess: Vec::new(), + regex: regex_pattern, + columns, + stream: stream.stream, + }, + ctrlc, + ), + None, + )), + + PipelineData::ExternalStream { stdout: None, .. } => Ok(PipelineData::Empty), + + PipelineData::ExternalStream { + stdout: Some(stream), + .. + } => Ok(PipelineData::ListStream( + ListStream::from_stream( + ParseStreamerExternal { + span: head, + excess: Vec::new(), + regex: regex_pattern, + columns, + stream: stream.stream, + }, + ctrlc, + ), + None, + )), + } } fn build_regex(input: &str, span: Span) -> Result { @@ -281,6 +319,130 @@ fn column_names(regex: &Regex) -> Vec { .collect() } +pub struct ParseStreamer { + span: Span, + excess: Vec, + regex: Regex, + columns: Vec, + stream: Box + Send + 'static>, +} + +impl Iterator for ParseStreamer { + type Item = Value; + fn next(&mut self) -> Option { + if !self.excess.is_empty() { + return Some(self.excess.remove(0)); + } + + let v = self.stream.next(); + + if let Some(v) = v { + match v.as_string() { + Ok(s) => stream_helper( + self.regex.clone(), + v.span().unwrap_or(self.span), + s, + self.columns.clone(), + &mut self.excess, + ), + Err(_) => Some(Value::Error { + error: ShellError::PipelineMismatch( + "string".into(), + self.span, + v.span().unwrap_or(self.span), + ), + }), + } + } else { + None + } + } +} + +pub struct ParseStreamerExternal { + span: Span, + excess: Vec, + regex: Regex, + columns: Vec, + stream: Box, ShellError>> + Send + 'static>, +} + +impl 
Iterator for ParseStreamerExternal { + type Item = Value; + fn next(&mut self) -> Option { + if !self.excess.is_empty() { + return Some(self.excess.remove(0)); + } + + let v = self.stream.next(); + + if let Some(Ok(v)) = v { + match String::from_utf8(v) { + Ok(s) => stream_helper( + self.regex.clone(), + self.span, + s, + self.columns.clone(), + &mut self.excess, + ), + Err(_) => Some(Value::Error { + error: ShellError::PipelineMismatch("string".into(), self.span, self.span), + }), + } + } else if let Some(Err(err)) = v { + Some(Value::Error { error: err }) + } else { + None + } + } +} + +fn stream_helper( + regex: Regex, + span: Span, + s: String, + columns: Vec, + excess: &mut Vec, +) -> Option { + let results = regex.captures_iter(&s); + + for c in results { + let mut cols = Vec::with_capacity(columns.len()); + let captures = match c { + Ok(c) => c, + Err(e) => { + return Some(Value::Error { + error: ShellError::GenericError( + "Error with regular expression captures".into(), + e.to_string(), + None, + None, + Vec::new(), + ), + }) + } + }; + let mut vals = Vec::with_capacity(captures.len()); + + for (column_name, cap) in columns.iter().zip(captures.iter().skip(1)) { + let cap_string = cap.map(|v| v.as_str()).unwrap_or("").to_string(); + cols.push(column_name.clone()); + vals.push(Value::String { + val: cap_string, + span, + }); + } + + excess.push(Value::Record { cols, vals, span }); + } + + if !excess.is_empty() { + Some(excess.remove(0)) + } else { + None + } +} + #[cfg(test)] mod test { use super::*; diff --git a/crates/nu-command/tests/commands/parse.rs b/crates/nu-command/tests/commands/parse.rs index 6998443a2..471c9db54 100644 --- a/crates/nu-command/tests/commands/parse.rs +++ b/crates/nu-command/tests/commands/parse.rs @@ -189,4 +189,14 @@ mod regex { .contains("Opening parenthesis without closing parenthesis")); }) } + + #[test] + fn parse_works_with_streaming() { + let actual = nu!( + cwd: ".", pipeline( + r#"seq char a z | each {|c| $c + " a"} | 
parse '{letter} {a}' | describe"# + )); + + assert_eq!(actual.out, "table (stream)") + } }