Add string/binary type color to ByteStream (#12897)

# Description

This PR allows byte streams to optionally be colored as being
specifically binary or string data, which guarantees that they'll be
converted to `Binary` or `String` appropriately on `into_value()`,
making them compatible with `Type` guarantees. This makes them
significantly more broadly usable for command input and output.

There is still an `Unknown` type for byte streams coming from external
commands, which keeps the previous behavior: the stream is treated as a
string if its contents are valid UTF-8.

A small number of commands were updated to take advantage of this, just
to prove the point. I will be adding more after this merges.

# User-Facing Changes
- New types in `describe`: `string (stream)`, `binary (stream)`
- These commands now return a stream if their input was a stream:
  - `into binary`
  - `into string`
  - `bytes collect`
  - `str join`
  - `first` (binary)
  - `last` (binary)
  - `take` (binary)
  - `skip` (binary)
- Streams that are explicitly colored as binary will print as a streaming
hexdump
  - example:
    ```nushell
    1.. | each { into binary } | bytes collect
    ```

# Tests + Formatting
I've added some tests to cover it at a basic level, and it doesn't break
anything existing, but I do think more would be nice. Some of those will
come when I modify more commands to stream.

# After Submitting
There are a few things I'm not quite satisfied with:

- **String trimming behavior.** We automatically trim newlines from
streams from external commands, but I don't think we should do this with
internal commands. If I call a command that happens to turn my string
into a stream, I don't want the newline to suddenly disappear. I changed
this to specifically do it only on `Child` and `File`, but I don't know
if this is quite right, and maybe we should bring back the old flag for
`trim_end_newline`.
- **Known binary always resulting in a hexdump.** It would be nice to
have a `print --raw`, so that we can put binary data on stdout
explicitly if we want to. This PR doesn't change how external commands
work though - they still dump straight to stdout.

Otherwise, here's the normal checklist:

- [ ] release notes
- [ ] docs update for plugin protocol changes (added `type` field)

---------

Co-authored-by: Ian Manske <ian.manske@pm.me>
This commit is contained in:
Devyn Cairns 2024-05-19 17:35:32 -07:00 committed by GitHub
parent baeba19b22
commit c61075e20e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
42 changed files with 1107 additions and 416 deletions

View File

@ -276,8 +276,8 @@ fn evaluate_source(
eval_block::<WithoutDebug>(engine_state, stack, &block, input) eval_block::<WithoutDebug>(engine_state, stack, &block, input)
}?; }?;
let status = if let PipelineData::ByteStream(stream, ..) = pipeline { let status = if let PipelineData::ByteStream(..) = pipeline {
stream.print(false)? pipeline.print(engine_state, stack, false, false)?
} else { } else {
if let Some(hook) = engine_state.get_config().hooks.display_output.clone() { if let Some(hook) = engine_state.get_config().hooks.display_output.clone() {
let pipeline = eval_hook( let pipeline = eval_hook(

View File

@ -163,6 +163,8 @@ fn run(
let description = match input { let description = match input {
PipelineData::ByteStream(stream, ..) => { PipelineData::ByteStream(stream, ..) => {
let type_ = stream.type_().describe();
let description = if options.detailed { let description = if options.detailed {
let origin = match stream.source() { let origin = match stream.source() {
ByteStreamSource::Read(_) => "unknown", ByteStreamSource::Read(_) => "unknown",
@ -172,14 +174,14 @@ fn run(
Value::record( Value::record(
record! { record! {
"type" => Value::string("byte stream", head), "type" => Value::string(type_, head),
"origin" => Value::string(origin, head), "origin" => Value::string(origin, head),
"metadata" => metadata_to_value(metadata, head), "metadata" => metadata_to_value(metadata, head),
}, },
head, head,
) )
} else { } else {
Value::string("byte stream", head) Value::string(type_, head)
}; };
if !options.no_collect { if !options.no_collect {

View File

@ -1,3 +1,4 @@
use itertools::Itertools;
use nu_engine::command_prelude::*; use nu_engine::command_prelude::*;
#[derive(Clone, Copy)] #[derive(Clone, Copy)]
@ -35,46 +36,33 @@ impl Command for BytesCollect {
input: PipelineData, input: PipelineData,
) -> Result<PipelineData, ShellError> { ) -> Result<PipelineData, ShellError> {
let separator: Option<Vec<u8>> = call.opt(engine_state, stack, 0)?; let separator: Option<Vec<u8>> = call.opt(engine_state, stack, 0)?;
let span = call.head;
// input should be a list of binary data. // input should be a list of binary data.
let mut output_binary = vec![]; let metadata = input.metadata();
for value in input { let iter = Itertools::intersperse(
match value { input.into_iter_strict(span)?.map(move |value| {
Value::Binary { mut val, .. } => { // Everything is wrapped in Some in case there's a separator, so we can flatten
output_binary.append(&mut val); Some(match value {
// manually concat // Explicitly propagate errors instead of dropping them.
// TODO: make use of std::slice::Join when it's available in stable. Value::Error { error, .. } => Err(*error),
if let Some(sep) = &separator { Value::Binary { val, .. } => Ok(val),
let mut work_sep = sep.clone(); other => Err(ShellError::OnlySupportsThisInputType {
output_binary.append(&mut work_sep)
}
}
// Explicitly propagate errors instead of dropping them.
Value::Error { error, .. } => return Err(*error),
other => {
return Err(ShellError::OnlySupportsThisInputType {
exp_input_type: "binary".into(), exp_input_type: "binary".into(),
wrong_type: other.get_type().to_string(), wrong_type: other.get_type().to_string(),
dst_span: call.head, dst_span: span,
src_span: other.span(), src_span: other.span(),
}); }),
} })
} }),
} Ok(separator).transpose(),
)
.flatten();
match separator { let output = ByteStream::from_result_iter(iter, span, None, ByteStreamType::Binary);
None => Ok(Value::binary(output_binary, call.head).into_pipeline_data()),
Some(sep) => { Ok(PipelineData::ByteStream(output, metadata))
if output_binary.is_empty() {
Ok(Value::binary(output_binary, call.head).into_pipeline_data())
} else {
// have push one extra separator in previous step, pop them out.
for _ in sep {
let _ = output_binary.pop();
}
Ok(Value::binary(output_binary, call.head).into_pipeline_data())
}
}
}
} }
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {

View File

@ -127,15 +127,18 @@ fn into_binary(
let cell_paths = call.rest(engine_state, stack, 0)?; let cell_paths = call.rest(engine_state, stack, 0)?;
let cell_paths = (!cell_paths.is_empty()).then_some(cell_paths); let cell_paths = (!cell_paths.is_empty()).then_some(cell_paths);
if let PipelineData::ByteStream(stream, ..) = input { if let PipelineData::ByteStream(stream, metadata) = input {
// TODO: in the future, we may want this to stream out, converting each to bytes // Just set the type - that should be good enough
Ok(Value::binary(stream.into_bytes()?, head).into_pipeline_data()) Ok(PipelineData::ByteStream(
stream.with_type(ByteStreamType::Binary),
metadata,
))
} else { } else {
let args = Arguments { let args = Arguments {
cell_paths, cell_paths,
compact: call.has_flag(engine_state, stack, "compact")?, compact: call.has_flag(engine_state, stack, "compact")?,
}; };
operate(action, args, input, call.head, engine_state.ctrlc.clone()) operate(action, args, input, head, engine_state.ctrlc.clone())
} }
} }

View File

@ -103,7 +103,7 @@ fn into_cell_path(call: &Call, input: PipelineData) -> Result<PipelineData, Shel
} }
PipelineData::ByteStream(stream, ..) => Err(ShellError::OnlySupportsThisInputType { PipelineData::ByteStream(stream, ..) => Err(ShellError::OnlySupportsThisInputType {
exp_input_type: "list, int".into(), exp_input_type: "list, int".into(),
wrong_type: "byte stream".into(), wrong_type: stream.type_().describe().into(),
dst_span: head, dst_span: head,
src_span: stream.span(), src_span: stream.span(),
}), }),

View File

@ -156,9 +156,23 @@ fn string_helper(
let cell_paths = call.rest(engine_state, stack, 0)?; let cell_paths = call.rest(engine_state, stack, 0)?;
let cell_paths = (!cell_paths.is_empty()).then_some(cell_paths); let cell_paths = (!cell_paths.is_empty()).then_some(cell_paths);
if let PipelineData::ByteStream(stream, ..) = input { if let PipelineData::ByteStream(stream, metadata) = input {
// TODO: in the future, we may want this to stream out, converting each to bytes // Just set the type - that should be good enough. There is no guarantee that the data
Ok(Value::string(stream.into_string()?, head).into_pipeline_data()) // within a string stream is actually valid UTF-8. But refuse to do it if it was already set
// to binary
if stream.type_() != ByteStreamType::Binary {
Ok(PipelineData::ByteStream(
stream.with_type(ByteStreamType::String),
metadata,
))
} else {
Err(ShellError::CantConvert {
to_type: "string".into(),
from_type: "binary".into(),
span: stream.span(),
help: Some("try using the `decode` command".into()),
})
}
} else { } else {
let config = engine_state.get_config().clone(); let config = engine_state.get_config().clone();
let args = Arguments { let args = Arguments {

View File

@ -135,7 +135,7 @@ fn drop_cols(
PipelineData::Empty => Ok(PipelineData::Empty), PipelineData::Empty => Ok(PipelineData::Empty),
PipelineData::ByteStream(stream, ..) => Err(ShellError::OnlySupportsThisInputType { PipelineData::ByteStream(stream, ..) => Err(ShellError::OnlySupportsThisInputType {
exp_input_type: "table or record".into(), exp_input_type: "table or record".into(),
wrong_type: "byte stream".into(), wrong_type: stream.type_().describe().into(),
dst_span: head, dst_span: head,
src_span: stream.span(), src_span: stream.span(),
}), }),

View File

@ -170,12 +170,43 @@ fn first_helper(
)) ))
} }
} }
PipelineData::ByteStream(stream, ..) => Err(ShellError::OnlySupportsThisInputType { PipelineData::ByteStream(stream, metadata) => {
exp_input_type: "list, binary or range".into(), if stream.type_() == ByteStreamType::Binary {
wrong_type: "byte stream".into(), let span = stream.span();
dst_span: head, if let Some(mut reader) = stream.reader() {
src_span: stream.span(), use std::io::Read;
}), if return_single_element {
// Take a single byte
let mut byte = [0u8];
if reader.read(&mut byte).err_span(span)? > 0 {
Ok(Value::int(byte[0] as i64, head).into_pipeline_data())
} else {
Err(ShellError::AccessEmptyContent { span: head })
}
} else {
// Just take 'rows' bytes off the stream, mimicking the binary behavior
Ok(PipelineData::ByteStream(
ByteStream::read(
reader.take(rows as u64),
head,
None,
ByteStreamType::Binary,
),
metadata,
))
}
} else {
Ok(PipelineData::Empty)
}
} else {
Err(ShellError::OnlySupportsThisInputType {
exp_input_type: "list, binary or range".into(),
wrong_type: stream.type_().describe().into(),
dst_span: head,
src_span: stream.span(),
})
}
}
PipelineData::Empty => Err(ShellError::OnlySupportsThisInputType { PipelineData::Empty => Err(ShellError::OnlySupportsThisInputType {
exp_input_type: "list, binary or range".into(), exp_input_type: "list, binary or range".into(),
wrong_type: "null".into(), wrong_type: "null".into(),

View File

@ -261,8 +261,8 @@ fn insert(
type_name: "empty pipeline".to_string(), type_name: "empty pipeline".to_string(),
span: head, span: head,
}), }),
PipelineData::ByteStream(..) => Err(ShellError::IncompatiblePathAccess { PipelineData::ByteStream(stream, ..) => Err(ShellError::IncompatiblePathAccess {
type_name: "byte stream".to_string(), type_name: stream.type_().describe().into(),
span: head, span: head,
}), }),
} }

View File

@ -86,7 +86,7 @@ impl Command for Items {
}), }),
PipelineData::ByteStream(stream, ..) => Err(ShellError::OnlySupportsThisInputType { PipelineData::ByteStream(stream, ..) => Err(ShellError::OnlySupportsThisInputType {
exp_input_type: "record".into(), exp_input_type: "record".into(),
wrong_type: "byte stream".into(), wrong_type: stream.type_().describe().into(),
dst_span: call.head, dst_span: call.head,
src_span: stream.span(), src_span: stream.span(),
}), }),

View File

@ -160,12 +160,48 @@ impl Command for Last {
}), }),
} }
} }
PipelineData::ByteStream(stream, ..) => Err(ShellError::OnlySupportsThisInputType { PipelineData::ByteStream(stream, ..) => {
exp_input_type: "list, binary or range".into(), if stream.type_() == ByteStreamType::Binary {
wrong_type: "byte stream".into(), let span = stream.span();
dst_span: head, if let Some(mut reader) = stream.reader() {
src_span: stream.span(), use std::io::Read;
}), // Have to be a bit tricky here, but just consume into a VecDeque that we
// shrink to fit each time
const TAKE: u64 = 8192;
let mut buf = VecDeque::with_capacity(rows + TAKE as usize);
loop {
let taken = std::io::copy(&mut (&mut reader).take(TAKE), &mut buf)
.err_span(span)?;
if buf.len() > rows {
buf.drain(..(buf.len() - rows));
}
if taken < TAKE {
// This must be EOF.
if return_single_element {
if !buf.is_empty() {
return Ok(
Value::int(buf[0] as i64, head).into_pipeline_data()
);
} else {
return Err(ShellError::AccessEmptyContent { span: head });
}
} else {
return Ok(Value::binary(buf, head).into_pipeline_data());
}
}
}
} else {
Ok(PipelineData::Empty)
}
} else {
Err(ShellError::OnlySupportsThisInputType {
exp_input_type: "list, binary or range".into(),
wrong_type: stream.type_().describe().into(),
dst_span: head,
src_span: stream.span(),
})
}
}
PipelineData::Empty => Err(ShellError::OnlySupportsThisInputType { PipelineData::Empty => Err(ShellError::OnlySupportsThisInputType {
exp_input_type: "list, binary or range".into(), exp_input_type: "list, binary or range".into(),
wrong_type: "null".into(), wrong_type: "null".into(),

View File

@ -12,6 +12,7 @@ impl Command for Skip {
Signature::build(self.name()) Signature::build(self.name())
.input_output_types(vec![ .input_output_types(vec![
(Type::table(), Type::table()), (Type::table(), Type::table()),
(Type::Binary, Type::Binary),
( (
Type::List(Box::new(Type::Any)), Type::List(Box::new(Type::Any)),
Type::List(Box::new(Type::Any)), Type::List(Box::new(Type::Any)),
@ -51,6 +52,11 @@ impl Command for Skip {
"editions" => Value::test_int(2021), "editions" => Value::test_int(2021),
})])), })])),
}, },
Example {
description: "Skip 2 bytes of a binary value",
example: "0x[01 23 45 67] | skip 2",
result: Some(Value::test_binary(vec![0x45, 0x67])),
},
] ]
} }
fn run( fn run(
@ -87,12 +93,30 @@ impl Command for Skip {
let ctrlc = engine_state.ctrlc.clone(); let ctrlc = engine_state.ctrlc.clone();
let input_span = input.span().unwrap_or(call.head); let input_span = input.span().unwrap_or(call.head);
match input { match input {
PipelineData::ByteStream(stream, ..) => Err(ShellError::OnlySupportsThisInputType { PipelineData::ByteStream(stream, metadata) => {
exp_input_type: "list, binary or range".into(), if stream.type_() == ByteStreamType::Binary {
wrong_type: "byte stream".into(), let span = stream.span();
dst_span: call.head, if let Some(mut reader) = stream.reader() {
src_span: stream.span(), use std::io::Read;
}), // Copy the number of skipped bytes into the sink before proceeding
std::io::copy(&mut (&mut reader).take(n as u64), &mut std::io::sink())
.err_span(span)?;
Ok(PipelineData::ByteStream(
ByteStream::read(reader, call.head, None, ByteStreamType::Binary),
metadata,
))
} else {
Ok(PipelineData::Empty)
}
} else {
Err(ShellError::OnlySupportsThisInputType {
exp_input_type: "list, binary or range".into(),
wrong_type: stream.type_().describe().into(),
dst_span: call.head,
src_span: stream.span(),
})
}
}
PipelineData::Value(Value::Binary { val, .. }, metadata) => { PipelineData::Value(Value::Binary { val, .. }, metadata) => {
let bytes = val.into_iter().skip(n).collect::<Vec<_>>(); let bytes = val.into_iter().skip(n).collect::<Vec<_>>();
Ok(Value::binary(bytes, input_span).into_pipeline_data_with_metadata(metadata)) Ok(Value::binary(bytes, input_span).into_pipeline_data_with_metadata(metadata))

View File

@ -78,12 +78,32 @@ impl Command for Take {
stream.modify(|iter| iter.take(rows_desired)), stream.modify(|iter| iter.take(rows_desired)),
metadata, metadata,
)), )),
PipelineData::ByteStream(stream, ..) => Err(ShellError::OnlySupportsThisInputType { PipelineData::ByteStream(stream, metadata) => {
exp_input_type: "list, binary or range".into(), if stream.type_() == ByteStreamType::Binary {
wrong_type: "byte stream".into(), if let Some(reader) = stream.reader() {
dst_span: head, use std::io::Read;
src_span: stream.span(), // Just take 'rows' bytes off the stream, mimicking the binary behavior
}), Ok(PipelineData::ByteStream(
ByteStream::read(
reader.take(rows_desired as u64),
head,
None,
ByteStreamType::Binary,
),
metadata,
))
} else {
Ok(PipelineData::Empty)
}
} else {
Err(ShellError::OnlySupportsThisInputType {
exp_input_type: "list, binary or range".into(),
wrong_type: stream.type_().describe().into(),
dst_span: head,
src_span: stream.span(),
})
}
}
PipelineData::Empty => Err(ShellError::OnlySupportsThisInputType { PipelineData::Empty => Err(ShellError::OnlySupportsThisInputType {
exp_input_type: "list, binary or range".into(), exp_input_type: "list, binary or range".into(),
wrong_type: "null".into(), wrong_type: "null".into(),

View File

@ -1,7 +1,7 @@
use nu_engine::{command_prelude::*, get_eval_block_with_early_return}; use nu_engine::{command_prelude::*, get_eval_block_with_early_return};
use nu_protocol::{ use nu_protocol::{
byte_stream::copy_with_interrupt, engine::Closure, process::ChildPipe, ByteStream, byte_stream::copy_with_interrupt, engine::Closure, process::ChildPipe, ByteStream,
ByteStreamSource, OutDest, ByteStreamSource, OutDest, PipelineMetadata,
}; };
use std::{ use std::{
io::{self, Read, Write}, io::{self, Read, Write},
@ -104,9 +104,13 @@ use it in your pipeline."#
if let PipelineData::ByteStream(stream, metadata) = input { if let PipelineData::ByteStream(stream, metadata) = input {
let span = stream.span(); let span = stream.span();
let ctrlc = engine_state.ctrlc.clone(); let ctrlc = engine_state.ctrlc.clone();
let eval_block = { let type_ = stream.type_();
let metadata = metadata.clone();
move |stream| eval_block(PipelineData::ByteStream(stream, metadata)) let info = StreamInfo {
span,
ctrlc: ctrlc.clone(),
type_,
metadata: metadata.clone(),
}; };
match stream.into_source() { match stream.into_source() {
@ -115,10 +119,11 @@ use it in your pipeline."#
return stderr_misuse(span, head); return stderr_misuse(span, head);
} }
let tee = IoTee::new(read, span, eval_block)?; let tee_thread = spawn_tee(info, eval_block)?;
let tee = IoTee::new(read, tee_thread);
Ok(PipelineData::ByteStream( Ok(PipelineData::ByteStream(
ByteStream::read(tee, span, ctrlc), ByteStream::read(tee, span, ctrlc, type_),
metadata, metadata,
)) ))
} }
@ -127,44 +132,32 @@ use it in your pipeline."#
return stderr_misuse(span, head); return stderr_misuse(span, head);
} }
let tee = IoTee::new(file, span, eval_block)?; let tee_thread = spawn_tee(info, eval_block)?;
let tee = IoTee::new(file, tee_thread);
Ok(PipelineData::ByteStream( Ok(PipelineData::ByteStream(
ByteStream::read(tee, span, ctrlc), ByteStream::read(tee, span, ctrlc, type_),
metadata, metadata,
)) ))
} }
ByteStreamSource::Child(mut child) => { ByteStreamSource::Child(mut child) => {
let stderr_thread = if use_stderr { let stderr_thread = if use_stderr {
let stderr_thread = if let Some(stderr) = child.stderr.take() { let stderr_thread = if let Some(stderr) = child.stderr.take() {
let tee_thread = spawn_tee(info.clone(), eval_block)?;
let tee = IoTee::new(stderr, tee_thread);
match stack.stderr() { match stack.stderr() {
OutDest::Pipe | OutDest::Capture => { OutDest::Pipe | OutDest::Capture => {
let tee = IoTee::new(stderr, span, eval_block)?;
child.stderr = Some(ChildPipe::Tee(Box::new(tee))); child.stderr = Some(ChildPipe::Tee(Box::new(tee)));
None Ok(None)
} }
OutDest::Null => Some(tee_pipe_on_thread( OutDest::Null => copy_on_thread(tee, io::sink(), &info).map(Some),
stderr, OutDest::Inherit => {
io::sink(), copy_on_thread(tee, io::stderr(), &info).map(Some)
span, }
ctrlc.as_ref(), OutDest::File(file) => {
eval_block, copy_on_thread(tee, file.clone(), &info).map(Some)
)?), }
OutDest::Inherit => Some(tee_pipe_on_thread( }?
stderr,
io::stderr(),
span,
ctrlc.as_ref(),
eval_block,
)?),
OutDest::File(file) => Some(tee_pipe_on_thread(
stderr,
file.clone(),
span,
ctrlc.as_ref(),
eval_block,
)?),
}
} else { } else {
None None
}; };
@ -175,37 +168,29 @@ use it in your pipeline."#
child.stdout = Some(stdout); child.stdout = Some(stdout);
Ok(()) Ok(())
} }
OutDest::Null => { OutDest::Null => copy_pipe(stdout, io::sink(), &info),
copy_pipe(stdout, io::sink(), span, ctrlc.as_deref()) OutDest::Inherit => copy_pipe(stdout, io::stdout(), &info),
} OutDest::File(file) => copy_pipe(stdout, file.as_ref(), &info),
OutDest::Inherit => {
copy_pipe(stdout, io::stdout(), span, ctrlc.as_deref())
}
OutDest::File(file) => {
copy_pipe(stdout, file.as_ref(), span, ctrlc.as_deref())
}
}?; }?;
} }
stderr_thread stderr_thread
} else { } else {
let stderr_thread = if let Some(stderr) = child.stderr.take() { let stderr_thread = if let Some(stderr) = child.stderr.take() {
let info = info.clone();
match stack.stderr() { match stack.stderr() {
OutDest::Pipe | OutDest::Capture => { OutDest::Pipe | OutDest::Capture => {
child.stderr = Some(stderr); child.stderr = Some(stderr);
Ok(None) Ok(None)
} }
OutDest::Null => { OutDest::Null => {
copy_pipe_on_thread(stderr, io::sink(), span, ctrlc.as_ref()) copy_pipe_on_thread(stderr, io::sink(), &info).map(Some)
.map(Some)
} }
OutDest::Inherit => { OutDest::Inherit => {
copy_pipe_on_thread(stderr, io::stderr(), span, ctrlc.as_ref()) copy_pipe_on_thread(stderr, io::stderr(), &info).map(Some)
.map(Some)
} }
OutDest::File(file) => { OutDest::File(file) => {
copy_pipe_on_thread(stderr, file.clone(), span, ctrlc.as_ref()) copy_pipe_on_thread(stderr, file.clone(), &info).map(Some)
.map(Some)
} }
}? }?
} else { } else {
@ -213,29 +198,16 @@ use it in your pipeline."#
}; };
if let Some(stdout) = child.stdout.take() { if let Some(stdout) = child.stdout.take() {
let tee_thread = spawn_tee(info.clone(), eval_block)?;
let tee = IoTee::new(stdout, tee_thread);
match stack.stdout() { match stack.stdout() {
OutDest::Pipe | OutDest::Capture => { OutDest::Pipe | OutDest::Capture => {
let tee = IoTee::new(stdout, span, eval_block)?;
child.stdout = Some(ChildPipe::Tee(Box::new(tee))); child.stdout = Some(ChildPipe::Tee(Box::new(tee)));
Ok(()) Ok(())
} }
OutDest::Null => { OutDest::Null => copy(tee, io::sink(), &info),
tee_pipe(stdout, io::sink(), span, ctrlc.as_deref(), eval_block) OutDest::Inherit => copy(tee, io::stdout(), &info),
} OutDest::File(file) => copy(tee, file.as_ref(), &info),
OutDest::Inherit => tee_pipe(
stdout,
io::stdout(),
span,
ctrlc.as_deref(),
eval_block,
),
OutDest::File(file) => tee_pipe(
stdout,
file.as_ref(),
span,
ctrlc.as_deref(),
eval_block,
),
}?; }?;
} }
@ -350,7 +322,7 @@ where
fn stderr_misuse<T>(span: Span, head: Span) -> Result<T, ShellError> { fn stderr_misuse<T>(span: Span, head: Span) -> Result<T, ShellError> {
Err(ShellError::UnsupportedInput { Err(ShellError::UnsupportedInput {
msg: "--stderr can only be used on external commands".into(), msg: "--stderr can only be used on external commands".into(),
input: "the input to `tee` is not an external commands".into(), input: "the input to `tee` is not an external command".into(),
msg_span: head, msg_span: head,
input_span: span, input_span: span,
}) })
@ -363,23 +335,12 @@ struct IoTee<R: Read> {
} }
impl<R: Read> IoTee<R> { impl<R: Read> IoTee<R> {
fn new( fn new(reader: R, tee: TeeThread) -> Self {
reader: R, Self {
span: Span,
eval_block: impl FnOnce(ByteStream) -> Result<(), ShellError> + Send + 'static,
) -> Result<Self, ShellError> {
let (sender, receiver) = mpsc::channel();
let thread = thread::Builder::new()
.name("tee".into())
.spawn(move || eval_block(ByteStream::from_iter(receiver, span, None)))
.err_span(span)?;
Ok(Self {
reader, reader,
sender: Some(sender), sender: Some(tee.sender),
thread: Some(thread), thread: Some(tee.thread),
}) }
} }
} }
@ -411,68 +372,74 @@ impl<R: Read> Read for IoTee<R> {
} }
} }
fn tee_pipe( struct TeeThread {
pipe: ChildPipe, sender: Sender<Vec<u8>>,
mut dest: impl Write, thread: JoinHandle<Result<(), ShellError>>,
}
fn spawn_tee(
info: StreamInfo,
mut eval_block: impl FnMut(PipelineData) -> Result<(), ShellError> + Send + 'static,
) -> Result<TeeThread, ShellError> {
let (sender, receiver) = mpsc::channel();
let thread = thread::Builder::new()
.name("tee".into())
.spawn(move || {
// We don't use ctrlc here because we assume it already has it on the other side
let stream = ByteStream::from_iter(receiver.into_iter(), info.span, None, info.type_);
eval_block(PipelineData::ByteStream(stream, info.metadata))
})
.err_span(info.span)?;
Ok(TeeThread { sender, thread })
}
#[derive(Clone)]
struct StreamInfo {
span: Span, span: Span,
ctrlc: Option<&AtomicBool>, ctrlc: Option<Arc<AtomicBool>>,
eval_block: impl FnOnce(ByteStream) -> Result<(), ShellError> + Send + 'static, type_: ByteStreamType,
) -> Result<(), ShellError> { metadata: Option<PipelineMetadata>,
match pipe { }
ChildPipe::Pipe(pipe) => {
let mut tee = IoTee::new(pipe, span, eval_block)?; fn copy(mut src: impl Read, mut dest: impl Write, info: &StreamInfo) -> Result<(), ShellError> {
copy_with_interrupt(&mut tee, &mut dest, span, ctrlc)?; copy_with_interrupt(&mut src, &mut dest, info.span, info.ctrlc.as_deref())?;
}
ChildPipe::Tee(tee) => {
let mut tee = IoTee::new(tee, span, eval_block)?;
copy_with_interrupt(&mut tee, &mut dest, span, ctrlc)?;
}
}
Ok(()) Ok(())
} }
fn tee_pipe_on_thread( fn copy_pipe(pipe: ChildPipe, dest: impl Write, info: &StreamInfo) -> Result<(), ShellError> {
pipe: ChildPipe, match pipe {
dest: impl Write + Send + 'static, ChildPipe::Pipe(pipe) => copy(pipe, dest, info),
span: Span, ChildPipe::Tee(tee) => copy(tee, dest, info),
ctrlc: Option<&Arc<AtomicBool>>, }
eval_block: impl FnOnce(ByteStream) -> Result<(), ShellError> + Send + 'static, }
fn copy_on_thread(
mut src: impl Read + Send + 'static,
mut dest: impl Write + Send + 'static,
info: &StreamInfo,
) -> Result<JoinHandle<Result<(), ShellError>>, ShellError> { ) -> Result<JoinHandle<Result<(), ShellError>>, ShellError> {
let ctrlc = ctrlc.cloned(); let span = info.span;
let ctrlc = info.ctrlc.clone();
thread::Builder::new() thread::Builder::new()
.name("stderr tee".into()) .name("stderr copier".into())
.spawn(move || tee_pipe(pipe, dest, span, ctrlc.as_deref(), eval_block)) .spawn(move || {
copy_with_interrupt(&mut src, &mut dest, span, ctrlc.as_deref())?;
Ok(())
})
.map_err(|e| e.into_spanned(span).into()) .map_err(|e| e.into_spanned(span).into())
} }
fn copy_pipe(
pipe: ChildPipe,
mut dest: impl Write,
span: Span,
ctrlc: Option<&AtomicBool>,
) -> Result<(), ShellError> {
match pipe {
ChildPipe::Pipe(mut pipe) => {
copy_with_interrupt(&mut pipe, &mut dest, span, ctrlc)?;
}
ChildPipe::Tee(mut tee) => {
copy_with_interrupt(&mut tee, &mut dest, span, ctrlc)?;
}
}
Ok(())
}
fn copy_pipe_on_thread( fn copy_pipe_on_thread(
pipe: ChildPipe, pipe: ChildPipe,
dest: impl Write + Send + 'static, dest: impl Write + Send + 'static,
span: Span, info: &StreamInfo,
ctrlc: Option<&Arc<AtomicBool>>,
) -> Result<JoinHandle<Result<(), ShellError>>, ShellError> { ) -> Result<JoinHandle<Result<(), ShellError>>, ShellError> {
let ctrlc = ctrlc.cloned(); match pipe {
thread::Builder::new() ChildPipe::Pipe(pipe) => copy_on_thread(pipe, dest, info),
.name("stderr copier".into()) ChildPipe::Tee(tee) => copy_on_thread(tee, dest, info),
.spawn(move || copy_pipe(pipe, dest, span, ctrlc.as_deref())) }
.map_err(|e| e.into_spanned(span).into())
} }
#[test] #[test]

View File

@ -225,8 +225,8 @@ fn update(
type_name: "empty pipeline".to_string(), type_name: "empty pipeline".to_string(),
span: head, span: head,
}), }),
PipelineData::ByteStream(..) => Err(ShellError::IncompatiblePathAccess { PipelineData::ByteStream(stream, ..) => Err(ShellError::IncompatiblePathAccess {
type_name: "byte stream".to_string(), type_name: stream.type_().describe().into(),
span: head, span: head,
}), }),
} }

View File

@ -285,8 +285,8 @@ fn upsert(
type_name: "empty pipeline".to_string(), type_name: "empty pipeline".to_string(),
span: head, span: head,
}), }),
PipelineData::ByteStream(..) => Err(ShellError::IncompatiblePathAccess { PipelineData::ByteStream(stream, ..) => Err(ShellError::IncompatiblePathAccess {
type_name: "byte stream".to_string(), type_name: stream.type_().describe().into(),
span: head, span: head,
}), }),
} }

View File

@ -182,7 +182,7 @@ fn values(
} }
PipelineData::ByteStream(stream, ..) => Err(ShellError::OnlySupportsThisInputType { PipelineData::ByteStream(stream, ..) => Err(ShellError::OnlySupportsThisInputType {
exp_input_type: "record or table".into(), exp_input_type: "record or table".into(),
wrong_type: "byte stream".into(), wrong_type: stream.type_().describe().into(),
dst_span: head, dst_span: head,
src_span: stream.span(), src_span: stream.span(),
}), }),

View File

@ -51,7 +51,12 @@ impl Command for ToText {
str str
}); });
Ok(PipelineData::ByteStream( Ok(PipelineData::ByteStream(
ByteStream::from_iter(iter, span, engine_state.ctrlc.clone()), ByteStream::from_iter(
iter,
span,
engine_state.ctrlc.clone(),
ByteStreamType::String,
),
meta, meta,
)) ))
} }

View File

@ -117,10 +117,20 @@ pub fn response_to_buffer(
_ => None, _ => None,
}; };
// Try to guess whether the response is definitely intended to be binary or definitely intended to
// be UTF-8 text. Otherwise specify `ByteStreamType::Unknown` and just guess. This doesn't have to be thorough.
let content_type_lowercase = response.header("content-type").map(|s| s.to_lowercase());
let response_type = match content_type_lowercase.as_deref() {
Some("application/octet-stream") => ByteStreamType::Binary,
Some(h) if h.contains("charset=utf-8") => ByteStreamType::String,
_ => ByteStreamType::Unknown,
};
let reader = response.into_reader(); let reader = response.into_reader();
PipelineData::ByteStream( PipelineData::ByteStream(
ByteStream::read(reader, span, engine_state.ctrlc.clone()).with_known_size(buffer_size), ByteStream::read(reader, span, engine_state.ctrlc.clone(), response_type)
.with_known_size(buffer_size),
None, None,
) )
} }

View File

@ -1,4 +1,5 @@
use nu_engine::command_prelude::*; use nu_engine::command_prelude::*;
use std::io::Write;
#[derive(Clone)] #[derive(Clone)]
pub struct StrJoin; pub struct StrJoin;
@ -40,31 +41,40 @@ impl Command for StrJoin {
) -> Result<PipelineData, ShellError> { ) -> Result<PipelineData, ShellError> {
let separator: Option<String> = call.opt(engine_state, stack, 0)?; let separator: Option<String> = call.opt(engine_state, stack, 0)?;
let config = engine_state.get_config(); let config = engine_state.config.clone();
// let output = input.collect_string(&separator.unwrap_or_default(), &config)?; let span = call.head;
// Hmm, not sure what we actually want.
// `to_formatted_string` formats dates as human readable which feels funny.
let mut strings: Vec<String> = vec![];
for value in input { let metadata = input.metadata();
let str = match value { let mut iter = input.into_iter();
Value::Error { error, .. } => { let mut first = true;
return Err(*error);
let output = ByteStream::from_fn(span, None, ByteStreamType::String, move |buffer| {
// Write each input to the buffer
if let Some(value) = iter.next() {
// Write the separator if this is not the first
if first {
first = false;
} else if let Some(separator) = &separator {
write!(buffer, "{}", separator)?;
} }
Value::Date { val, .. } => format!("{val:?}"),
value => value.to_expanded_string("\n", config),
};
strings.push(str);
}
let output = if let Some(separator) = separator { match value {
strings.join(&separator) Value::Error { error, .. } => {
} else { return Err(*error);
strings.join("") }
}; // Hmm, not sure what we actually want.
// `to_expanded_string` formats dates as human readable which feels funny.
Value::Date { val, .. } => write!(buffer, "{val:?}")?,
value => write!(buffer, "{}", value.to_expanded_string("\n", &config))?,
}
Ok(true)
} else {
Ok(false)
}
});
Ok(Value::string(output, call.head).into_pipeline_data()) Ok(PipelineData::ByteStream(output, metadata))
} }
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {

View File

@ -416,6 +416,7 @@ impl ExternalCommand {
.name("external stdin worker".to_string()) .name("external stdin worker".to_string())
.spawn(move || { .spawn(move || {
let input = match input { let input = match input {
// Don't touch binary input or byte streams
input @ PipelineData::ByteStream(..) => input, input @ PipelineData::ByteStream(..) => input,
input @ PipelineData::Value(Value::Binary { .. }, ..) => input, input @ PipelineData::Value(Value::Binary { .. }, ..) => input,
input => { input => {

View File

@ -5,6 +5,7 @@
use lscolors::{LsColors, Style}; use lscolors::{LsColors, Style};
use nu_color_config::{color_from_hex, StyleComputer, TextStyle}; use nu_color_config::{color_from_hex, StyleComputer, TextStyle};
use nu_engine::{command_prelude::*, env::get_config, env_to_string}; use nu_engine::{command_prelude::*, env::get_config, env_to_string};
use nu_pretty_hex::HexConfig;
use nu_protocol::{ use nu_protocol::{
ByteStream, Config, DataSource, ListStream, PipelineMetadata, TableMode, ValueIterator, ByteStream, Config, DataSource, ListStream, PipelineMetadata, TableMode, ValueIterator,
}; };
@ -15,7 +16,7 @@ use nu_table::{
use nu_utils::get_ls_colors; use nu_utils::get_ls_colors;
use std::{ use std::{
collections::VecDeque, collections::VecDeque,
io::{Cursor, IsTerminal}, io::{IsTerminal, Read},
path::PathBuf, path::PathBuf,
str::FromStr, str::FromStr,
sync::{atomic::AtomicBool, Arc}, sync::{atomic::AtomicBool, Arc},
@ -364,16 +365,18 @@ fn handle_table_command(
) -> Result<PipelineData, ShellError> { ) -> Result<PipelineData, ShellError> {
let span = input.data.span().unwrap_or(input.call.head); let span = input.data.span().unwrap_or(input.call.head);
match input.data { match input.data {
// Binary streams should behave as if they really are `binary` data, and printed as hex
PipelineData::ByteStream(stream, _) if stream.type_() == ByteStreamType::Binary => Ok(
PipelineData::ByteStream(pretty_hex_stream(stream, input.call.head), None),
),
PipelineData::ByteStream(..) => Ok(input.data), PipelineData::ByteStream(..) => Ok(input.data),
PipelineData::Value(Value::Binary { val, .. }, ..) => { PipelineData::Value(Value::Binary { val, .. }, ..) => {
let bytes = {
let mut str = nu_pretty_hex::pretty_hex(&val);
str.push('\n');
str.into_bytes()
};
let ctrlc = input.engine_state.ctrlc.clone(); let ctrlc = input.engine_state.ctrlc.clone();
let stream = ByteStream::read(Cursor::new(bytes), input.call.head, ctrlc); let stream = ByteStream::read_binary(val, input.call.head, ctrlc);
Ok(PipelineData::ByteStream(stream, None)) Ok(PipelineData::ByteStream(
pretty_hex_stream(stream, input.call.head),
None,
))
} }
// None of these two receive a StyleComputer because handle_row_stream() can produce it by itself using engine_state and stack. // None of these two receive a StyleComputer because handle_row_stream() can produce it by itself using engine_state and stack.
PipelineData::Value(Value::List { vals, .. }, metadata) => { PipelineData::Value(Value::List { vals, .. }, metadata) => {
@ -410,6 +413,70 @@ fn handle_table_command(
} }
} }
/// Wrap `stream` in a new string-typed [`ByteStream`] that lazily renders the input as a hexdump.
///
/// The first chunk produced is the title line (its length comes from the stream's known size, if
/// any); every following chunk is one dump line covering up to `cfg.width` input bytes.
///
/// `span` is attached to the returned stream and to any I/O error raised while reading the input.
/// If `stream` has no reader, an empty string stream is returned instead.
fn pretty_hex_stream(stream: ByteStream, span: Span) -> ByteStream {
    let mut cfg = HexConfig {
        // We are going to render the title manually first
        title: true,
        // If building on 32-bit, the stream size might be bigger than a usize
        length: stream.known_size().and_then(|sz| sz.try_into().ok()),
        ..HexConfig::default()
    };

    // A zero width would make every read below come back empty, ending the output right after
    // the title line.
    debug_assert!(cfg.width > 0, "the default hex config width was zero");

    let mut read_buf = Vec::with_capacity(cfg.width);

    let mut reader = if let Some(reader) = stream.reader() {
        reader
    } else {
        // No stream to read from
        return ByteStream::read_string("".into(), span, None);
    };

    ByteStream::from_fn(span, None, ByteStreamType::String, move |buffer| {
        // Take over the generator's allocation as a `String` so the `fmt`-based writers can
        // append to it directly.
        let mut write_buf = std::mem::take(buffer);
        write_buf.clear();
        // The buffer was just emptied, so the checked conversion cannot fail, costs nothing to
        // validate, and keeps the allocation - no `unsafe` required.
        let mut write_buf =
            String::from_utf8(write_buf).expect("an empty buffer is always valid UTF-8");

        // Write the title at the beginning
        if cfg.title {
            nu_pretty_hex::write_title(&mut write_buf, cfg, true).expect("format error");
            // Only the very first chunk is the title
            cfg.title = false;

            // Put the write_buf back into buffer
            *buffer = write_buf.into_bytes();

            Ok(true)
        } else {
            // Read up to `cfg.width` bytes
            read_buf.clear();
            (&mut reader)
                .take(cfg.width as u64)
                .read_to_end(&mut read_buf)
                .err_span(span)?;

            if !read_buf.is_empty() {
                nu_pretty_hex::hex_write(&mut write_buf, &read_buf, cfg, Some(true))
                    .expect("format error");
                write_buf.push('\n');

                // Advance the address offset for next time
                cfg.address_offset += read_buf.len();

                // Put the write_buf back into buffer
                *buffer = write_buf.into_bytes();

                Ok(true)
            } else {
                // Nothing left to read: signal end of stream
                Ok(false)
            }
        }
    })
}
fn handle_record( fn handle_record(
input: CmdInput, input: CmdInput,
cfg: TableConfig, cfg: TableConfig,
@ -608,7 +675,8 @@ fn handle_row_stream(
ctrlc.clone(), ctrlc.clone(),
cfg, cfg,
); );
let stream = ByteStream::from_result_iter(paginator, input.call.head, None); let stream =
ByteStream::from_result_iter(paginator, input.call.head, None, ByteStreamType::String);
Ok(PipelineData::ByteStream(stream, None)) Ok(PipelineData::ByteStream(stream, None))
} }

View File

@ -0,0 +1,27 @@
use nu_test_support::{nu, pipeline};
/// `bytes collect` with a separator joins the input binaries, inserting the separator between
/// consecutive items only. The result is hex-encoded so it can be compared as text.
///
/// NOTE(review): `filter {true}` is presumably there to turn the list into a stream - confirm.
#[test]
fn test_stream() {
let actual = nu!(pipeline(
"
[0x[01] 0x[02] 0x[03] 0x[04]]
| filter {true}
| bytes collect 0x[aa aa]
| encode hex
"
));
assert_eq!(actual.out, "01AAAA02AAAA03AAAA04");
}
/// The output of `bytes collect` on this pipeline is described by `describe -n` as
/// `binary (stream)`, i.e. it is not collected into a plain binary value.
///
/// NOTE(review): `filter {true}` is presumably there to turn the list into a stream - confirm.
#[test]
fn test_stream_type() {
let actual = nu!(pipeline(
"
[0x[01] 0x[02] 0x[03] 0x[04]]
| filter {true}
| bytes collect 0x[00]
| describe -n
"
));
assert_eq!(actual.out, "binary (stream)");
}

View File

@ -0,0 +1 @@
mod collect;

View File

@ -68,6 +68,20 @@ fn gets_first_byte() {
assert_eq!(actual.out, "170"); assert_eq!(actual.out, "170");
} }
/// `first 2` on the output of `bytes collect` yields the first two bytes as binary.
///
/// The source range `1..` has no upper bound, so the pipeline can only finish if the bytes are
/// consumed lazily.
#[test]
fn gets_first_bytes_from_stream() {
let actual = nu!("(1.. | each { 0x[aa bb cc] } | bytes collect | first 2) == 0x[aa bb]");
assert_eq!(actual.out, "true");
}
/// `first` without a count on the output of `bytes collect` prints `170`, the decimal value of
/// the first byte (`0xaa`). The source range `1..` has no upper bound.
#[test]
fn gets_first_byte_from_stream() {
let actual = nu!("1.. | each { 0x[aa bb cc] } | bytes collect | first");
assert_eq!(actual.out, "170");
}
#[test] #[test]
// covers a situation where `first` used to behave strangely on list<binary> input // covers a situation where `first` used to behave strangely on list<binary> input
fn works_with_binary_list() { fn works_with_binary_list() {

View File

@ -68,6 +68,20 @@ fn gets_last_byte() {
assert_eq!(actual.out, "204"); assert_eq!(actual.out, "204");
} }
/// `last 2` on the output of `bytes collect` yields the final two bytes as binary.
///
/// Ten copies of `aa bb cc` are concatenated, so the tail is `bb cc`.
#[test]
fn gets_last_bytes_from_stream() {
let actual = nu!("(1..10 | each { 0x[aa bb cc] } | bytes collect | last 2) == 0x[bb cc]");
assert_eq!(actual.out, "true");
}
/// `last` without a count on the output of `bytes collect` prints `204`, the decimal value of
/// the final byte (`0xcc`).
#[test]
fn gets_last_byte_from_stream() {
let actual = nu!("1..10 | each { 0x[aa bb cc] } | bytes collect | last");
assert_eq!(actual.out, "204");
}
#[test] #[test]
fn last_errors_on_negative_index() { fn last_errors_on_negative_index() {
let actual = nu!("[1, 2, 3] | last -2"); let actual = nu!("[1, 2, 3] | last -2");

View File

@ -4,6 +4,7 @@ mod any;
mod append; mod append;
mod assignment; mod assignment;
mod break_; mod break_;
mod bytes;
mod cal; mod cal;
mod cd; mod cd;
mod compact; mod compact;

View File

@ -1,13 +1,17 @@
use nu_test_support::nu; use nu_test_support::nu;
#[test] #[test]
fn binary_skip_will_raise_error() { fn skips_bytes() {
let actual = nu!( let actual = nu!("(0x[aa bb cc] | skip 2) == 0x[cc]");
cwd: "tests/fixtures/formats",
"open sample_data.ods --raw | skip 2"
);
assert!(actual.err.contains("only_supports_this_input_type")); assert_eq!(actual.out, "true");
}
/// `skip 2` on the output of `bytes collect` drops the first two bytes and keeps the rest.
///
/// Two copies of `aa bb cc` are concatenated, so `cc aa bb cc` remains after skipping.
#[test]
fn skips_bytes_from_stream() {
let actual = nu!("([0 1] | each { 0x[aa bb cc] } | bytes collect | skip 2) == 0x[cc aa bb cc]");
assert_eq!(actual.out, "true");
}
#[test] #[test]

View File

@ -22,6 +22,18 @@ fn test_2() {
assert_eq!(actual.out, "a<sep>b<sep>c<sep>d"); assert_eq!(actual.out, "a<sep>b<sep>c<sep>d");
} }
/// `str join .` on this pipeline places the separator between items only: `a.b.c.d`.
///
/// NOTE(review): `filter {true}` is presumably there to turn the list into a stream - confirm.
#[test]
fn test_stream() {
let actual = nu!("[a b c d] | filter {true} | str join .");
assert_eq!(actual.out, "a.b.c.d");
}
/// The output of `str join` on this pipeline is described by `describe -n` as `string (stream)`.
///
/// NOTE(review): `filter {true}` is presumably there to turn the list into a stream - confirm.
#[test]
fn test_stream_type() {
let actual = nu!("[a b c d] | filter {true} | str join . | describe -n");
assert_eq!(actual.out, "string (stream)");
}
#[test] #[test]
fn construct_a_path() { fn construct_a_path() {
let actual = nu!(pipeline( let actual = nu!(pipeline(

View File

@ -1,5 +1,5 @@
mod collect;
mod into_string; mod into_string;
mod join;
use nu_test_support::fs::Stub::FileWithContent; use nu_test_support::fs::Stub::FileWithContent;
use nu_test_support::playground::Playground; use nu_test_support::playground::Playground;

View File

@ -35,6 +35,20 @@ fn fails_on_string() {
assert!(actual.err.contains("command doesn't support")); assert!(actual.err.contains("command doesn't support"));
} }
/// `take 2` on a binary value yields its first two bytes as binary.
#[test]
fn takes_bytes() {
let actual = nu!("(0x[aa bb cc] | take 2) == 0x[aa bb]");
assert_eq!(actual.out, "true");
}
/// `take 2` on the output of `bytes collect` yields the first two bytes as binary.
///
/// The source range `1..` has no upper bound, so the pipeline can only finish if the bytes are
/// consumed lazily.
#[test]
fn takes_bytes_from_stream() {
let actual = nu!("(1.. | each { 0x[aa bb cc] } | bytes collect | take 2) == 0x[aa bb]");
assert_eq!(actual.out, "true");
}
#[test] #[test]
// covers a situation where `take` used to behave strangely on list<binary> input // covers a situation where `take` used to behave strangely on list<binary> input
fn works_with_binary_list() { fn works_with_binary_list() {

View File

@ -2,7 +2,7 @@ pub use crate::CallExt;
pub use nu_protocol::{ pub use nu_protocol::{
ast::{Call, CellPath}, ast::{Call, CellPath},
engine::{Command, EngineState, Stack}, engine::{Command, EngineState, Stack},
record, Category, ErrSpan, Example, IntoInterruptiblePipelineData, IntoPipelineData, record, ByteStream, ByteStreamType, Category, ErrSpan, Example, IntoInterruptiblePipelineData,
IntoSpanned, PipelineData, Record, ShellError, Signature, Span, Spanned, SyntaxShape, Type, IntoPipelineData, IntoSpanned, PipelineData, Record, ShellError, Signature, Span, Spanned,
Value, SyntaxShape, Type, Value,
}; };

View File

@ -183,7 +183,7 @@ pub trait InterfaceManager {
PipelineDataHeader::ByteStream(info) => { PipelineDataHeader::ByteStream(info) => {
let handle = self.stream_manager().get_handle(); let handle = self.stream_manager().get_handle();
let reader = handle.read_stream(info.id, self.get_interface())?; let reader = handle.read_stream(info.id, self.get_interface())?;
ByteStream::from_result_iter(reader, info.span, ctrlc.cloned()).into() ByteStream::from_result_iter(reader, info.span, ctrlc.cloned(), info.type_).into()
} }
}) })
} }
@ -261,9 +261,10 @@ pub trait Interface: Clone + Send {
} }
PipelineData::ByteStream(stream, ..) => { PipelineData::ByteStream(stream, ..) => {
let span = stream.span(); let span = stream.span();
let type_ = stream.type_();
if let Some(reader) = stream.reader() { if let Some(reader) = stream.reader() {
let (id, writer) = new_stream(RAW_STREAM_HIGH_PRESSURE)?; let (id, writer) = new_stream(RAW_STREAM_HIGH_PRESSURE)?;
let header = PipelineDataHeader::ByteStream(ByteStreamInfo { id, span }); let header = PipelineDataHeader::ByteStream(ByteStreamInfo { id, span, type_ });
Ok((header, PipelineDataWriter::ByteStream(writer, reader))) Ok((header, PipelineDataWriter::ByteStream(writer, reader)))
} else { } else {
Ok((PipelineDataHeader::Empty, PipelineDataWriter::None)) Ok((PipelineDataHeader::Empty, PipelineDataWriter::None))

View File

@ -10,8 +10,8 @@ use nu_plugin_protocol::{
StreamMessage, StreamMessage,
}; };
use nu_protocol::{ use nu_protocol::{
ByteStream, ByteStreamSource, DataSource, ListStream, PipelineData, PipelineMetadata, ByteStream, ByteStreamSource, ByteStreamType, DataSource, ListStream, PipelineData,
ShellError, Span, Value, PipelineMetadata, ShellError, Span, Value,
}; };
use std::{path::Path, sync::Arc}; use std::{path::Path, sync::Arc};
@ -208,6 +208,7 @@ fn read_pipeline_data_byte_stream() -> Result<(), ShellError> {
let header = PipelineDataHeader::ByteStream(ByteStreamInfo { let header = PipelineDataHeader::ByteStream(ByteStreamInfo {
id: 12, id: 12,
span: test_span, span: test_span,
type_: ByteStreamType::Unknown,
}); });
let pipe = manager.read_pipeline_data(header, None)?; let pipe = manager.read_pipeline_data(header, None)?;
@ -401,7 +402,12 @@ fn write_pipeline_data_byte_stream() -> Result<(), ShellError> {
// Set up pipeline data for a byte stream // Set up pipeline data for a byte stream
let data = PipelineData::ByteStream( let data = PipelineData::ByteStream(
ByteStream::read(std::io::Cursor::new(expected), span, None), ByteStream::read(
std::io::Cursor::new(expected),
span,
None,
ByteStreamType::Unknown,
),
None, None,
); );

View File

@ -17,8 +17,8 @@ use nu_plugin_protocol::{
use nu_protocol::{ use nu_protocol::{
ast::{Math, Operator}, ast::{Math, Operator},
engine::Closure, engine::Closure,
CustomValue, IntoInterruptiblePipelineData, IntoSpanned, PipelineData, PluginSignature, ByteStreamType, CustomValue, IntoInterruptiblePipelineData, IntoSpanned, PipelineData,
ShellError, Span, Spanned, Value, PluginSignature, ShellError, Span, Spanned, Value,
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{ use std::{
@ -157,6 +157,7 @@ fn manager_consume_all_propagates_message_error_to_readers() -> Result<(), Shell
PipelineDataHeader::ByteStream(ByteStreamInfo { PipelineDataHeader::ByteStream(ByteStreamInfo {
id: 0, id: 0,
span: Span::test_data(), span: Span::test_data(),
type_: ByteStreamType::Unknown,
}), }),
None, None,
)?; )?;
@ -384,6 +385,7 @@ fn manager_consume_call_response_registers_streams() -> Result<(), ShellError> {
PluginCallResponse::PipelineData(PipelineDataHeader::ByteStream(ByteStreamInfo { PluginCallResponse::PipelineData(PipelineDataHeader::ByteStream(ByteStreamInfo {
id: 1, id: 1,
span: Span::test_data(), span: Span::test_data(),
type_: ByteStreamType::Unknown,
})), })),
))?; ))?;

View File

@ -22,8 +22,8 @@ mod tests;
pub mod test_util; pub mod test_util;
use nu_protocol::{ use nu_protocol::{
ast::Operator, engine::Closure, Config, LabeledError, PipelineData, PluginSignature, ast::Operator, engine::Closure, ByteStreamType, Config, LabeledError, PipelineData,
ShellError, Span, Spanned, Value, PluginSignature, ShellError, Span, Spanned, Value,
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::HashMap; use std::collections::HashMap;
@ -112,6 +112,8 @@ pub struct ListStreamInfo {
pub struct ByteStreamInfo { pub struct ByteStreamInfo {
pub id: StreamId, pub id: StreamId,
pub span: Span, pub span: Span,
#[serde(rename = "type")]
pub type_: ByteStreamType,
} }
/// Calls that a plugin can execute. The type parameter determines the input type. /// Calls that a plugin can execute. The type parameter determines the input type.

View File

@ -9,8 +9,8 @@ use nu_plugin_protocol::{
PluginCustomValue, PluginInput, PluginOutput, Protocol, ProtocolInfo, StreamData, PluginCustomValue, PluginInput, PluginOutput, Protocol, ProtocolInfo, StreamData,
}; };
use nu_protocol::{ use nu_protocol::{
engine::Closure, Config, CustomValue, IntoInterruptiblePipelineData, LabeledError, engine::Closure, ByteStreamType, Config, CustomValue, IntoInterruptiblePipelineData,
PipelineData, PluginSignature, ShellError, Span, Spanned, Value, LabeledError, PipelineData, PluginSignature, ShellError, Span, Spanned, Value,
}; };
use std::{ use std::{
collections::HashMap, collections::HashMap,
@ -160,6 +160,7 @@ fn manager_consume_all_propagates_message_error_to_readers() -> Result<(), Shell
PipelineDataHeader::ByteStream(ByteStreamInfo { PipelineDataHeader::ByteStream(ByteStreamInfo {
id: 0, id: 0,
span: Span::test_data(), span: Span::test_data(),
type_: ByteStreamType::Unknown,
}), }),
None, None,
)?; )?;

View File

@ -174,20 +174,14 @@ where
.collect(); .collect();
if cfg.title { if cfg.title {
if use_color { write_title(
writeln!( writer,
writer, HexConfig {
"Length: {0} (0x{0:x}) bytes | {1}printable {2}whitespace {3}ascii_other {4}non_ascii{5}", length: Some(source_part_vec.len()),
source_part_vec.len(), ..cfg
Style::default().fg(Color::Cyan).bold().prefix(), },
Style::default().fg(Color::Green).bold().prefix(), use_color,
Style::default().fg(Color::Purple).bold().prefix(), )?;
Style::default().fg(Color::Yellow).bold().prefix(),
Style::default().fg(Color::Yellow).suffix()
)?;
} else {
writeln!(writer, "Length: {0} (0x{0:x}) bytes", source_part_vec.len(),)?;
}
} }
let lines = source_part_vec.chunks(if cfg.width > 0 { let lines = source_part_vec.chunks(if cfg.width > 0 {
@ -256,6 +250,34 @@ where
Ok(()) Ok(())
} }
/// Write the title line of a hexdump to `writer`.
///
/// The length shown is `cfg.length`; when it is `None` the title reads `unknown (stream)`
/// instead of a byte count. With `use_color` set, a color legend is appended after the length.
pub fn write_title<W>(writer: &mut W, cfg: HexConfig, use_color: bool) -> Result<(), fmt::Error>
where
    W: fmt::Write,
{
    // Both length variants share the same line layout; only the length text differs.
    let emit = |out: &mut W, length_text: fmt::Arguments<'_>| -> Result<(), fmt::Error> {
        if !use_color {
            return writeln!(out, "Length: {length_text}");
        }

        writeln!(
            out,
            "Length: {length_text} | {0}printable {1}whitespace {2}ascii_other {3}non_ascii{4}",
            Style::default().fg(Color::Cyan).bold().prefix(),
            Style::default().fg(Color::Green).bold().prefix(),
            Style::default().fg(Color::Purple).bold().prefix(),
            Style::default().fg(Color::Yellow).bold().prefix(),
            Style::default().fg(Color::Yellow).suffix()
        )
    };

    match cfg.length {
        Some(len) => emit(writer, format_args!("{len} (0x{len:x}) bytes")),
        None => emit(writer, format_args!("unknown (stream)")),
    }
}
/// Reference wrapper for use in arguments formatting. /// Reference wrapper for use in arguments formatting.
pub struct Hex<'a, T: 'a>(&'a T, HexConfig); pub struct Hex<'a, T: 'a>(&'a T, HexConfig);

View File

@ -1017,7 +1017,10 @@ pub enum ShellError {
/// ///
/// Check your input's encoding. Are there any funny characters/bytes? /// Check your input's encoding. Are there any funny characters/bytes?
#[error("Non-UTF8 string")] #[error("Non-UTF8 string")]
#[diagnostic(code(nu::parser::non_utf8))] #[diagnostic(
code(nu::parser::non_utf8),
help("see `decode` for handling character sets other than UTF-8")
)]
NonUtf8 { NonUtf8 {
#[label("non-UTF8 string")] #[label("non-UTF8 string")]
span: Span, span: Span,
@ -1029,7 +1032,10 @@ pub enum ShellError {
/// ///
/// Check your input's encoding. Are there any funny characters/bytes? /// Check your input's encoding. Are there any funny characters/bytes?
#[error("Non-UTF8 string")] #[error("Non-UTF8 string")]
#[diagnostic(code(nu::parser::non_utf8_custom))] #[diagnostic(
code(nu::parser::non_utf8_custom),
help("see `decode` for handling character sets other than UTF-8")
)]
NonUtf8Custom { NonUtf8Custom {
msg: String, msg: String,
#[label("{msg}")] #[label("{msg}")]

View File

@ -1,6 +1,8 @@
use serde::{Deserialize, Serialize};
use crate::{ use crate::{
process::{ChildPipe, ChildProcess, ExitStatus}, process::{ChildPipe, ChildProcess, ExitStatus},
ErrSpan, IntoSpanned, OutDest, PipelineData, ShellError, Span, Value, ErrSpan, IntoSpanned, OutDest, PipelineData, ShellError, Span, Type, Value,
}; };
#[cfg(unix)] #[cfg(unix)]
use std::os::fd::OwnedFd; use std::os::fd::OwnedFd;
@ -41,6 +43,24 @@ impl ByteStreamSource {
}), }),
} }
} }
/// Whether the source is a `File` or a `Child` as opposed to a plain `Read`.
/// Currently affects trimming.
fn is_external(&self) -> bool {
    match self {
        ByteStreamSource::File(..) | ByteStreamSource::Child(..) => true,
        ByteStreamSource::Read(..) => false,
    }
}
}
impl Debug for ByteStreamSource {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ByteStreamSource::Read(_) => f.debug_tuple("Read").field(&"..").finish(),
ByteStreamSource::File(file) => f.debug_tuple("File").field(file).finish(),
ByteStreamSource::Child(child) => f.debug_tuple("Child").field(child).finish(),
}
}
} }
enum SourceReader { enum SourceReader {
@ -57,6 +77,55 @@ impl Read for SourceReader {
} }
} }
impl Debug for SourceReader {
    /// Formats as a one-field tuple named after the variant. The `Read` payload is shown as the
    /// placeholder `".."`, while `File` shows the file's own debug output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (variant, payload): (&str, &dyn Debug) = match self {
            SourceReader::Read(_) => ("Read", &".."),
            SourceReader::File(file) => ("File", file),
        };
        f.debug_tuple(variant).field(payload).finish()
    }
}
/// Optional type color for [`ByteStream`], which determines type compatibility.
///
/// The default is [`Unknown`](ByteStreamType::Unknown).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum ByteStreamType {
/// Compatible with [`Type::Binary`], and should only be converted to binary, even when the
/// desired type is unknown.
Binary,
/// Compatible with [`Type::String`], and should only be converted to string, even when the
/// desired type is unknown.
///
/// This does not guarantee valid UTF-8 data, but it is conventionally so. Converting to
/// `String` still requires validation of the data.
String,
/// Unknown whether the stream should contain binary or string data. This usually is the result
/// of an external stream, e.g. an external command or file.
///
/// This is the variant produced by `ByteStreamType::default()`.
#[default]
Unknown,
}
impl ByteStreamType {
/// Returns the string that describes the byte stream type - i.e., the same as what `describe`
/// produces. This can be used in type mismatch error messages.
pub fn describe(self) -> &'static str {
match self {
ByteStreamType::Binary => "binary (stream)",
ByteStreamType::String => "string (stream)",
ByteStreamType::Unknown => "byte stream",
}
}
}
impl From<ByteStreamType> for Type {
    /// Maps the stream type color to a [`Type`]: binary and string map to their namesakes, and
    /// an unknown stream maps to `Type::Any`.
    fn from(value: ByteStreamType) -> Self {
        match value {
            ByteStreamType::Unknown => Self::Any,
            ByteStreamType::String => Self::String,
            ByteStreamType::Binary => Self::Binary,
        }
    }
}
/// A potentially infinite, interruptible stream of bytes. /// A potentially infinite, interruptible stream of bytes.
/// ///
/// To create a [`ByteStream`], you can use any of the following methods: /// To create a [`ByteStream`], you can use any of the following methods:
@ -65,20 +134,31 @@ impl Read for SourceReader {
/// - [`from_iter`](ByteStream::from_iter): takes an [`Iterator`] whose items implement `AsRef<[u8]>`. /// - [`from_iter`](ByteStream::from_iter): takes an [`Iterator`] whose items implement `AsRef<[u8]>`.
/// - [`from_result_iter`](ByteStream::from_result_iter): same as [`from_iter`](ByteStream::from_iter), /// - [`from_result_iter`](ByteStream::from_result_iter): same as [`from_iter`](ByteStream::from_iter),
/// but each item is a `Result<T, ShellError>`. /// but each item is a `Result<T, ShellError>`.
/// - [`from_fn`](ByteStream::from_fn): uses a generator function to fill a buffer whenever it is
/// empty. This has high performance because it doesn't need to allocate for each chunk of data,
/// and can just reuse the same buffer.
///
/// Byte streams have a [type](ByteStream::type_) which is used to preserve type compatibility when
/// they are the result of an internal command. It is important that this be set to the correct
/// value. [`Unknown`](ByteStreamType::Unknown) is used only for external sources where the type
/// cannot be inherently determined, and having it automatically act as a string or binary
/// depending on whether it parses as UTF-8 or not is desirable.
/// ///
/// The data of a [`ByteStream`] can be accessed using one of the following methods: /// The data of a [`ByteStream`] can be accessed using one of the following methods:
/// - [`reader`](ByteStream::reader): returns a [`Read`]-able type to get the raw bytes in the stream. /// - [`reader`](ByteStream::reader): returns a [`Read`]-able type to get the raw bytes in the stream.
/// - [`lines`](ByteStream::lines): splits the bytes on lines and returns an [`Iterator`] /// - [`lines`](ByteStream::lines): splits the bytes on lines and returns an [`Iterator`]
/// where each item is a `Result<String, ShellError>`. /// where each item is a `Result<String, ShellError>`.
/// - [`chunks`](ByteStream::chunks): returns an [`Iterator`] of [`Value`]s where each value is either a string or binary. /// - [`chunks`](ByteStream::chunks): returns an [`Iterator`] of [`Value`]s where each value is
/// either a string or binary.
/// Try not to use this method if possible. Rather, please use [`reader`](ByteStream::reader) /// Try not to use this method if possible. Rather, please use [`reader`](ByteStream::reader)
/// (or [`lines`](ByteStream::lines) if it matches the situation). /// (or [`lines`](ByteStream::lines) if it matches the situation).
/// ///
/// Additionally, there are few methods to collect a [`Bytestream`] into memory: /// Additionally, there are few methods to collect a [`Bytestream`] into memory:
/// - [`into_bytes`](ByteStream::into_bytes): collects all bytes into a [`Vec<u8>`]. /// - [`into_bytes`](ByteStream::into_bytes): collects all bytes into a [`Vec<u8>`].
/// - [`into_string`](ByteStream::into_string): collects all bytes into a [`String`], erroring if utf-8 decoding failed. /// - [`into_string`](ByteStream::into_string): collects all bytes into a [`String`], erroring if utf-8 decoding failed.
/// - [`into_value`](ByteStream::into_value): collects all bytes into a string [`Value`]. /// - [`into_value`](ByteStream::into_value): collects all bytes into a value typed appropriately
/// If utf-8 decoding failed, then a binary [`Value`] is returned instead. /// for the [type](.type_()) of this stream. If the type is [`Unknown`](ByteStreamType::Unknown),
/// it will produce a string value if the data is valid UTF-8, or a binary value otherwise.
/// ///
/// There are also a few other methods to consume all the data of a [`Bytestream`]: /// There are also a few other methods to consume all the data of a [`Bytestream`]:
/// - [`drain`](ByteStream::drain): consumes all bytes and outputs nothing. /// - [`drain`](ByteStream::drain): consumes all bytes and outputs nothing.
@ -88,54 +168,135 @@ impl Read for SourceReader {
/// ///
/// Internally, [`ByteStream`]s currently come in three flavors according to [`ByteStreamSource`]. /// Internally, [`ByteStream`]s currently come in three flavors according to [`ByteStreamSource`].
/// See its documentation for more information. /// See its documentation for more information.
#[derive(Debug)]
pub struct ByteStream { pub struct ByteStream {
stream: ByteStreamSource, stream: ByteStreamSource,
span: Span, span: Span,
ctrlc: Option<Arc<AtomicBool>>, ctrlc: Option<Arc<AtomicBool>>,
type_: ByteStreamType,
known_size: Option<u64>, known_size: Option<u64>,
} }
impl ByteStream { impl ByteStream {
/// Create a new [`ByteStream`] from a [`ByteStreamSource`]. /// Create a new [`ByteStream`] from a [`ByteStreamSource`].
pub fn new(stream: ByteStreamSource, span: Span, interrupt: Option<Arc<AtomicBool>>) -> Self { pub fn new(
stream: ByteStreamSource,
span: Span,
interrupt: Option<Arc<AtomicBool>>,
type_: ByteStreamType,
) -> Self {
Self { Self {
stream, stream,
span, span,
ctrlc: interrupt, ctrlc: interrupt,
type_,
known_size: None, known_size: None,
} }
} }
/// Create a new [`ByteStream`] from a [`ByteStreamSource::Read`]. /// Create a [`ByteStream`] from an arbitrary reader. The type must be provided.
pub fn read( pub fn read(
reader: impl Read + Send + 'static, reader: impl Read + Send + 'static,
span: Span, span: Span,
interrupt: Option<Arc<AtomicBool>>, interrupt: Option<Arc<AtomicBool>>,
type_: ByteStreamType,
) -> Self { ) -> Self {
Self::new(ByteStreamSource::Read(Box::new(reader)), span, interrupt) Self::new(
ByteStreamSource::Read(Box::new(reader)),
span,
interrupt,
type_,
)
} }
/// Create a new [`ByteStream`] from a [`ByteStreamSource::File`]. /// Create a [`ByteStream`] from a string. The type of the stream is always `String`.
pub fn read_string(string: String, span: Span, interrupt: Option<Arc<AtomicBool>>) -> Self {
let len = string.len();
ByteStream::read(
Cursor::new(string.into_bytes()),
span,
interrupt,
ByteStreamType::String,
)
.with_known_size(Some(len as u64))
}
/// Create a [`ByteStream`] from a byte vector. The type of the stream is always `Binary`.
pub fn read_binary(bytes: Vec<u8>, span: Span, interrupt: Option<Arc<AtomicBool>>) -> Self {
let len = bytes.len();
ByteStream::read(Cursor::new(bytes), span, interrupt, ByteStreamType::Binary)
.with_known_size(Some(len as u64))
}
/// Create a [`ByteStream`] from a file.
///
/// The type is implicitly `Unknown`, as it's not typically known whether files will
/// return text or binary.
pub fn file(file: File, span: Span, interrupt: Option<Arc<AtomicBool>>) -> Self { pub fn file(file: File, span: Span, interrupt: Option<Arc<AtomicBool>>) -> Self {
Self::new(ByteStreamSource::File(file), span, interrupt) Self::new(
ByteStreamSource::File(file),
span,
interrupt,
ByteStreamType::Unknown,
)
} }
/// Create a new [`ByteStream`] from a [`ByteStreamSource::Child`]. /// Create a [`ByteStream`] from a child process's stdout and stderr.
///
/// The type is implicitly `Unknown`, as it's not typically known whether child processes will
/// return text or binary.
pub fn child(child: ChildProcess, span: Span) -> Self { pub fn child(child: ChildProcess, span: Span) -> Self {
Self::new(ByteStreamSource::Child(Box::new(child)), span, None) Self::new(
ByteStreamSource::Child(Box::new(child)),
span,
None,
ByteStreamType::Unknown,
)
} }
/// Create a new [`ByteStream`] that reads from stdin. /// Create a [`ByteStream`] that reads from stdin.
///
/// The type is implicitly `Unknown`, as it's not typically known whether stdin is text or
/// binary.
pub fn stdin(span: Span) -> Result<Self, ShellError> { pub fn stdin(span: Span) -> Result<Self, ShellError> {
let stdin = os_pipe::dup_stdin().err_span(span)?; let stdin = os_pipe::dup_stdin().err_span(span)?;
let source = ByteStreamSource::File(convert_file(stdin)); let source = ByteStreamSource::File(convert_file(stdin));
Ok(Self::new(source, span, None)) Ok(Self::new(source, span, None, ByteStreamType::Unknown))
}
/// Create a [`ByteStream`] backed by a generator function.
///
/// The generator is handed a buffer to write data into, and returns `Ok(false)` once the
/// stream has ended. The type of the stream must be provided.
pub fn from_fn(
    span: Span,
    interrupt: Option<Arc<AtomicBool>>,
    type_: ByteStreamType,
    generator: impl FnMut(&mut Vec<u8>) -> Result<bool, ShellError> + Send + 'static,
) -> Self {
    // The reader starts out with an empty buffer
    let source = ReadGenerator {
        generator,
        buffer: Cursor::new(Vec::new()),
    };
    Self::read(source, span, interrupt, type_)
}
pub fn with_type(mut self, type_: ByteStreamType) -> Self {
self.type_ = type_;
self
} }
/// Create a new [`ByteStream`] from an [`Iterator`] of bytes slices. /// Create a new [`ByteStream`] from an [`Iterator`] of bytes slices.
/// ///
/// The returned [`ByteStream`] will have a [`ByteStreamSource`] of `Read`. /// The returned [`ByteStream`] will have a [`ByteStreamSource`] of `Read`.
pub fn from_iter<I>(iter: I, span: Span, interrupt: Option<Arc<AtomicBool>>) -> Self pub fn from_iter<I>(
iter: I,
span: Span,
interrupt: Option<Arc<AtomicBool>>,
type_: ByteStreamType,
) -> Self
where where
I: IntoIterator, I: IntoIterator,
I::IntoIter: Send + 'static, I::IntoIter: Send + 'static,
@ -143,13 +304,18 @@ impl ByteStream {
{ {
let iter = iter.into_iter(); let iter = iter.into_iter();
let cursor = Some(Cursor::new(I::Item::default())); let cursor = Some(Cursor::new(I::Item::default()));
Self::read(ReadIterator { iter, cursor }, span, interrupt) Self::read(ReadIterator { iter, cursor }, span, interrupt, type_)
} }
/// Create a new [`ByteStream`] from an [`Iterator`] of [`Result`] bytes slices. /// Create a new [`ByteStream`] from an [`Iterator`] of [`Result`] bytes slices.
/// ///
/// The returned [`ByteStream`] will have a [`ByteStreamSource`] of `Read`. /// The returned [`ByteStream`] will have a [`ByteStreamSource`] of `Read`.
pub fn from_result_iter<I, T>(iter: I, span: Span, interrupt: Option<Arc<AtomicBool>>) -> Self pub fn from_result_iter<I, T>(
iter: I,
span: Span,
interrupt: Option<Arc<AtomicBool>>,
type_: ByteStreamType,
) -> Self
where where
I: IntoIterator<Item = Result<T, ShellError>>, I: IntoIterator<Item = Result<T, ShellError>>,
I::IntoIter: Send + 'static, I::IntoIter: Send + 'static,
@ -157,7 +323,7 @@ impl ByteStream {
{ {
let iter = iter.into_iter(); let iter = iter.into_iter();
let cursor = Some(Cursor::new(T::default())); let cursor = Some(Cursor::new(T::default()));
Self::read(ReadResultIterator { iter, cursor }, span, interrupt) Self::read(ReadResultIterator { iter, cursor }, span, interrupt, type_)
} }
/// Set the known size, in number of bytes, of the [`ByteStream`]. /// Set the known size, in number of bytes, of the [`ByteStream`].
@ -181,6 +347,11 @@ impl ByteStream {
self.span self.span
} }
/// Returns the [`ByteStreamType`] associated with the [`ByteStream`].
pub fn type_(&self) -> ByteStreamType {
self.type_
}
/// Returns the known size, in number of bytes, of the [`ByteStream`]. /// Returns the known size, in number of bytes, of the [`ByteStream`].
pub fn known_size(&self) -> Option<u64> { pub fn known_size(&self) -> Option<u64> {
self.known_size self.known_size
@ -220,8 +391,10 @@ impl ByteStream {
/// Convert the [`ByteStream`] into a [`Chunks`] iterator where each element is a `Result<Value, ShellError>`. /// Convert the [`ByteStream`] into a [`Chunks`] iterator where each element is a `Result<Value, ShellError>`.
/// ///
/// Each call to [`next`](Iterator::next) reads the currently available data from the byte stream source, /// Each call to [`next`](Iterator::next) reads the currently available data from the byte stream source,
/// up to a maximum size. If the chunk of bytes, or an expected portion of it, succeeds utf-8 decoding, /// up to a maximum size. The values are typed according to the [type](.type_()) of the
/// then it is returned as a [`Value::String`]. Otherwise, it is turned into a [`Value::Binary`]. /// stream, and if that type is [`Unknown`](ByteStreamType::Unknown), string values will be
/// produced as long as the stream continues to parse as valid UTF-8, but binary values will
/// be produced instead if the stream fails to parse as UTF-8 at any point.
/// Any and all newlines are kept intact in each chunk. /// Any and all newlines are kept intact in each chunk.
/// ///
/// Where possible, prefer [`reader`](ByteStream::reader) or [`lines`](ByteStream::lines) over this method. /// Where possible, prefer [`reader`](ByteStream::reader) or [`lines`](ByteStream::lines) over this method.
@ -232,12 +405,7 @@ impl ByteStream {
/// then the stream is considered empty and `None` will be returned. /// then the stream is considered empty and `None` will be returned.
pub fn chunks(self) -> Option<Chunks> { pub fn chunks(self) -> Option<Chunks> {
let reader = self.stream.reader()?; let reader = self.stream.reader()?;
Some(Chunks { Some(Chunks::new(reader, self.span, self.ctrlc, self.type_))
reader: BufReader::new(reader),
span: self.span,
ctrlc: self.ctrlc,
leftover: Vec::new(),
})
} }
/// Convert the [`ByteStream`] into its inner [`ByteStreamSource`]. /// Convert the [`ByteStream`] into its inner [`ByteStreamSource`].
@ -305,33 +473,64 @@ impl ByteStream {
} }
} }
/// Collect all the bytes of the [`ByteStream`] into a [`String`]. /// Collect the stream into a `String` in-memory. This can only succeed if the data contained is
/// valid UTF-8.
/// ///
/// The trailing new line (`\n` or `\r\n`), if any, is removed from the [`String`] prior to being returned. /// The trailing new line (`\n` or `\r\n`), if any, is removed from the [`String`] prior to
/// being returned, if this is a stream coming from an external process or file.
/// ///
/// If utf-8 decoding fails, an error is returned. /// If the [type](.type_()) is specified as `Binary`, this operation always fails, even if the
/// data would have been valid UTF-8.
pub fn into_string(self) -> Result<String, ShellError> { pub fn into_string(self) -> Result<String, ShellError> {
let span = self.span; let span = self.span;
let bytes = self.into_bytes()?; if self.type_ != ByteStreamType::Binary {
let mut string = String::from_utf8(bytes).map_err(|_| ShellError::NonUtf8 { span })?; let trim = self.stream.is_external();
trim_end_newline(&mut string); let bytes = self.into_bytes()?;
Ok(string) let mut string = String::from_utf8(bytes).map_err(|err| ShellError::NonUtf8Custom {
span,
msg: err.to_string(),
})?;
if trim {
trim_end_newline(&mut string);
}
Ok(string)
} else {
Err(ShellError::TypeMismatch {
err_message: "expected string, but got binary".into(),
span,
})
}
} }
/// Collect all the bytes of the [`ByteStream`] into a [`Value`]. /// Collect all the bytes of the [`ByteStream`] into a [`Value`].
/// ///
/// If the collected bytes are successfully decoded as utf-8, then a [`Value::String`] is returned. /// If this is a `String` stream, the stream is decoded to UTF-8. If the stream came from an
/// The trailing new line (`\n` or `\r\n`), if any, is removed from the [`String`] prior to being returned. /// external process or file, the trailing new line (`\n` or `\r\n`), if any, is removed from
/// Otherwise, a [`Value::Binary`] is returned with any trailing new lines preserved. /// the [`String`] prior to being returned.
///
/// If this is a `Binary` stream, a [`Value::Binary`] is returned with any trailing new lines
/// preserved.
///
/// If this is an `Unknown` stream, the behavior depends on whether the stream parses as valid
/// UTF-8 or not. If it does, this uses the `String` behavior; if not, it uses the `Binary`
/// behavior.
pub fn into_value(self) -> Result<Value, ShellError> { pub fn into_value(self) -> Result<Value, ShellError> {
let span = self.span; let span = self.span;
let bytes = self.into_bytes()?; let trim = self.stream.is_external();
let value = match String::from_utf8(bytes) { let value = match self.type_ {
Ok(mut str) => { // If the type is specified, then the stream should always become that type:
trim_end_newline(&mut str); ByteStreamType::Binary => Value::binary(self.into_bytes()?, span),
Value::string(str, span) ByteStreamType::String => Value::string(self.into_string()?, span),
} // If the type is not specified, then it just depends on whether it parses or not:
Err(err) => Value::binary(err.into_bytes(), span), ByteStreamType::Unknown => match String::from_utf8(self.into_bytes()?) {
Ok(mut str) => {
if trim {
trim_end_newline(&mut str);
}
Value::string(str, span)
}
Err(err) => Value::binary(err.into_bytes(), span),
},
}; };
Ok(value) Ok(value)
} }
@ -477,12 +676,6 @@ impl ByteStream {
} }
} }
impl Debug for ByteStream {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ByteStream").finish()
}
}
impl From<ByteStream> for PipelineData { impl From<ByteStream> for PipelineData {
fn from(stream: ByteStream) -> Self { fn from(stream: ByteStream) -> Self {
Self::ByteStream(stream, None) Self::ByteStream(stream, None)
@ -613,54 +806,157 @@ impl Iterator for Lines {
} }
} }
/// Turn a readable stream into [`Value`]s.
///
/// The `Value` type depends on the type of the stream ([`ByteStreamType`]). If `Unknown`, the
/// stream will return strings as long as UTF-8 parsing succeeds, but will start returning binary
/// if it fails.
pub struct Chunks { pub struct Chunks {
reader: BufReader<SourceReader>, reader: BufReader<SourceReader>,
pos: u64,
error: bool,
span: Span, span: Span,
ctrlc: Option<Arc<AtomicBool>>, ctrlc: Option<Arc<AtomicBool>>,
leftover: Vec<u8>, type_: ByteStreamType,
} }
impl Chunks { impl Chunks {
/// Build a [`Chunks`] iterator over `reader`.
///
/// The reader is wrapped in a [`BufReader`]; the byte position starts at zero and the error
/// flag starts out cleared.
fn new(
    reader: SourceReader,
    span: Span,
    ctrlc: Option<Arc<AtomicBool>>,
    type_: ByteStreamType,
) -> Self {
    let reader = BufReader::new(reader);
    Self {
        reader,
        pos: 0,
        error: false,
        span,
        ctrlc,
        type_,
    }
}
pub fn span(&self) -> Span { pub fn span(&self) -> Span {
self.span self.span
} }
/// Read the next piece of the stream and decode it as UTF-8.
///
/// Returns `Ok(Some(string))` with the next non-empty decoded piece, or `Ok(None)` once the
/// reader reports end of stream.
///
/// A multi-byte character may be split across several reads. Every byte that is taken out of
/// the reader is either part of the returned string or part of the returned error payload, so
/// no data is silently dropped: if the bytes already consumed end in the middle of a
/// character, more data is read until that character is complete, turns out to be invalid, or
/// the stream ends.
///
/// # Errors
///
/// The error is paired with the bytes that were taken out of the reader during this call, so
/// that the caller can still pass them on as binary data:
///
/// * An I/O error on the first read is returned with an empty byte vector; an I/O error on a
///   later read is returned with the bytes gathered so far.
/// * [`ShellError::NonUtf8Custom`] is returned if the data contains an invalid UTF-8 sequence,
///   or if the stream ends in the middle of a multi-byte character. The index in the message
///   is the offset from the start of the stream at which the offending sequence begins.
fn next_string(&mut self) -> Result<Option<String>, (Vec<u8>, ShellError)> {
    // Get some data from the reader
    let buf = self
        .reader
        .fill_buf()
        .err_span(self.span)
        .map_err(|err| (vec![], ShellError::from(err)))?;

    // If empty, this is EOF
    if buf.is_empty() {
        return Ok(None);
    }

    let mut buf = buf.to_vec();
    // Length of the prefix of `buf` that has already been consumed from the reader. Those
    // bytes cannot be read again, so they must end up in whatever this call returns.
    let mut consumed = 0;

    // If the buf length is under 4 bytes, it could be a partial character, so try to get more
    if buf.len() < 4 {
        consumed = buf.len();
        self.reader.consume(consumed);
        match self.reader.fill_buf().err_span(self.span) {
            Ok(more_bytes) => buf.extend_from_slice(more_bytes),
            Err(err) => return Err((buf, err.into())),
        }
    }

    // Each extra iteration adds at least one byte to an unfinished character, and such a
    // sequence is at most three bytes long, so this loop runs only a handful of times.
    loop {
        // Try to parse utf-8 and decide what to do
        let err = match String::from_utf8(buf) {
            Ok(string) => {
                self.reader.consume(string.len() - consumed);
                self.pos += string.len() as u64;
                return Ok(Some(string));
            }
            Err(err) => err,
        };

        let utf8_error = err.utf8_error();
        let valid_up_to = utf8_error.valid_up_to();
        // `error_len()` is `None` when the input merely stops in the middle of a character
        let incomplete = utf8_error.error_len().is_none();
        let mut bytes = err.into_bytes();

        if incomplete && valid_up_to >= consumed {
            // The valid part covers everything already consumed, so return it and leave the
            // unfinished character in the reader for the next call. `valid_up_to` cannot be
            // zero here: either `consumed` is non-zero, or at least four bytes were read and
            // an unfinished character is at most three bytes long.
            self.reader.consume(valid_up_to - consumed);
            bytes.truncate(valid_up_to);
            bytes.shrink_to_fit();
            let string = String::from_utf8(bytes)
                .expect("failed to parse utf-8 even after correcting error");
            self.pos += string.len() as u64;
            return Ok(Some(string));
        }

        // From here on all of `bytes` is handed back by this call (either after reading more
        // or as the error payload), so take the rest of it out of the reader.
        self.reader.consume(bytes.len() - consumed);
        consumed = bytes.len();

        if incomplete {
            // Part of the unfinished character was already consumed, so it cannot be left for
            // the next call. Read more data to find out how it ends.
            let reached_eof = match self.reader.fill_buf().err_span(self.span) {
                Ok(more_bytes) => {
                    bytes.extend_from_slice(more_bytes);
                    more_bytes.is_empty()
                }
                Err(io_err) => return Err((bytes, io_err.into())),
            };
            if !reached_eof {
                buf = bytes;
                continue;
            }
            // The stream ended in the middle of a character: report it as invalid below
        }

        // There is an invalid sequence and we have no hope of parsing further. The entire
        // buffer is returned with the error in case it will be cast to binary.
        let shell_error = ShellError::NonUtf8Custom {
            msg: format!(
                "invalid utf-8 sequence starting at index {}",
                self.pos + valid_up_to as u64
            ),
            span: self.span,
        };
        self.pos += bytes.len() as u64;
        return Err((bytes, shell_error));
    }
}
} }
impl Iterator for Chunks { impl Iterator for Chunks {
type Item = Result<Value, ShellError>; type Item = Result<Value, ShellError>;
fn next(&mut self) -> Option<Self::Item> { fn next(&mut self) -> Option<Self::Item> {
if nu_utils::ctrl_c::was_pressed(&self.ctrlc) { if self.error || nu_utils::ctrl_c::was_pressed(&self.ctrlc) {
None None
} else { } else {
loop { match self.type_ {
match self.reader.fill_buf() { // Binary should always be binary
Ok(buf) => { ByteStreamType::Binary => {
self.leftover.extend_from_slice(buf); let buf = match self.reader.fill_buf().err_span(self.span) {
Ok(buf) => buf,
Err(err) => {
self.error = true;
return Some(Err(err.into()));
}
};
if !buf.is_empty() {
let len = buf.len(); let len = buf.len();
let value = Value::binary(buf, self.span);
self.reader.consume(len); self.reader.consume(len);
break; self.pos += len as u64;
} Some(Ok(value))
Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
Err(err) => return Some(Err(err.into_spanned(self.span).into())),
};
}
if self.leftover.is_empty() {
return None;
}
match String::from_utf8(std::mem::take(&mut self.leftover)) {
Ok(str) => Some(Ok(Value::string(str, self.span))),
Err(err) => {
if err.utf8_error().error_len().is_some() {
Some(Ok(Value::binary(err.into_bytes(), self.span)))
} else { } else {
let i = err.utf8_error().valid_up_to(); None
let mut bytes = err.into_bytes(); }
self.leftover = bytes.split_off(i); }
let str = String::from_utf8(bytes).expect("valid utf8"); // String produces an error if UTF-8 can't be parsed
Some(Ok(Value::string(str, self.span))) ByteStreamType::String => match self.next_string().transpose()? {
Ok(string) => Some(Ok(Value::string(string, self.span))),
Err((_, err)) => {
self.error = true;
Some(Err(err))
}
},
// For Unknown, we try to create strings, but we switch to binary mode if we
// fail
ByteStreamType::Unknown => {
match self.next_string().transpose()? {
Ok(string) => Some(Ok(Value::string(string, self.span))),
Err((buf, _)) if !buf.is_empty() => {
// Switch to binary mode
self.type_ = ByteStreamType::Binary;
Some(Ok(Value::binary(buf, self.span)))
}
Err((_, err)) => {
self.error = true;
Some(Err(err))
}
} }
} }
} }
@ -776,11 +1072,58 @@ where
Ok(len as u64) Ok(len as u64)
} }
/// Reader state for a byte stream whose data is produced on demand by a generator function.
///
/// The generator is handed the emptied buffer to write into and returns `Ok(true)` if more
/// data may follow, or `Ok(false)` to signal the end of the stream.
struct ReadGenerator<F>
where
    F: FnMut(&mut Vec<u8>) -> Result<bool, ShellError> + Send + 'static,
{
    /// The data most recently written by the generator; the cursor position marks how much of
    /// it has already been consumed.
    buffer: Cursor<Vec<u8>>,
    /// The function called to produce more data once `buffer` has been fully consumed.
    generator: F,
}
impl<F> BufRead for ReadGenerator<F>
where
    F: FnMut(&mut Vec<u8>) -> Result<bool, ShellError> + Send + 'static,
{
    /// Return the unread part of the internal buffer, asking the generator for more data
    /// whenever the buffer has been used up.
    ///
    /// An empty slice is only returned once the generator has reported the end of the stream
    /// without leaving any data behind.
    fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
        // A generator may legitimately produce nothing and still report that more is coming,
        // so keep asking until there is something to hand out or the stream is over.
        loop {
            let has_unread = !self.buffer.fill_buf()?.is_empty();
            if has_unread {
                break;
            }

            // Everything was consumed: empty the storage and rewind the cursor
            self.buffer.get_mut().clear();
            self.buffer.set_position(0);

            // Let the generator write the next batch of data
            let more_to_come = (self.generator)(self.buffer.get_mut())?;
            if !more_to_come {
                // End of stream; whatever was written on this last call is still returned
                break;
            }
        }
        self.buffer.fill_buf()
    }

    /// Mark `amt` bytes of the internal buffer as read.
    fn consume(&mut self, amt: usize) {
        self.buffer.consume(amt);
    }
}
impl<F> Read for ReadGenerator<F>
where
    F: FnMut(&mut Vec<u8>) -> Result<bool, ShellError> + Send + 'static,
{
    /// Copy as many buffered bytes as fit into `buf` and return how many were copied.
    ///
    /// This is a thin layer over the [`BufRead`] implementation: it fetches the currently
    /// available data, copies a prefix of it, and marks that prefix as consumed.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let available = self.fill_buf()?;
        let count = available.len().min(buf.len());
        let (dest, _) = buf.split_at_mut(count);
        dest.copy_from_slice(&available[..count]);
        self.consume(count);
        Ok(count)
    }
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
fn test_chunks<T>(data: Vec<T>) -> Chunks fn test_chunks<T>(data: Vec<T>, type_: ByteStreamType) -> Chunks
where where
T: AsRef<[u8]> + Default + Send + 'static, T: AsRef<[u8]> + Default + Send + 'static,
{ {
@ -788,46 +1131,89 @@ mod tests {
iter: data.into_iter(), iter: data.into_iter(),
cursor: Some(Cursor::new(T::default())), cursor: Some(Cursor::new(T::default())),
}; };
Chunks { Chunks::new(
reader: BufReader::new(SourceReader::Read(Box::new(reader))), SourceReader::Read(Box::new(reader)),
span: Span::test_data(), Span::test_data(),
ctrlc: None, None,
leftover: Vec::new(), type_,
} )
} }
#[test] #[test]
fn chunks_read_string() { fn chunks_read_binary_passthrough() {
let data = vec!["Nushell", "が好きです"]; let bins = vec![&[0, 1][..], &[2, 3][..]];
let chunks = test_chunks(data.clone()); let iter = test_chunks(bins.clone(), ByteStreamType::Binary);
let actual = chunks.collect::<Result<Vec<_>, _>>().unwrap();
let expected = data.into_iter().map(Value::test_string).collect::<Vec<_>>();
assert_eq!(expected, actual);
}
#[test] let bins_values: Vec<Value> = bins
fn chunks_read_string_split_utf8() {
let expected = "Nushell最高!";
let chunks = test_chunks(vec![&b"Nushell\xe6"[..], b"\x9c\x80\xe9", b"\xab\x98!"]);
let actual = chunks
.into_iter() .into_iter()
.map(|value| value.and_then(Value::into_string)) .map(|bin| Value::binary(bin, Span::test_data()))
.collect::<Result<String, _>>() .collect();
.unwrap(); assert_eq!(
bins_values,
assert_eq!(expected, actual); iter.collect::<Result<Vec<Value>, _>>().expect("error")
);
} }
#[test] #[test]
fn chunks_returns_string_or_binary() { fn chunks_read_string_clean() {
let chunks = test_chunks(vec![b"Nushell".as_slice(), b"\x9c\x80\xe9abcd", b"efgh"]); let strs = vec!["Nushell", "が好きです"];
let actual = chunks.collect::<Result<Vec<_>, _>>().unwrap(); let iter = test_chunks(strs.clone(), ByteStreamType::String);
let expected = vec![
Value::test_string("Nushell"), let strs_values: Vec<Value> = strs
Value::test_binary(b"\x9c\x80\xe9abcd"), .into_iter()
Value::test_string("efgh"), .map(|string| Value::string(string, Span::test_data()))
]; .collect();
assert_eq!(actual, expected) assert_eq!(
strs_values,
iter.collect::<Result<Vec<Value>, _>>().expect("error")
);
}
/// Multi-byte characters split across chunk boundaries must be reassembled, so that the
/// concatenation of all yielded strings equals the original text.
#[test]
fn chunks_read_string_split_boundary() {
    let expected = "Nushell最高!";
    let pieces = vec![&b"Nushell\xe6"[..], &b"\x9c\x80\xe9"[..], &b"\xab\x98!"[..]];

    let joined: String = test_chunks(pieces, ByteStreamType::String)
        .map(|value| value.expect("error").into_string().expect("not a string"))
        .collect();

    assert_eq!(expected, joined);
}
/// A string-typed stream that ends in the middle of a multi-byte character must yield some
/// text first and then fail with a non-UTF-8 error.
#[test]
fn chunks_read_string_utf8_error() {
    let pieces = vec![&b"Nushell\xe6"[..], &b"\x9c\x80\xe9"[..], &b"\xab"[..]];
    let mut decoded = String::new();

    for item in test_chunks(pieces, ByteStreamType::String) {
        let err = match item {
            Ok(value) => {
                decoded.push_str(&value.into_string().expect("not a string"));
                continue;
            }
            Err(err) => err,
        };

        // The first error ends the test: check what was decoded before it and its kind
        println!("string so far: {:?}", decoded);
        println!("got error: {err:?}");
        assert!(!decoded.is_empty());
        assert!(matches!(err, ShellError::NonUtf8Custom { .. }));
        return;
    }

    panic!("no error");
}
#[test]
fn chunks_read_unknown_fallback() {
let chunks = vec![&b"Nushell"[..], &b"\x9c\x80\xe9abcd"[..], &b"efgh"[..]];
let mut iter = test_chunks(chunks, ByteStreamType::Unknown);
let mut get = || iter.next().expect("end of iter").expect("error");
assert_eq!(Value::test_string("Nushell"), get());
assert_eq!(Value::test_binary(b"\x9c\x80\xe9abcd"), get());
// Once it's in binary mode it won't go back
assert_eq!(Value::test_binary(b"efgh"), get());
} }
} }

View File

@ -2,8 +2,8 @@ use crate::{
ast::{Call, PathMember}, ast::{Call, PathMember},
engine::{EngineState, Stack}, engine::{EngineState, Stack},
process::{ChildPipe, ChildProcess, ExitStatus}, process::{ChildPipe, ChildProcess, ExitStatus},
ByteStream, Config, ErrSpan, ListStream, OutDest, PipelineMetadata, Range, ShellError, Span, ByteStream, ByteStreamType, Config, ErrSpan, ListStream, OutDest, PipelineMetadata, Range,
Value, ShellError, Span, Value,
}; };
use nu_utils::{stderr_write_all_and_flush, stdout_write_all_and_flush}; use nu_utils::{stderr_write_all_and_flush, stdout_write_all_and_flush};
use std::{ use std::{
@ -170,6 +170,8 @@ impl PipelineData {
/// Try convert from self into iterator /// Try convert from self into iterator
/// ///
/// It returns Err if the `self` cannot be converted to an iterator. /// It returns Err if the `self` cannot be converted to an iterator.
///
/// The `span` should be the span of the command or operation that would raise an error.
pub fn into_iter_strict(self, span: Span) -> Result<PipelineIterator, ShellError> { pub fn into_iter_strict(self, span: Span) -> Result<PipelineIterator, ShellError> {
Ok(PipelineIterator(match self { Ok(PipelineIterator(match self {
PipelineData::Value(value, ..) => { PipelineData::Value(value, ..) => {
@ -274,7 +276,7 @@ impl PipelineData {
span: head, span: head,
}), }),
PipelineData::ByteStream(stream, ..) => Err(ShellError::IncompatiblePathAccess { PipelineData::ByteStream(stream, ..) => Err(ShellError::IncompatiblePathAccess {
type_name: "byte stream".to_string(), type_name: stream.type_().describe().to_owned(),
span: stream.span(), span: stream.span(),
}), }),
} }
@ -313,16 +315,7 @@ impl PipelineData {
Ok(PipelineData::ListStream(stream.map(f), metadata)) Ok(PipelineData::ListStream(stream.map(f), metadata))
} }
PipelineData::ByteStream(stream, metadata) => { PipelineData::ByteStream(stream, metadata) => {
// TODO: is this behavior desired / correct ? Ok(f(stream.into_value()?).into_pipeline_data_with_metadata(metadata))
let span = stream.span();
let value = match String::from_utf8(stream.into_bytes()?) {
Ok(mut str) => {
str.truncate(str.trim_end_matches(LINE_ENDING_PATTERN).len());
f(Value::string(str, span))
}
Err(err) => f(Value::binary(err.into_bytes(), span)),
};
Ok(value.into_pipeline_data_with_metadata(metadata))
} }
} }
} }
@ -543,22 +536,26 @@ impl PipelineData {
no_newline: bool, no_newline: bool,
to_stderr: bool, to_stderr: bool,
) -> Result<Option<ExitStatus>, ShellError> { ) -> Result<Option<ExitStatus>, ShellError> {
if let PipelineData::ByteStream(stream, ..) = self { match self {
stream.print(to_stderr) // Print byte streams directly as long as they aren't binary.
} else { PipelineData::ByteStream(stream, ..) if stream.type_() != ByteStreamType::Binary => {
// If the table function is in the declarations, then we can use it stream.print(to_stderr)
// to create the table value that will be printed in the terminal }
if let Some(decl_id) = engine_state.table_decl_id { _ => {
let command = engine_state.get_decl(decl_id); // If the table function is in the declarations, then we can use it
if command.block_id().is_some() { // to create the table value that will be printed in the terminal
self.write_all_and_flush(engine_state, no_newline, to_stderr) if let Some(decl_id) = engine_state.table_decl_id {
let command = engine_state.get_decl(decl_id);
if command.block_id().is_some() {
self.write_all_and_flush(engine_state, no_newline, to_stderr)
} else {
let call = Call::new(Span::new(0, 0));
let table = command.run(engine_state, stack, &call, self)?;
table.write_all_and_flush(engine_state, no_newline, to_stderr)
}
} else { } else {
let call = Call::new(Span::new(0, 0)); self.write_all_and_flush(engine_state, no_newline, to_stderr)
let table = command.run(engine_state, stack, &call, self)?;
table.write_all_and_flush(engine_state, no_newline, to_stderr)
} }
} else {
self.write_all_and_flush(engine_state, no_newline, to_stderr)
} }
} }
} }

View File

@ -1,6 +1,7 @@
use nu_plugin::{EngineInterface, EvaluatedCall, PluginCommand}; use nu_plugin::{EngineInterface, EvaluatedCall, PluginCommand};
use nu_protocol::{ use nu_protocol::{
ByteStream, Category, Example, LabeledError, PipelineData, Signature, Type, Value, ByteStream, ByteStreamType, Category, Example, LabeledError, PipelineData, Signature, Type,
Value,
}; };
use crate::ExamplePlugin; use crate::ExamplePlugin;
@ -52,6 +53,7 @@ impl PluginCommand for CollectBytes {
input.into_iter().map(Value::coerce_into_binary), input.into_iter().map(Value::coerce_into_binary),
call.head, call.head,
None, None,
ByteStreamType::Unknown,
), ),
None, None,
)) ))