mirror of
https://github.com/nushell/nushell.git
synced 2025-08-09 12:15:42 +02:00
Implementing ByteStream interuption on infinite stream (#13552)
# Description This PR should address #13530 by explicitly handling ByteStreams. The issue can be replicated easily on linux by running: ```nushell open /dev/urandom | into binary | bytes at ..10 ``` Would leave the output hanging and with no way to cancel it, this was likely because it was trying to collect the input stream and would not complete. I have also put in an error to say that using negative offsets for a bytestream without a length cannot be used. ```nushell ~/git/nushell> open /dev/urandom | into binary | bytes at (-1).. Error: nu:🐚:incorrect_value × Incorrect value. ╭─[entry #3:1:35] 1 │ open /dev/urandom | into binary | bytes at (-1).. · ────┬─── ───┬── · │ ╰── encountered here · ╰── Negative range values cannot be used with streams that don't specify a length ╰──── ``` # User-Facing Changes No operation changes, only the warning you get back for negative offsets # Tests + Formatting Ran `toolkit check pr ` with no errors or warnings Manual testing of the example commands above --------- Co-authored-by: Ian Manske <ian.manske@pm.me> Co-authored-by: Simon Curtis <simon.curtis@candc-uk.com>
This commit is contained in:
@ -653,6 +653,18 @@ pub enum ShellError {
|
||||
creation_site: Span,
|
||||
},
|
||||
|
||||
/// Attempted to us a relative range on an infinite stream
|
||||
///
|
||||
/// ## Resolution
|
||||
///
|
||||
/// Ensure that either the range is absolute or the stream has a known length.
|
||||
#[error("Relative range values cannot be used with streams that don't have a known length")]
|
||||
#[diagnostic(code(nu::shell::relative_range_on_infinite_stream))]
|
||||
RelativeRangeOnInfiniteStream {
|
||||
#[label = "Relative range values cannot be used with streams that don't have a known length"]
|
||||
span: Span,
|
||||
},
|
||||
|
||||
/// An error happened while performing an external command.
|
||||
///
|
||||
/// ## Resolution
|
||||
|
@ -1,8 +1,9 @@
|
||||
//! Module managing the streaming of raw bytes between pipeline elements
|
||||
#[cfg(feature = "os")]
|
||||
use crate::process::{ChildPipe, ChildProcess};
|
||||
use crate::{ErrSpan, IntoSpanned, PipelineData, ShellError, Signals, Span, Type, Value};
|
||||
use crate::{ErrSpan, IntRange, IntoSpanned, PipelineData, ShellError, Signals, Span, Type, Value};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::ops::Bound;
|
||||
#[cfg(unix)]
|
||||
use std::os::fd::OwnedFd;
|
||||
#[cfg(windows)]
|
||||
@ -220,6 +221,79 @@ impl ByteStream {
|
||||
)
|
||||
}
|
||||
|
||||
pub fn skip(self, span: Span, n: u64) -> Result<Self, ShellError> {
|
||||
let known_size = self.known_size.map(|len| len.saturating_sub(n));
|
||||
if let Some(mut reader) = self.reader() {
|
||||
// Copy the number of skipped bytes into the sink before proceeding
|
||||
io::copy(&mut (&mut reader).take(n), &mut io::sink()).err_span(span)?;
|
||||
Ok(
|
||||
ByteStream::read(reader, span, Signals::empty(), ByteStreamType::Binary)
|
||||
.with_known_size(known_size),
|
||||
)
|
||||
} else {
|
||||
Err(ShellError::TypeMismatch {
|
||||
err_message: "expected readable stream".into(),
|
||||
span,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub fn take(self, span: Span, n: u64) -> Result<Self, ShellError> {
|
||||
let known_size = self.known_size.map(|s| s.min(n));
|
||||
if let Some(reader) = self.reader() {
|
||||
Ok(ByteStream::read(
|
||||
reader.take(n),
|
||||
span,
|
||||
Signals::empty(),
|
||||
ByteStreamType::Binary,
|
||||
)
|
||||
.with_known_size(known_size))
|
||||
} else {
|
||||
Err(ShellError::TypeMismatch {
|
||||
err_message: "expected readable stream".into(),
|
||||
span,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub fn slice(
|
||||
self,
|
||||
val_span: Span,
|
||||
call_span: Span,
|
||||
range: IntRange,
|
||||
) -> Result<Self, ShellError> {
|
||||
if let Some(len) = self.known_size {
|
||||
let start = range.absolute_start(len);
|
||||
let stream = self.skip(val_span, start);
|
||||
|
||||
match range.absolute_end(len) {
|
||||
Bound::Unbounded => stream,
|
||||
Bound::Included(end) | Bound::Excluded(end) if end < start => {
|
||||
stream.and_then(|s| s.take(val_span, 0))
|
||||
}
|
||||
Bound::Included(end) => {
|
||||
let distance = end - start + 1;
|
||||
stream.and_then(|s| s.take(val_span, distance.min(len)))
|
||||
}
|
||||
Bound::Excluded(end) => {
|
||||
let distance = end - start;
|
||||
stream.and_then(|s| s.take(val_span, distance.min(len)))
|
||||
}
|
||||
}
|
||||
} else if range.is_relative() {
|
||||
Err(ShellError::RelativeRangeOnInfiniteStream { span: call_span })
|
||||
} else {
|
||||
let start = range.start() as u64;
|
||||
let stream = self.skip(val_span, start);
|
||||
|
||||
match range.distance() {
|
||||
Bound::Unbounded => stream,
|
||||
Bound::Included(distance) => stream.and_then(|s| s.take(val_span, distance)),
|
||||
Bound::Excluded(distance) => stream.and_then(|s| s.take(val_span, distance - 1)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a [`ByteStream`] from a string. The type of the stream is always `String`.
|
||||
pub fn read_string(string: String, span: Span, signals: Signals) -> Self {
|
||||
let len = string.len();
|
||||
|
@ -81,10 +81,46 @@ mod int_range {
|
||||
self.start
|
||||
}
|
||||
|
||||
// Resolves the absolute start position given the length of the input value
|
||||
pub fn absolute_start(&self, len: u64) -> u64 {
|
||||
let max_index = len - 1;
|
||||
match self.start {
|
||||
start if start < 0 => len.saturating_sub(start.unsigned_abs()),
|
||||
start => max_index.min(start as u64),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the distance between the start and end of the range
|
||||
/// The result will always be 0 or positive
|
||||
pub fn distance(&self) -> Bound<u64> {
|
||||
match self.end {
|
||||
Bound::Unbounded => Bound::Unbounded,
|
||||
Bound::Included(end) if self.start > end => Bound::Included(0),
|
||||
Bound::Excluded(end) if self.start > end => Bound::Excluded(0),
|
||||
Bound::Included(end) => Bound::Included((end - self.start) as u64),
|
||||
Bound::Excluded(end) => Bound::Excluded((end - self.start) as u64),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn end(&self) -> Bound<i64> {
|
||||
self.end
|
||||
}
|
||||
|
||||
pub fn absolute_end(&self, len: u64) -> Bound<u64> {
|
||||
let max_index = len - 1;
|
||||
match self.end {
|
||||
Bound::Unbounded => Bound::Unbounded,
|
||||
Bound::Included(i) => Bound::Included(match i {
|
||||
i if i < 0 => len.saturating_sub(i.unsigned_abs()),
|
||||
i => max_index.min(i as u64),
|
||||
}),
|
||||
Bound::Excluded(i) => Bound::Excluded(match i {
|
||||
i if i < 0 => len.saturating_sub(i.unsigned_abs()),
|
||||
i => len.min(i as u64),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn step(&self) -> i64 {
|
||||
self.step
|
||||
}
|
||||
@ -93,6 +129,21 @@ mod int_range {
|
||||
self.end == Bound::Unbounded
|
||||
}
|
||||
|
||||
pub fn is_relative(&self) -> bool {
|
||||
self.is_start_relative() || self.is_end_relative()
|
||||
}
|
||||
|
||||
pub fn is_start_relative(&self) -> bool {
|
||||
self.start < 0
|
||||
}
|
||||
|
||||
pub fn is_end_relative(&self) -> bool {
|
||||
match self.end {
|
||||
Bound::Included(end) | Bound::Excluded(end) => end < 0,
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn contains(&self, value: i64) -> bool {
|
||||
if self.step < 0 {
|
||||
// Decreasing range
|
||||
|
Reference in New Issue
Block a user