mirror of
https://github.com/nushell/nushell.git
synced 2025-01-05 05:50:14 +01:00
add binary support to chunks
This commit is contained in:
parent
039d0a685a
commit
9fef87cd4b
@ -1,6 +1,9 @@
|
|||||||
use nu_engine::command_prelude::*;
|
use nu_engine::command_prelude::*;
|
||||||
use nu_protocol::ListStream;
|
use nu_protocol::ListStream;
|
||||||
use std::num::NonZeroUsize;
|
use std::{
|
||||||
|
io::{BufRead, Cursor, ErrorKind},
|
||||||
|
num::NonZeroUsize,
|
||||||
|
};
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct Chunks;
|
pub struct Chunks;
|
||||||
@ -15,6 +18,7 @@ impl Command for Chunks {
|
|||||||
.input_output_types(vec![
|
.input_output_types(vec![
|
||||||
(Type::table(), Type::list(Type::table())),
|
(Type::table(), Type::list(Type::table())),
|
||||||
(Type::list(Type::Any), Type::list(Type::list(Type::Any))),
|
(Type::list(Type::Any), Type::list(Type::list(Type::Any))),
|
||||||
|
(Type::Binary, Type::list(Type::Binary)),
|
||||||
])
|
])
|
||||||
.required("chunk_size", SyntaxShape::Int, "The size of each chunk.")
|
.required("chunk_size", SyntaxShape::Int, "The size of each chunk.")
|
||||||
.category(Category::Filters)
|
.category(Category::Filters)
|
||||||
@ -72,6 +76,15 @@ impl Command for Chunks {
|
|||||||
]),
|
]),
|
||||||
])),
|
])),
|
||||||
},
|
},
|
||||||
|
Example {
|
||||||
|
example: "0x[11 22 33 44 55 66 77 88] | chunks 3",
|
||||||
|
description: "Chunk the bytes of a binary into triplets",
|
||||||
|
result: Some(Value::test_list(vec![
|
||||||
|
Value::test_binary(vec![0x11, 0x22, 0x33]),
|
||||||
|
Value::test_binary(vec![0x44, 0x55, 0x66]),
|
||||||
|
Value::test_binary(vec![0x77, 0x88]),
|
||||||
|
])),
|
||||||
|
},
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -116,6 +129,43 @@ pub fn chunks(
|
|||||||
let stream = stream.modify(|iter| ChunksIter::new(iter, chunk_size, span));
|
let stream = stream.modify(|iter| ChunksIter::new(iter, chunk_size, span));
|
||||||
Ok(PipelineData::ListStream(stream, metadata))
|
Ok(PipelineData::ListStream(stream, metadata))
|
||||||
}
|
}
|
||||||
|
PipelineData::Value(Value::Binary { val, .. }, metadata) => {
|
||||||
|
let chunk_read = ChunkRead {
|
||||||
|
reader: Cursor::new(val),
|
||||||
|
size: chunk_size,
|
||||||
|
};
|
||||||
|
let value_stream = chunk_read.map(move |chunk| match chunk {
|
||||||
|
Ok(chunk) => Value::binary(chunk, span),
|
||||||
|
Err(e) => Value::error(e.into(), span),
|
||||||
|
});
|
||||||
|
let pipeline_data_with_metadata = value_stream.into_pipeline_data_with_metadata(
|
||||||
|
span,
|
||||||
|
engine_state.signals().clone(),
|
||||||
|
metadata,
|
||||||
|
);
|
||||||
|
Ok(pipeline_data_with_metadata)
|
||||||
|
}
|
||||||
|
PipelineData::ByteStream(stream, metadata) => {
|
||||||
|
let pipeline_data = match stream.reader() {
|
||||||
|
None => PipelineData::Empty,
|
||||||
|
Some(reader) => {
|
||||||
|
let chunk_read = ChunkRead {
|
||||||
|
reader,
|
||||||
|
size: chunk_size,
|
||||||
|
};
|
||||||
|
let value_stream = chunk_read.map(move |chunk| match chunk {
|
||||||
|
Ok(chunk) => Value::binary(chunk, span),
|
||||||
|
Err(e) => Value::error(e.into(), span),
|
||||||
|
});
|
||||||
|
value_stream.into_pipeline_data_with_metadata(
|
||||||
|
span,
|
||||||
|
engine_state.signals().clone(),
|
||||||
|
metadata,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
Ok(pipeline_data)
|
||||||
|
}
|
||||||
input => Err(input.unsupported_input_error("list", span)),
|
input => Err(input.unsupported_input_error("list", span)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -148,10 +198,69 @@ impl<I: Iterator<Item = Value>> Iterator for ChunksIter<I> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct ChunkRead<R: BufRead> {
|
||||||
|
reader: R,
|
||||||
|
size: NonZeroUsize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<R: BufRead> Iterator for ChunkRead<R> {
|
||||||
|
type Item = Result<Vec<u8>, std::io::Error>;
|
||||||
|
|
||||||
|
fn next(&mut self) -> Option<Self::Item> {
|
||||||
|
let mut buf = Vec::with_capacity(self.size.get());
|
||||||
|
while buf.len() < self.size.get() {
|
||||||
|
let available = match self.reader.fill_buf() {
|
||||||
|
Ok([]) if buf.is_empty() => return None,
|
||||||
|
Ok([]) => return Some(Ok(buf)),
|
||||||
|
Ok(n) => n,
|
||||||
|
Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
|
||||||
|
Err(e) => return Some(Err(e)),
|
||||||
|
};
|
||||||
|
let needed = self.size.get() - buf.len();
|
||||||
|
let have = available.len().min(needed);
|
||||||
|
buf.extend_from_slice(&available[..have]);
|
||||||
|
self.reader.consume(have);
|
||||||
|
}
|
||||||
|
Some(Ok(buf))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test {
|
mod test {
|
||||||
|
use std::io::Read;
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn chunk_read() {
|
||||||
|
let data = Cursor::new("hello world");
|
||||||
|
let chunk_read = ChunkRead {
|
||||||
|
reader: data,
|
||||||
|
size: NonZeroUsize::new(4).unwrap(),
|
||||||
|
};
|
||||||
|
let chunks = chunk_read.map(|e| e.unwrap()).collect::<Vec<_>>();
|
||||||
|
assert_eq!(
|
||||||
|
chunks,
|
||||||
|
[b"hell".to_vec(), b"o wo".to_vec(), b"rld".to_vec()]
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn chunk_read_stream() {
|
||||||
|
let data = Cursor::new("hel")
|
||||||
|
.chain(Cursor::new("lo wor"))
|
||||||
|
.chain(Cursor::new("ld"));
|
||||||
|
let chunk_read = ChunkRead {
|
||||||
|
reader: data,
|
||||||
|
size: NonZeroUsize::new(4).unwrap(),
|
||||||
|
};
|
||||||
|
let chunks = chunk_read.map(|e| e.unwrap()).collect::<Vec<_>>();
|
||||||
|
assert_eq!(
|
||||||
|
chunks,
|
||||||
|
[b"hell".to_vec(), b"o wo".to_vec(), b"rld".to_vec()]
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_examples() {
|
fn test_examples() {
|
||||||
use crate::test_examples;
|
use crate::test_examples;
|
||||||
|
Loading…
Reference in New Issue
Block a user