parse: collect external stream chunks before matching (#9950)

<!--
if this PR closes one or more issues, you can automatically link the PR
with
them by using one of the [*linking
keywords*](https://docs.github.com/en/issues/tracking-your-work-with-issues/linking-a-pull-request-to-an-issue#linking-a-pull-request-to-an-issue-using-a-keyword),
e.g.
- this PR should close #xxxx
- fixes #xxxx
-->

# Description
This PR implements the workaround discussed in #9795, i.e. having
`parse` collect an external stream before operating on it with a regex.

- Should close #9795 

# User-Facing Changes
<!-- List of all changes that impact the user experience here. This
helps us keep track of breaking changes. -->
- `parse` will give the correct output for external streams
- increased memory and time overhead for external streams, because `parse` now collects
the entire stream before matching (no short-circuiting)

# Tests + Formatting
<!--
Don't forget to add tests that cover your changes.

Make sure you've run and fixed any issues with these commands:

- `cargo fmt --all -- --check` to check standard code formatting (`cargo
fmt --all` applies these changes)
- `cargo clippy --workspace -- -D warnings -D clippy::unwrap_used -A
clippy::needless_collect -A clippy::result_large_err` to check that
you're using the standard code style
- `cargo test --workspace` to check that all tests pass
- `cargo run -- -c "use std testing; testing run-tests --path
crates/nu-std"` to run the tests for the standard library

> **Note**
> from `nushell` you can also use the `toolkit` as follows
> ```bash
> use toolkit.nu # or use an `env_change` hook to activate it
automatically
> toolkit check pr
> ```
-->
- formatting is checked
- clippy is happy
- no tests fail other than those that were already broken
- added test case

# After Submitting
<!-- If your PR had any user-facing changes, update [the
documentation](https://github.com/nushell/nushell.github.io) after the
PR is merged, if necessary. This will help us keep the docs up to date.
-->
This commit is contained in:
panicbit 2023-08-08 13:48:13 +02:00 committed by GitHub
parent 570175f95d
commit 56ed1eb807
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 54 additions and 23 deletions

View File

@ -394,32 +394,48 @@ impl Iterator for ParseStreamerExternal {
return Some(self.excess.remove(0));
}
let v = self.stream.next();
let mut chunk = self.stream.next();
if let Some(Ok(v)) = v {
match String::from_utf8(v) {
Ok(s) => stream_helper(
self.regex.clone(),
self.span,
s,
self.columns.clone(),
&mut self.excess,
),
Err(_) => Some(Value::Error {
// Collect all `stream` chunks into a single `chunk` to be able to deal with matches that
// extend across chunk boundaries.
// This is a stop-gap solution until the `regex` crate supports streaming or an alternative
// solution is found.
// See https://github.com/nushell/nushell/issues/9795
while let Some(Ok(chunks)) = &mut chunk {
match self.stream.next() {
Some(Ok(mut next_chunk)) => chunks.append(&mut next_chunk),
error @ Some(Err(_)) => chunk = error,
None => break,
}
}
let chunk = match chunk {
Some(Ok(chunk)) => chunk,
Some(Err(err)) => {
return Some(Value::Error {
error: Box::new(err),
})
}
_ => return None,
};
let Ok(chunk) = String::from_utf8(chunk) else {
return Some(Value::Error {
error: Box::new(ShellError::PipelineMismatch {
exp_input_type: "string".into(),
dst_span: self.span,
src_span: self.span,
}),
}),
}
} else if let Some(Err(err)) = v {
Some(Value::Error {
error: Box::new(err),
})
} else {
None
}
};
stream_helper(
self.regex.clone(),
self.span,
chunk,
self.columns.clone(),
&mut self.excess,
)
}
}

View File

@ -211,4 +211,19 @@ mod regex {
assert_eq!(actual.out, "2");
}
// Regression test for `parse --regex` on file input read back through `open`.
// The pipeline writes the 26-letter alphabet 1000 times to `data.txt` (assuming `*`
// repeats the string, which the expected count implies), reads the file back with
// `open`, and counts the results of a regex that matches one full alphabet.
// Every repetition must yield exactly one match, so the expected output is "1000".
// NOTE(review): presumably `open data.txt` delivers the data in several chunks, so that
// some matches straddle a chunk boundary — confirm against the stream's chunk size.
#[test]
fn parse_handles_external_stream_chunking() {
// Set up the playground named "parse_test_streaming_1"; the sandbox handle is unused.
Playground::setup("parse_test_streaming_1", |dirs, _sandbox| {
// Execute the commands with `dirs.test()` as the working directory.
let actual = nu!(
cwd: dirs.test(), pipeline(
r#"
"abcdefghijklmnopqrstuvwxyz" * 1000 | save --force data.txt;
open data.txt | parse --regex "(abcdefghijklmnopqrstuvwxyz)" | length
"#
));
// One match per written repetition; a lower count means matches were lost.
assert_eq!(actual.out, "1000");
})
}
}