forked from extern/nushell
Add bytes split
command (#14652)
Related #10708 # Description Add `bytes split` command. `bytes split` splits its input on the provided separator, for both binary values _and_ binary streams, without collecting. The separator can be a multi-character string or a multi-byte binary value. It can be used when neither `split row` (which does not stream over raw input) nor `lines` (which streams, but can only split on newlines) is the right fit. The backing iterator implemented in this PR, `SplitRead`, can be used to implement a streaming `split row` in the future. # User-Facing Changes `bytes split` command added, which can be used to split binary values and raw streams using a separator. # Tests + Formatting - 🟢 toolkit fmt - 🟢 toolkit clippy - 🟢 toolkit test - 🟢 toolkit test stdlib # After Submitting Mention in release notes.
This commit is contained in:
@ -415,6 +415,18 @@ impl ByteStream {
|
||||
})
|
||||
}
|
||||
|
||||
/// Convert the [`ByteStream`] into a [`SplitRead`] iterator where each element is a `Result<String, ShellError>`.
|
||||
///
|
||||
/// Each call to [`next`](Iterator::next) reads the currently available data from the byte
|
||||
/// stream source, until `delimiter` or the end of the stream is encountered.
|
||||
///
|
||||
/// If the source of the [`ByteStream`] is [`ByteStreamSource::Child`] and the child has no stdout,
|
||||
/// then the stream is considered empty and `None` will be returned.
|
||||
pub fn split(self, delimiter: Vec<u8>) -> Option<SplitRead> {
|
||||
let reader = self.stream.reader()?;
|
||||
Some(SplitRead::new(reader, delimiter, self.span, self.signals))
|
||||
}
|
||||
|
||||
/// Convert the [`ByteStream`] into a [`Chunks`] iterator where each element is a `Result<Value, ShellError>`.
|
||||
///
|
||||
/// Each call to [`next`](Iterator::next) reads the currently available data from the byte stream source,
|
||||
@ -746,6 +758,200 @@ impl Iterator for Lines {
|
||||
}
|
||||
}
|
||||
|
||||
mod split_read {
|
||||
use std::io::{BufRead, ErrorKind};
|
||||
|
||||
use memchr::memmem::Finder;
|
||||
|
||||
pub struct SplitRead<R> {
|
||||
reader: Option<R>,
|
||||
buf: Option<Vec<u8>>,
|
||||
finder: Finder<'static>,
|
||||
}
|
||||
|
||||
impl<R: BufRead> SplitRead<R> {
|
||||
pub fn new(reader: R, delim: impl AsRef<[u8]>) -> Self {
|
||||
// empty delimiter results in an infinite stream of empty items
|
||||
debug_assert!(!delim.as_ref().is_empty(), "delimiter can't be empty");
|
||||
Self {
|
||||
reader: Some(reader),
|
||||
buf: Some(Vec::new()),
|
||||
finder: Finder::new(delim.as_ref()).into_owned(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<R: BufRead> Iterator for SplitRead<R> {
|
||||
type Item = Result<Vec<u8>, std::io::Error>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
let buf = self.buf.as_mut()?;
|
||||
let mut search_start = 0usize;
|
||||
|
||||
loop {
|
||||
if let Some(i) = self.finder.find(&buf[search_start..]) {
|
||||
let needle_idx = search_start + i;
|
||||
let right = buf.split_off(needle_idx + self.finder.needle().len());
|
||||
buf.truncate(needle_idx);
|
||||
let left = std::mem::replace(buf, right);
|
||||
return Some(Ok(left));
|
||||
}
|
||||
|
||||
if let Some(mut r) = self.reader.take() {
|
||||
search_start = buf.len().saturating_sub(self.finder.needle().len() + 1);
|
||||
let available = match r.fill_buf() {
|
||||
Ok(n) => n,
|
||||
Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
|
||||
Err(e) => return Some(Err(e)),
|
||||
};
|
||||
|
||||
buf.extend_from_slice(available);
|
||||
let used = available.len();
|
||||
r.consume(used);
|
||||
if used != 0 {
|
||||
self.reader = Some(r);
|
||||
}
|
||||
continue;
|
||||
} else {
|
||||
return self.buf.take().map(Ok);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::io::{self, Cursor, Read};
|
||||
|
||||
#[test]
|
||||
fn simple() {
|
||||
let s = "foo-bar-baz";
|
||||
let cursor = Cursor::new(String::from(s));
|
||||
let mut split =
|
||||
SplitRead::new(cursor, "-").map(|r| String::from_utf8(r.unwrap()).unwrap());
|
||||
|
||||
assert_eq!(split.next().as_deref(), Some("foo"));
|
||||
assert_eq!(split.next().as_deref(), Some("bar"));
|
||||
assert_eq!(split.next().as_deref(), Some("baz"));
|
||||
assert_eq!(split.next(), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn with_empty_fields() -> Result<(), io::Error> {
|
||||
let s = "\0\0foo\0\0bar\0\0\0\0baz\0\0";
|
||||
let cursor = Cursor::new(String::from(s));
|
||||
let mut split =
|
||||
SplitRead::new(cursor, "\0\0").map(|r| String::from_utf8(r.unwrap()).unwrap());
|
||||
|
||||
assert_eq!(split.next().as_deref(), Some(""));
|
||||
assert_eq!(split.next().as_deref(), Some("foo"));
|
||||
assert_eq!(split.next().as_deref(), Some("bar"));
|
||||
assert_eq!(split.next().as_deref(), Some(""));
|
||||
assert_eq!(split.next().as_deref(), Some("baz"));
|
||||
assert_eq!(split.next().as_deref(), Some(""));
|
||||
assert_eq!(split.next().as_deref(), None);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn complex_delimiter() -> Result<(), io::Error> {
|
||||
let s = "<|>foo<|>bar<|><|>baz<|>";
|
||||
let cursor = Cursor::new(String::from(s));
|
||||
let mut split =
|
||||
SplitRead::new(cursor, "<|>").map(|r| String::from_utf8(r.unwrap()).unwrap());
|
||||
|
||||
assert_eq!(split.next().as_deref(), Some(""));
|
||||
assert_eq!(split.next().as_deref(), Some("foo"));
|
||||
assert_eq!(split.next().as_deref(), Some("bar"));
|
||||
assert_eq!(split.next().as_deref(), Some(""));
|
||||
assert_eq!(split.next().as_deref(), Some("baz"));
|
||||
assert_eq!(split.next().as_deref(), Some(""));
|
||||
assert_eq!(split.next().as_deref(), None);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn all_empty() -> Result<(), io::Error> {
|
||||
let s = "<><>";
|
||||
let cursor = Cursor::new(String::from(s));
|
||||
let mut split =
|
||||
SplitRead::new(cursor, "<>").map(|r| String::from_utf8(r.unwrap()).unwrap());
|
||||
|
||||
assert_eq!(split.next().as_deref(), Some(""));
|
||||
assert_eq!(split.next().as_deref(), Some(""));
|
||||
assert_eq!(split.next().as_deref(), Some(""));
|
||||
assert_eq!(split.next(), None);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[should_panic = "delimiter can't be empty"]
|
||||
#[test]
|
||||
fn empty_delimiter() {
|
||||
let s = "abc";
|
||||
let cursor = Cursor::new(String::from(s));
|
||||
let _split = SplitRead::new(cursor, "").map(|e| e.unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn delimiter_spread_across_reads() {
|
||||
let reader = Cursor::new("<|>foo<|")
|
||||
.chain(Cursor::new(">bar<|><"))
|
||||
.chain(Cursor::new("|>baz<|>"));
|
||||
|
||||
let mut split =
|
||||
SplitRead::new(reader, "<|>").map(|r| String::from_utf8(r.unwrap()).unwrap());
|
||||
|
||||
assert_eq!(split.next().unwrap(), "");
|
||||
assert_eq!(split.next().unwrap(), "foo");
|
||||
assert_eq!(split.next().unwrap(), "bar");
|
||||
assert_eq!(split.next().unwrap(), "");
|
||||
assert_eq!(split.next().unwrap(), "baz");
|
||||
assert_eq!(split.next().unwrap(), "");
|
||||
assert_eq!(split.next(), None);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct SplitRead {
|
||||
internal: split_read::SplitRead<BufReader<SourceReader>>,
|
||||
span: Span,
|
||||
signals: Signals,
|
||||
}
|
||||
|
||||
impl SplitRead {
|
||||
fn new(
|
||||
reader: SourceReader,
|
||||
delimiter: impl AsRef<[u8]>,
|
||||
span: Span,
|
||||
signals: Signals,
|
||||
) -> Self {
|
||||
Self {
|
||||
internal: split_read::SplitRead::new(BufReader::new(reader), delimiter),
|
||||
span,
|
||||
signals,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn span(&self) -> Span {
|
||||
self.span
|
||||
}
|
||||
}
|
||||
|
||||
impl Iterator for SplitRead {
|
||||
type Item = Result<Vec<u8>, ShellError>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
if self.signals.interrupted() {
|
||||
return None;
|
||||
}
|
||||
self.internal.next().map(|r| r.map_err(|e| e.into()))
|
||||
}
|
||||
}
|
||||
|
||||
/// Turn a readable stream into [`Value`]s.
|
||||
///
|
||||
/// The `Value` type depends on the type of the stream ([`ByteStreamType`]). If `Unknown`, the
|
||||
|
Reference in New Issue
Block a user