use crate::*; use std::{ fmt::Debug, sync::{atomic::AtomicBool, Arc}, }; pub struct RawStream { pub stream: Box, ShellError>> + Send + 'static>, pub leftover: Vec, pub ctrlc: Option>, pub is_binary: bool, pub span: Span, pub known_size: Option, // (bytes) } impl RawStream { pub fn new( stream: Box, ShellError>> + Send + 'static>, ctrlc: Option>, span: Span, known_size: Option, ) -> Self { Self { stream, leftover: vec![], ctrlc, is_binary: false, span, known_size, } } pub fn into_bytes(self) -> Result>, ShellError> { let mut output = vec![]; for item in self.stream { if nu_utils::ctrl_c::was_pressed(&self.ctrlc) { break; } output.extend(item?); } Ok(Spanned { item: output, span: self.span, }) } pub fn into_string(self) -> Result, ShellError> { let mut output = String::new(); let span = self.span; let ctrlc = &self.ctrlc.clone(); for item in self { if nu_utils::ctrl_c::was_pressed(ctrlc) { break; } output.push_str(&item?.as_string()?); } Ok(Spanned { item: output, span }) } pub fn chain(self, stream: RawStream) -> RawStream { RawStream { stream: Box::new(self.stream.chain(stream.stream)), leftover: self.leftover.into_iter().chain(stream.leftover).collect(), ctrlc: self.ctrlc, is_binary: self.is_binary, span: self.span, known_size: self.known_size, } } pub fn drain(self) -> Result<(), ShellError> { for next in self { match next { Ok(val) => { if let Value::Error { error } = val { return Err(*error); } } Err(err) => return Err(err), } } Ok(()) } } impl Debug for RawStream { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("RawStream").finish() } } impl Iterator for RawStream { type Item = Result; fn next(&mut self) -> Option { if nu_utils::ctrl_c::was_pressed(&self.ctrlc) { return None; } // If we know we're already binary, just output that if self.is_binary { self.stream.next().map(|buffer| { buffer.map(|mut v| { if !self.leftover.is_empty() { for b in self.leftover.drain(..).rev() { v.insert(0, b); } } Value::Binary { val: v, span: self.span, } }) }) } else { // We *may* be text. We're only going to try utf-8. Other decodings // needs to be taken as binary first, then passed through `decode`. if let Some(buffer) = self.stream.next() { match buffer { Ok(mut v) => { if !self.leftover.is_empty() { while let Some(b) = self.leftover.pop() { v.insert(0, b); } } match String::from_utf8(v.clone()) { Ok(s) => { // Great, we have a complete string, let's output it Some(Ok(Value::String { val: s, span: self.span, })) } Err(err) => { // Okay, we *might* have a string but we've also got some errors if v.is_empty() { // We can just end here None } else if v.len() > 3 && (v.len() - err.utf8_error().valid_up_to() > 3) { // As UTF-8 characters are max 4 bytes, if we have more than that in error we know // that it's not just a character spanning two frames. // We now know we are definitely binary, so switch to binary and stay there. self.is_binary = true; Some(Ok(Value::Binary { val: v, span: self.span, })) } else { // Okay, we have a tiny bit of error at the end of the buffer. This could very well be // a character that spans two frames. Since this is the case, remove the error from // the current frame an dput it in the leftover buffer. self.leftover = v[err.utf8_error().valid_up_to()..].to_vec(); let buf = v[0..err.utf8_error().valid_up_to()].to_vec(); match String::from_utf8(buf) { Ok(s) => Some(Ok(Value::String { val: s, span: self.span, })), Err(_) => { // Something is definitely wrong. Switch to binary, and stay there self.is_binary = true; Some(Ok(Value::Binary { val: v, span: self.span, })) } } } } } } Err(e) => Some(Err(e)), } } else if !self.leftover.is_empty() { let output = Ok(Value::Binary { val: self.leftover.clone(), span: self.span, }); self.leftover.clear(); Some(output) } else { None } } } } /// A potentially infinite stream of values, optionally with a mean to send a Ctrl-C signal to stop /// the stream from continuing. /// /// In practice, a "stream" here means anything which can be iterated and produce Values as it iterates. /// Like other iterators in Rust, observing values from this stream will drain the items as you view them /// and the stream cannot be replayed. pub struct ListStream { pub stream: Box + Send + 'static>, pub ctrlc: Option>, } impl ListStream { pub fn into_string(self, separator: &str, config: &Config) -> String { self.map(|x: Value| x.into_string(", ", config)) .collect::>() .join(separator) } pub fn drain(self) -> Result<(), ShellError> { for next in self { if let Value::Error { error } = next { return Err(*error); } } Ok(()) } pub fn from_stream( input: impl Iterator + Send + 'static, ctrlc: Option>, ) -> ListStream { ListStream { stream: Box::new(input), ctrlc, } } } impl Debug for ListStream { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("ListStream").finish() } } impl Iterator for ListStream { type Item = Value; fn next(&mut self) -> Option { if nu_utils::ctrl_c::was_pressed(&self.ctrlc) { None } else { self.stream.next() } } }