mirror of
https://github.com/nushell/nushell.git
synced 2025-08-12 06:59:16 +02:00
read_byte_stream implemented for ByteStreams
This commit is contained in:
committed by
Simon Curtis
parent
bf8763fc11
commit
cd6ff56d04
@ -1,9 +1,11 @@
|
|||||||
|
use itertools::Itertools;
|
||||||
use nu_cmd_base::{
|
use nu_cmd_base::{
|
||||||
input_handler::{operate, CmdArgument},
|
input_handler::{operate, CmdArgument},
|
||||||
util,
|
util,
|
||||||
};
|
};
|
||||||
use nu_engine::command_prelude::*;
|
use nu_engine::command_prelude::*;
|
||||||
use nu_protocol::Range;
|
use nu_protocol::{Range, Reader};
|
||||||
|
use std::io::{Bytes, Read, Write};
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct BytesAt;
|
pub struct BytesAt;
|
||||||
@ -83,7 +85,11 @@ impl Command for BytesAt {
|
|||||||
cell_paths,
|
cell_paths,
|
||||||
};
|
};
|
||||||
|
|
||||||
operate(action, args, input, call.head, engine_state.signals())
|
if let PipelineData::ByteStream(stream, metadata) = input {
|
||||||
|
handle_byte_stream(&args, stream, call, metadata, engine_state)
|
||||||
|
} else {
|
||||||
|
operate(action, args, input, call.head, engine_state.signals())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn examples(&self) -> Vec<Example> {
|
fn examples(&self) -> Vec<Example> {
|
||||||
@ -127,27 +133,8 @@ impl Command for BytesAt {
|
|||||||
fn action(input: &Value, args: &Arguments, head: Span) -> Value {
|
fn action(input: &Value, args: &Arguments, head: Span) -> Value {
|
||||||
let range = &args.indexes;
|
let range = &args.indexes;
|
||||||
match input {
|
match input {
|
||||||
Value::Binary { val, .. } => {
|
Value::Binary { val, .. } => read_bytes(val, range, head),
|
||||||
let len = val.len() as isize;
|
|
||||||
let start = if range.0 < 0 { range.0 + len } else { range.0 };
|
|
||||||
let end = if range.1 < 0 { range.1 + len } else { range.1 };
|
|
||||||
|
|
||||||
if start > end {
|
|
||||||
Value::binary(vec![], head)
|
|
||||||
} else {
|
|
||||||
let val_iter = val.iter().skip(start as usize);
|
|
||||||
Value::binary(
|
|
||||||
if end == isize::MAX {
|
|
||||||
val_iter.copied().collect::<Vec<u8>>()
|
|
||||||
} else {
|
|
||||||
val_iter.take((end - start + 1) as usize).copied().collect()
|
|
||||||
},
|
|
||||||
head,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Value::Error { .. } => input.clone(),
|
Value::Error { .. } => input.clone(),
|
||||||
|
|
||||||
other => Value::error(
|
other => Value::error(
|
||||||
ShellError::UnsupportedInput {
|
ShellError::UnsupportedInput {
|
||||||
msg: "Only binary values are supported".into(),
|
msg: "Only binary values are supported".into(),
|
||||||
@ -159,3 +146,88 @@ fn action(input: &Value, args: &Arguments, head: Span) -> Value {
|
|||||||
),
|
),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn handle_byte_stream(
|
||||||
|
args: &Arguments,
|
||||||
|
stream: ByteStream,
|
||||||
|
call: &Call,
|
||||||
|
metadata: Option<nu_protocol::PipelineMetadata>,
|
||||||
|
engine_state: &EngineState,
|
||||||
|
) -> Result<PipelineData, ShellError> {
|
||||||
|
let idxs = args.indexes;
|
||||||
|
if idxs.0 >= idxs.1 {
|
||||||
|
return Ok(PipelineData::empty());
|
||||||
|
}
|
||||||
|
|
||||||
|
match stream.reader() {
|
||||||
|
Some(reader) => {
|
||||||
|
let iter = reader.bytes();
|
||||||
|
|
||||||
|
if idxs.0 < 0 || idxs.1 < 0 {
|
||||||
|
match iter.try_len() {
|
||||||
|
Ok(_) => {
|
||||||
|
let vec = iter.filter_map(Result::ok).collect::<Vec<u8>>();
|
||||||
|
Ok(read_bytes(&vec, &idxs, call.head).into_pipeline_data_with_metadata(metadata))
|
||||||
|
}
|
||||||
|
_ => Err(ShellError::IncorrectValue {
|
||||||
|
msg:
|
||||||
|
"Negative range values cannot be used with streams that don't specify a length"
|
||||||
|
.into(),
|
||||||
|
val_span: call.head,
|
||||||
|
call_span: call.arguments_span(),
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
Ok(read_stream(iter, idxs, call, engine_state, metadata))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None => Ok(PipelineData::empty()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn read_bytes(val: &[u8], range: &Subbytes, head: Span) -> Value {
|
||||||
|
let len = val.len() as isize;
|
||||||
|
let start = if range.0 < 0 { range.0 + len } else { range.0 };
|
||||||
|
let end = if range.1 < 0 { range.1 + len } else { range.1 };
|
||||||
|
|
||||||
|
if start > end {
|
||||||
|
Value::binary(vec![], head)
|
||||||
|
} else {
|
||||||
|
let val_iter = val.iter().skip(start as usize);
|
||||||
|
Value::binary(
|
||||||
|
if end == isize::MAX {
|
||||||
|
val_iter.copied().collect::<Vec<u8>>()
|
||||||
|
} else {
|
||||||
|
val_iter.take((end - start + 1) as usize).copied().collect()
|
||||||
|
},
|
||||||
|
head,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn read_stream(
|
||||||
|
iter: Bytes<Reader>,
|
||||||
|
range: Subbytes,
|
||||||
|
call: &Call,
|
||||||
|
engine_state: &EngineState,
|
||||||
|
metadata: Option<nu_protocol::PipelineMetadata>,
|
||||||
|
) -> PipelineData {
|
||||||
|
let start = range.0 as usize;
|
||||||
|
let end = (range.1 - range.0) as usize;
|
||||||
|
let mut iter = iter.skip(start).take(end);
|
||||||
|
|
||||||
|
let stream = ByteStream::from_fn(
|
||||||
|
call.head,
|
||||||
|
engine_state.signals().clone(),
|
||||||
|
ByteStreamType::Binary,
|
||||||
|
move |buf| match iter.next() {
|
||||||
|
Some(Ok(n)) if n > 0 => match buf.write(&[n]) {
|
||||||
|
Ok(_) => Ok(true),
|
||||||
|
Err(err) => Err(err.into()),
|
||||||
|
},
|
||||||
|
_ => Ok(false),
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
PipelineData::ByteStream(stream, metadata)
|
||||||
|
}
|
||||||
|
Reference in New Issue
Block a user