nushell/crates/nu-protocol/src/value/stream.rs
JT 6cdfee3573
Move Value to helpers, separate span call (#10121)
# Description

As part of the refactor to split spans off of Value, this moves to using
helper functions to create values, and using `.span()` instead of
matching span out of Value directly.

Hoping to get a few more helping hands to finish this, as there are a
lot of commands to update :)

# User-Facing Changes
<!-- List of all changes that impact the user experience here. This
helps us keep track of breaking changes. -->

# Tests + Formatting
<!--
Don't forget to add tests that cover your changes.

Make sure you've run and fixed any issues with these commands:

- `cargo fmt --all -- --check` to check standard code formatting (`cargo
fmt --all` applies these changes)
- `cargo clippy --workspace -- -D warnings -D clippy::unwrap_used` to
check that you're using the standard code style
- `cargo test --workspace` to check that all tests pass (on Windows make
sure to [enable developer
mode](https://learn.microsoft.com/en-us/windows/apps/get-started/developer-mode-features-and-debugging))
- `cargo run -- -c "use std testing; testing run-tests --path
crates/nu-std"` to run the tests for the standard library

> **Note**
> from `nushell` you can also use the `toolkit` as follows
> ```bash
> use toolkit.nu # or use an `env_change` hook to activate it
automatically
> toolkit check pr
> ```
-->

# After Submitting
<!-- If your PR had any user-facing changes, update [the
documentation](https://github.com/nushell/nushell.github.io) after the
PR is merged, if necessary. This will help us keep the docs up to date.
-->

---------

Co-authored-by: Darren Schroeder <343840+fdncred@users.noreply.github.com>
Co-authored-by: WindSoilder <windsoilder@outlook.com>
2023-09-03 07:27:29 -07:00

233 lines
7.8 KiB
Rust

use crate::*;
use std::{
fmt::Debug,
sync::{atomic::AtomicBool, Arc},
};
pub struct RawStream {
pub stream: Box<dyn Iterator<Item = Result<Vec<u8>, ShellError>> + Send + 'static>,
pub leftover: Vec<u8>,
pub ctrlc: Option<Arc<AtomicBool>>,
pub is_binary: bool,
pub span: Span,
pub known_size: Option<u64>, // (bytes)
}
impl RawStream {
pub fn new(
stream: Box<dyn Iterator<Item = Result<Vec<u8>, ShellError>> + Send + 'static>,
ctrlc: Option<Arc<AtomicBool>>,
span: Span,
known_size: Option<u64>,
) -> Self {
Self {
stream,
leftover: vec![],
ctrlc,
is_binary: false,
span,
known_size,
}
}
pub fn into_bytes(self) -> Result<Spanned<Vec<u8>>, ShellError> {
let mut output = vec![];
for item in self.stream {
if nu_utils::ctrl_c::was_pressed(&self.ctrlc) {
break;
}
output.extend(item?);
}
Ok(Spanned {
item: output,
span: self.span,
})
}
pub fn into_string(self) -> Result<Spanned<String>, ShellError> {
let mut output = String::new();
let span = self.span;
let ctrlc = &self.ctrlc.clone();
for item in self {
if nu_utils::ctrl_c::was_pressed(ctrlc) {
break;
}
output.push_str(&item?.as_string()?);
}
Ok(Spanned { item: output, span })
}
pub fn chain(self, stream: RawStream) -> RawStream {
RawStream {
stream: Box::new(self.stream.chain(stream.stream)),
leftover: self.leftover.into_iter().chain(stream.leftover).collect(),
ctrlc: self.ctrlc,
is_binary: self.is_binary,
span: self.span,
known_size: self.known_size,
}
}
pub fn drain(self) -> Result<(), ShellError> {
for next in self {
match next {
Ok(val) => {
if let Value::Error { error, .. } = val {
return Err(*error);
}
}
Err(err) => return Err(err),
}
}
Ok(())
}
}
impl Debug for RawStream {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RawStream").finish()
}
}
impl Iterator for RawStream {
type Item = Result<Value, ShellError>;
fn next(&mut self) -> Option<Self::Item> {
if nu_utils::ctrl_c::was_pressed(&self.ctrlc) {
return None;
}
// If we know we're already binary, just output that
if self.is_binary {
self.stream.next().map(|buffer| {
buffer.map(|mut v| {
if !self.leftover.is_empty() {
for b in self.leftover.drain(..).rev() {
v.insert(0, b);
}
}
Value::binary(v, self.span)
})
})
} else {
// We *may* be text. We're only going to try utf-8. Other decodings
// needs to be taken as binary first, then passed through `decode`.
if let Some(buffer) = self.stream.next() {
match buffer {
Ok(mut v) => {
if !self.leftover.is_empty() {
while let Some(b) = self.leftover.pop() {
v.insert(0, b);
}
}
match String::from_utf8(v.clone()) {
Ok(s) => {
// Great, we have a complete string, let's output it
Some(Ok(Value::string(s, self.span)))
}
Err(err) => {
// Okay, we *might* have a string but we've also got some errors
if v.is_empty() {
// We can just end here
None
} else if v.len() > 3
&& (v.len() - err.utf8_error().valid_up_to() > 3)
{
// As UTF-8 characters are max 4 bytes, if we have more than that in error we know
// that it's not just a character spanning two frames.
// We now know we are definitely binary, so switch to binary and stay there.
self.is_binary = true;
Some(Ok(Value::binary(v, self.span)))
} else {
// Okay, we have a tiny bit of error at the end of the buffer. This could very well be
// a character that spans two frames. Since this is the case, remove the error from
// the current frame an dput it in the leftover buffer.
self.leftover = v[err.utf8_error().valid_up_to()..].to_vec();
let buf = v[0..err.utf8_error().valid_up_to()].to_vec();
match String::from_utf8(buf) {
Ok(s) => Some(Ok(Value::string(s, self.span))),
Err(_) => {
// Something is definitely wrong. Switch to binary, and stay there
self.is_binary = true;
Some(Ok(Value::binary(v, self.span)))
}
}
}
}
}
}
Err(e) => Some(Err(e)),
}
} else if !self.leftover.is_empty() {
let output = Ok(Value::binary(self.leftover.clone(), self.span));
self.leftover.clear();
Some(output)
} else {
None
}
}
}
}
/// A potentially infinite stream of values, optionally with a mean to send a Ctrl-C signal to stop
/// the stream from continuing.
///
/// In practice, a "stream" here means anything which can be iterated and produce Values as it iterates.
/// Like other iterators in Rust, observing values from this stream will drain the items as you view them
/// and the stream cannot be replayed.
pub struct ListStream {
pub stream: Box<dyn Iterator<Item = Value> + Send + 'static>,
pub ctrlc: Option<Arc<AtomicBool>>,
}
impl ListStream {
pub fn into_string(self, separator: &str, config: &Config) -> String {
self.map(|x: Value| x.into_string(", ", config))
.collect::<Vec<String>>()
.join(separator)
}
pub fn drain(self) -> Result<(), ShellError> {
for next in self {
if let Value::Error { error, .. } = next {
return Err(*error);
}
}
Ok(())
}
pub fn from_stream(
input: impl Iterator<Item = Value> + Send + 'static,
ctrlc: Option<Arc<AtomicBool>>,
) -> ListStream {
ListStream {
stream: Box::new(input),
ctrlc,
}
}
}
impl Debug for ListStream {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ListStream").finish()
}
}
impl Iterator for ListStream {
type Item = Value;
fn next(&mut self) -> Option<Self::Item> {
if nu_utils::ctrl_c::was_pressed(&self.ctrlc) {
None
} else {
self.stream.next()
}
}
}