From 469e23cae4dd840189e9e1408672c9ef9dc97b11 Mon Sep 17 00:00:00 2001 From: Bahex Date: Wed, 25 Dec 2024 16:04:43 +0300 Subject: [PATCH] Add `bytes split` command (#14652) Related #10708 # Description Add `bytes split` command. `bytes split` splits its input on the provided separator on binary values _and_ binary streams without collecting. The separator can be a multiple character string or multiple byte binary. It can be used when neither `split row` (not streaming over raw input) nor `lines` (streaming, but can only split on newlines) is right. The backing iterator implemented in this PR, `SplitRead`, can be used to implement a streaming `split row` in the future. # User-Facing Changes `bytes split` command added, which can be used to split binary values and raw streams using a separator. # Tests + Formatting - :green_circle: toolkit fmt - :green_circle: toolkit clippy - :green_circle: toolkit test - :green_circle: toolkit test stdlib # After Submitting Mention in release notes. --- Cargo.lock | 1 + Cargo.toml | 1 + crates/nu-command/src/bytes/mod.rs | 2 + crates/nu-command/src/bytes/split.rs | 109 +++++++++ crates/nu-command/src/default_context.rs | 1 + crates/nu-protocol/Cargo.toml | 3 +- .../nu-protocol/src/pipeline/byte_stream.rs | 206 ++++++++++++++++++ 7 files changed, 322 insertions(+), 1 deletion(-) create mode 100644 crates/nu-command/src/bytes/split.rs diff --git a/Cargo.lock b/Cargo.lock index 01a6592e41..3300716fd5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4055,6 +4055,7 @@ dependencies = [ "indexmap", "log", "lru", + "memchr", "miette", "nix 0.29.0", "nu-derive-value", diff --git a/Cargo.toml b/Cargo.toml index 0948360b20..c6702371b3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -183,6 +183,7 @@ which = "7.0.0" windows = "0.56" windows-sys = "0.48" winreg = "0.52" +memchr = "2.7.4" [workspace.lints.clippy] # Warning: workspace lints affect library code as well as tests, so don't enable lints that would be too noisy in tests like that. diff --git a/crates/nu-command/src/bytes/mod.rs b/crates/nu-command/src/bytes/mod.rs index 6d47bb892c..73018f3867 100644 --- a/crates/nu-command/src/bytes/mod.rs +++ b/crates/nu-command/src/bytes/mod.rs @@ -9,6 +9,7 @@ mod length; mod remove; mod replace; mod reverse; +mod split; mod starts_with; pub use add::BytesAdd; @@ -22,4 +23,5 @@ pub use length::BytesLen; pub use remove::BytesRemove; pub use replace::BytesReplace; pub use reverse::BytesReverse; +pub use split::BytesSplit; pub use starts_with::BytesStartsWith; diff --git a/crates/nu-command/src/bytes/split.rs b/crates/nu-command/src/bytes/split.rs new file mode 100644 index 0000000000..40d39e63b7 --- /dev/null +++ b/crates/nu-command/src/bytes/split.rs @@ -0,0 +1,109 @@ +use nu_engine::command_prelude::*; + +#[derive(Clone)] +pub struct BytesSplit; + +impl Command for BytesSplit { + fn name(&self) -> &str { + "bytes split" + } + + fn signature(&self) -> Signature { + Signature::build("bytes split") + .input_output_types(vec![(Type::Binary, Type::list(Type::Binary))]) + .required( + "separator", + SyntaxShape::OneOf(vec![SyntaxShape::Binary, SyntaxShape::String]), + "Bytes or string that the input will be split on (must be non-empty).", + ) + .category(Category::Bytes) + } + + fn description(&self) -> &str { + "Split input into multiple items using a separator." + } + + fn search_terms(&self) -> Vec<&str> { + vec!["separate", "stream"] + } + + fn examples(&self) -> Vec { + vec![ + Example { + example: r#"0x[66 6F 6F 20 62 61 72 20 62 61 7A 20] | bytes split 0x[20]"#, + description: "Split a binary value using a binary separator", + result: Some(Value::test_list(vec![ + Value::test_binary("foo"), + Value::test_binary("bar"), + Value::test_binary("baz"), + Value::test_binary(""), + ])), + }, + Example { + example: r#"0x[61 2D 2D 62 2D 2D 63] | bytes split "--""#, + description: "Split a binary value using a string separator", + result: Some(Value::test_list(vec![ + Value::test_binary("a"), + Value::test_binary("b"), + Value::test_binary("c"), + ])), + }, + ] + } + + fn run( + &self, + engine_state: &EngineState, + stack: &mut Stack, + call: &Call, + input: PipelineData, + ) -> Result { + let head = call.head; + let Spanned { + item: separator, + span, + }: Spanned> = call.req(engine_state, stack, 0)?; + + if separator.is_empty() { + return Err(ShellError::IncorrectValue { + msg: "Separator can't be empty".into(), + val_span: span, + call_span: call.head, + }); + } + + let (split_read, md) = match input { + PipelineData::Value(Value::Binary { val, .. }, md) => ( + ByteStream::read_binary(val, head, engine_state.signals().clone()).split(separator), + md, + ), + PipelineData::ByteStream(stream, md) => (stream.split(separator), md), + input => { + let span = input.span().unwrap_or(head); + return Err(input.unsupported_input_error("bytes", span)); + } + }; + if let Some(split) = split_read { + Ok(split + .map(move |part| match part { + Ok(val) => Value::binary(val, head), + Err(err) => Value::error(err, head), + }) + .into_pipeline_data_with_metadata(head, engine_state.signals().clone(), md)) + } else { + Ok(PipelineData::empty()) + } + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_examples() { + use crate::test_examples; + + test_examples(BytesSplit {}) + } +} diff --git a/crates/nu-command/src/default_context.rs b/crates/nu-command/src/default_context.rs index bcff06b5e8..168ee78027 100644 --- a/crates/nu-command/src/default_context.rs +++ b/crates/nu-command/src/default_context.rs @@ -380,6 +380,7 @@ pub fn add_shell_command_context(mut engine_state: EngineState) -> EngineState { bind_command! { Bytes, BytesLen, + BytesSplit, BytesStartsWith, BytesEndsWith, BytesReverse, diff --git a/crates/nu-protocol/Cargo.toml b/crates/nu-protocol/Cargo.toml index d162999b9d..b4a126511b 100644 --- a/crates/nu-protocol/Cargo.toml +++ b/crates/nu-protocol/Cargo.toml @@ -41,6 +41,7 @@ typetag = "0.2" os_pipe = { workspace = true, optional = true, features = ["io_safety"] } log = { workspace = true } web-time = { workspace = true } +memchr = { workspace = true } [target.'cfg(unix)'.dependencies] nix = { workspace = true, default-features = false, features = ["signal"] } @@ -74,4 +75,4 @@ tempfile = { workspace = true } os_pipe = { workspace = true } [package.metadata.docs.rs] -all-features = true \ No newline at end of file +all-features = true diff --git a/crates/nu-protocol/src/pipeline/byte_stream.rs b/crates/nu-protocol/src/pipeline/byte_stream.rs index 2b7c93495e..e24c0753e6 100644 --- a/crates/nu-protocol/src/pipeline/byte_stream.rs +++ b/crates/nu-protocol/src/pipeline/byte_stream.rs @@ -415,6 +415,18 @@ impl ByteStream { }) } + /// Convert the [`ByteStream`] into a [`SplitRead`] iterator where each element is a `Result`. + /// + /// Each call to [`next`](Iterator::next) reads the currently available data from the byte + /// stream source, until `delimiter` or the end of the stream is encountered. + /// + /// If the source of the [`ByteStream`] is [`ByteStreamSource::Child`] and the child has no stdout, + /// then the stream is considered empty and `None` will be returned. + pub fn split(self, delimiter: Vec) -> Option { + let reader = self.stream.reader()?; + Some(SplitRead::new(reader, delimiter, self.span, self.signals)) + } + /// Convert the [`ByteStream`] into a [`Chunks`] iterator where each element is a `Result`. /// /// Each call to [`next`](Iterator::next) reads the currently available data from the byte stream source, @@ -746,6 +758,200 @@ impl Iterator for Lines { } } +mod split_read { + use std::io::{BufRead, ErrorKind}; + + use memchr::memmem::Finder; + + pub struct SplitRead { + reader: Option, + buf: Option>, + finder: Finder<'static>, + } + + impl SplitRead { + pub fn new(reader: R, delim: impl AsRef<[u8]>) -> Self { + // empty delimiter results in an infinite stream of empty items + debug_assert!(!delim.as_ref().is_empty(), "delimiter can't be empty"); + Self { + reader: Some(reader), + buf: Some(Vec::new()), + finder: Finder::new(delim.as_ref()).into_owned(), + } + } + } + + impl Iterator for SplitRead { + type Item = Result, std::io::Error>; + + fn next(&mut self) -> Option { + let buf = self.buf.as_mut()?; + let mut search_start = 0usize; + + loop { + if let Some(i) = self.finder.find(&buf[search_start..]) { + let needle_idx = search_start + i; + let right = buf.split_off(needle_idx + self.finder.needle().len()); + buf.truncate(needle_idx); + let left = std::mem::replace(buf, right); + return Some(Ok(left)); + } + + if let Some(mut r) = self.reader.take() { + search_start = buf.len().saturating_sub(self.finder.needle().len() + 1); + let available = match r.fill_buf() { + Ok(n) => n, + Err(ref e) if e.kind() == ErrorKind::Interrupted => continue, + Err(e) => return Some(Err(e)), + }; + + buf.extend_from_slice(available); + let used = available.len(); + r.consume(used); + if used != 0 { + self.reader = Some(r); + } + continue; + } else { + return self.buf.take().map(Ok); + } + } + } + } + + #[cfg(test)] + mod tests { + use super::*; + use std::io::{self, Cursor, Read}; + + #[test] + fn simple() { + let s = "foo-bar-baz"; + let cursor = Cursor::new(String::from(s)); + let mut split = + SplitRead::new(cursor, "-").map(|r| String::from_utf8(r.unwrap()).unwrap()); + + assert_eq!(split.next().as_deref(), Some("foo")); + assert_eq!(split.next().as_deref(), Some("bar")); + assert_eq!(split.next().as_deref(), Some("baz")); + assert_eq!(split.next(), None); + } + + #[test] + fn with_empty_fields() -> Result<(), io::Error> { + let s = "\0\0foo\0\0bar\0\0\0\0baz\0\0"; + let cursor = Cursor::new(String::from(s)); + let mut split = + SplitRead::new(cursor, "\0\0").map(|r| String::from_utf8(r.unwrap()).unwrap()); + + assert_eq!(split.next().as_deref(), Some("")); + assert_eq!(split.next().as_deref(), Some("foo")); + assert_eq!(split.next().as_deref(), Some("bar")); + assert_eq!(split.next().as_deref(), Some("")); + assert_eq!(split.next().as_deref(), Some("baz")); + assert_eq!(split.next().as_deref(), Some("")); + assert_eq!(split.next().as_deref(), None); + + Ok(()) + } + + #[test] + fn complex_delimiter() -> Result<(), io::Error> { + let s = "<|>foo<|>bar<|><|>baz<|>"; + let cursor = Cursor::new(String::from(s)); + let mut split = + SplitRead::new(cursor, "<|>").map(|r| String::from_utf8(r.unwrap()).unwrap()); + + assert_eq!(split.next().as_deref(), Some("")); + assert_eq!(split.next().as_deref(), Some("foo")); + assert_eq!(split.next().as_deref(), Some("bar")); + assert_eq!(split.next().as_deref(), Some("")); + assert_eq!(split.next().as_deref(), Some("baz")); + assert_eq!(split.next().as_deref(), Some("")); + assert_eq!(split.next().as_deref(), None); + + Ok(()) + } + + #[test] + fn all_empty() -> Result<(), io::Error> { + let s = "<><>"; + let cursor = Cursor::new(String::from(s)); + let mut split = + SplitRead::new(cursor, "<>").map(|r| String::from_utf8(r.unwrap()).unwrap()); + + assert_eq!(split.next().as_deref(), Some("")); + assert_eq!(split.next().as_deref(), Some("")); + assert_eq!(split.next().as_deref(), Some("")); + assert_eq!(split.next(), None); + + Ok(()) + } + + #[should_panic = "delimiter can't be empty"] + #[test] + fn empty_delimiter() { + let s = "abc"; + let cursor = Cursor::new(String::from(s)); + let _split = SplitRead::new(cursor, "").map(|e| e.unwrap()); + } + + #[test] + fn delimiter_spread_across_reads() { + let reader = Cursor::new("<|>foo<|") + .chain(Cursor::new(">bar<|><")) + .chain(Cursor::new("|>baz<|>")); + + let mut split = + SplitRead::new(reader, "<|>").map(|r| String::from_utf8(r.unwrap()).unwrap()); + + assert_eq!(split.next().unwrap(), ""); + assert_eq!(split.next().unwrap(), "foo"); + assert_eq!(split.next().unwrap(), "bar"); + assert_eq!(split.next().unwrap(), ""); + assert_eq!(split.next().unwrap(), "baz"); + assert_eq!(split.next().unwrap(), ""); + assert_eq!(split.next(), None); + } + } +} + +pub struct SplitRead { + internal: split_read::SplitRead>, + span: Span, + signals: Signals, +} + +impl SplitRead { + fn new( + reader: SourceReader, + delimiter: impl AsRef<[u8]>, + span: Span, + signals: Signals, + ) -> Self { + Self { + internal: split_read::SplitRead::new(BufReader::new(reader), delimiter), + span, + signals, + } + } + + pub fn span(&self) -> Span { + self.span + } +} + +impl Iterator for SplitRead { + type Item = Result, ShellError>; + + fn next(&mut self) -> Option { + if self.signals.interrupted() { + return None; + } + self.internal.next().map(|r| r.map_err(|e| e.into())) + } +} + /// Turn a readable stream into [`Value`]s. /// /// The `Value` type depends on the type of the stream ([`ByteStreamType`]). If `Unknown`, the