allow parse to work better with streams (#7870)

# Description

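Fixes #7864. The now-redundant code has not been removed yet. There is
also a weird visual bug (the streamed output is rendered as several
separate tables, as shown below), but I'm not sure whether that is the
fault of this PR or just a quirk of how table rendering works: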

```
/home/gabriel/CodingProjects/nushell〉ping 1.1.1.1 | parse -r '(?P<num>\d+) ttl'                                                                                        01/27/2023 11:28:31 AM
╭───┬─────╮
│ # │ num │
├───┼─────┤
│ 0 │ 1   │
│ 1 │ 2   │
╰───┴─────╯
╭───┬─────╮
│ # │ num │
├───┼─────┤
│ 2 │ 3   │
╰───┴─────╯
╭───┬─────╮
│ # │ num │
├───┼─────┤
│ 3 │ 4   │
│ 4 │ 5   │
╰───┴─────╯
╭───┬─────╮
│ # │ num │
├───┼─────┤
│ 5 │ 6   │
│ 6 │ 7   │
╰───┴─────╯
^C
/home/gabriel/CodingProjects/nushell〉                                                                                                                                  01/27/2023 11:28:59 AM

```

# User-Facing Changes

_(List of all changes that impact the user experience here. This helps
us keep track of breaking changes.)_

# Tests + Formatting

Don't forget to add tests that cover your changes.

Make sure you've run and fixed any issues with these commands:

- `cargo fmt --all -- --check` to check standard code formatting (`cargo
fmt --all` applies these changes)
- `cargo clippy --workspace -- -D warnings -D clippy::unwrap_used -A
clippy::needless_collect` to check that you're using the standard code
style
- `cargo test --workspace` to check that all tests pass

# After Submitting

If your PR had any user-facing changes, update [the
documentation](https://github.com/nushell/nushell.github.io) after the
PR is merged, if necessary. This will help us keep the docs up to date.
This commit is contained in:
pwygab 2023-02-09 10:59:02 +08:00 committed by GitHub
parent f4bf7316fe
commit 8e9ed14b89
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 218 additions and 46 deletions

View File

@ -160,59 +160,97 @@ fn operate(
})?;
let columns = column_names(&regex_pattern);
let mut parsed: Vec<Value> = Vec::new();
for v in input {
match v.as_string() {
Ok(s) => {
let results = regex_pattern.captures_iter(&s);
match input {
PipelineData::Empty => Ok(PipelineData::Empty),
PipelineData::Value(..) => {
let mut parsed: Vec<Value> = Vec::new();
for c in results {
let mut cols = Vec::with_capacity(columns.len());
let captures = match c {
Ok(c) => c,
Err(e) => {
return Err(ShellError::GenericError(
"Error with regular expression captures".into(),
e.to_string(),
None,
None,
Vec::new(),
))
for v in input {
match v.as_string() {
Ok(s) => {
let results = regex_pattern.captures_iter(&s);
for c in results {
let mut cols = Vec::with_capacity(columns.len());
let captures = match c {
Ok(c) => c,
Err(e) => {
return Err(ShellError::GenericError(
"Error with regular expression captures".into(),
e.to_string(),
None,
None,
Vec::new(),
))
}
};
let mut vals = Vec::with_capacity(captures.len());
for (column_name, cap) in columns.iter().zip(captures.iter().skip(1)) {
let cap_string = cap.map(|v| v.as_str()).unwrap_or("").to_string();
cols.push(column_name.clone());
vals.push(Value::String {
val: cap_string,
span: v.span()?,
});
}
parsed.push(Value::Record {
cols,
vals,
span: head,
});
}
};
let mut vals = Vec::with_capacity(captures.len());
for (column_name, cap) in columns.iter().zip(captures.iter().skip(1)) {
let cap_string = cap.map(|v| v.as_str()).unwrap_or("").to_string();
cols.push(column_name.clone());
vals.push(Value::String {
val: cap_string,
span: v.span()?,
});
}
parsed.push(Value::Record {
cols,
vals,
span: head,
});
Err(_) => {
return Err(ShellError::PipelineMismatch(
"string".into(),
head,
v.span()?,
))
}
}
}
Err(_) => {
return Err(ShellError::PipelineMismatch(
"string".into(),
head,
v.span()?,
))
}
}
}
Ok(PipelineData::ListStream(
ListStream::from_stream(parsed.into_iter(), ctrlc),
None,
))
Ok(PipelineData::ListStream(
ListStream::from_stream(parsed.into_iter(), ctrlc),
None,
))
}
PipelineData::ListStream(stream, ..) => Ok(PipelineData::ListStream(
ListStream::from_stream(
ParseStreamer {
span: head,
excess: Vec::new(),
regex: regex_pattern,
columns,
stream: stream.stream,
},
ctrlc,
),
None,
)),
PipelineData::ExternalStream { stdout: None, .. } => Ok(PipelineData::Empty),
PipelineData::ExternalStream {
stdout: Some(stream),
..
} => Ok(PipelineData::ListStream(
ListStream::from_stream(
ParseStreamerExternal {
span: head,
excess: Vec::new(),
regex: regex_pattern,
columns,
stream: stream.stream,
},
ctrlc,
),
None,
)),
}
}
fn build_regex(input: &str, span: Span) -> Result<String, ShellError> {
@ -281,6 +319,130 @@ fn column_names(regex: &Regex) -> Vec<String> {
.collect()
}
/// Iterator state for applying a regex to a stream of `Value`s, yielding one
/// record per regex match.
pub struct ParseStreamer {
// Span used in mismatch errors and as the fallback when an input value has no span.
span: Span,
// Records already produced from an input item but not yet yielded; drained from the front.
excess: Vec<Value>,
// Pattern applied to every string item pulled from `stream`.
regex: Regex,
// Column names paired, in order, with the regex capture groups (group 0 is skipped).
columns: Vec<String>,
// Upstream source of values to parse.
stream: Box<dyn Iterator<Item = Value> + Send + 'static>,
}
impl Iterator for ParseStreamer {
type Item = Value;
fn next(&mut self) -> Option<Value> {
if !self.excess.is_empty() {
return Some(self.excess.remove(0));
}
let v = self.stream.next();
if let Some(v) = v {
match v.as_string() {
Ok(s) => stream_helper(
self.regex.clone(),
v.span().unwrap_or(self.span),
s,
self.columns.clone(),
&mut self.excess,
),
Err(_) => Some(Value::Error {
error: ShellError::PipelineMismatch(
"string".into(),
self.span,
v.span().unwrap_or(self.span),
),
}),
}
} else {
None
}
}
}
/// Iterator state for applying a regex to a stream of raw byte chunks,
/// yielding one record per regex match.
pub struct ParseStreamerExternal {
// Span attached to every produced value and used in mismatch errors.
span: Span,
// Records already produced from a chunk but not yet yielded; drained from the front.
excess: Vec<Value>,
// Pattern applied to every chunk after it has been decoded as UTF-8.
regex: Regex,
// Column names paired, in order, with the regex capture groups (group 0 is skipped).
columns: Vec<String>,
// Upstream source of byte chunks; each chunk may itself be an error.
stream: Box<dyn Iterator<Item = Result<Vec<u8>, ShellError>> + Send + 'static>,
}
impl Iterator for ParseStreamerExternal {
type Item = Value;
fn next(&mut self) -> Option<Value> {
if !self.excess.is_empty() {
return Some(self.excess.remove(0));
}
let v = self.stream.next();
if let Some(Ok(v)) = v {
match String::from_utf8(v) {
Ok(s) => stream_helper(
self.regex.clone(),
self.span,
s,
self.columns.clone(),
&mut self.excess,
),
Err(_) => Some(Value::Error {
error: ShellError::PipelineMismatch("string".into(), self.span, self.span),
}),
}
} else if let Some(Err(err)) = v {
Some(Value::Error { error: err })
} else {
None
}
}
}
/// Applies `regex` to `s` and queues one record per match onto `excess`.
///
/// Each record pairs `columns`, in order, with the match's capture groups
/// (group 0, the whole match, is skipped); a group that did not participate
/// becomes an empty string. Every produced value carries `span`.
///
/// Returns the value at the front of `excess` (removing it), or `None` when
/// `excess` is empty after processing. If the regex engine reports an error for
/// a match, an error value is returned immediately and anything queued so far
/// stays in `excess`.
fn stream_helper(
    regex: Regex,
    span: Span,
    s: String,
    columns: Vec<String>,
    excess: &mut Vec<Value>,
) -> Option<Value> {
    for attempt in regex.captures_iter(&s) {
        let captures = match attempt {
            Ok(found) => found,
            Err(e) => {
                return Some(Value::Error {
                    error: ShellError::GenericError(
                        "Error with regular expression captures".into(),
                        e.to_string(),
                        None,
                        None,
                        Vec::new(),
                    ),
                });
            }
        };

        // Build the parallel column/value lists in one pass over the groups.
        let (cols, vals): (Vec<String>, Vec<Value>) = columns
            .iter()
            .zip(captures.iter().skip(1))
            .map(|(name, group)| {
                let text = group.map(|m| m.as_str()).unwrap_or("");
                (
                    name.clone(),
                    Value::String {
                        val: text.to_string(),
                        span,
                    },
                )
            })
            .unzip();

        excess.push(Value::Record { cols, vals, span });
    }

    if excess.is_empty() {
        None
    } else {
        Some(excess.remove(0))
    }
}
#[cfg(test)]
mod test {
use super::*;

View File

@ -189,4 +189,14 @@ mod regex {
.contains("Opening parenthesis without closing parenthesis"));
})
}
// Checks that `parse` fed by a pipeline of generated strings reports its
// output as a streamed table with the expected columns.
#[test]
fn parse_works_with_streaming() {
    let command = r#"seq char a z | each {|c| $c + " a"} | parse '{letter} {a}' | describe"#;
    let actual = nu!(cwd: ".", pipeline(command));

    assert_eq!(actual.out, "table<letter: string, a: string> (stream)");
}
}