From cad2741e9e2fa830f6b1b754f21150331b579686 Mon Sep 17 00:00:00 2001 From: Jason Gedge Date: Sun, 29 Mar 2020 10:23:19 -0400 Subject: [PATCH] Split input and output streams into separate modules --- crates/nu-cli/src/shell/filesystem_shell.rs | 11 -- .../nu-cli/src/{stream.rs => stream/input.rs} | 105 +---------------- crates/nu-cli/src/stream/mod.rs | 5 + crates/nu-cli/src/stream/output.rs | 106 ++++++++++++++++++ 4 files changed, 112 insertions(+), 115 deletions(-) rename crates/nu-cli/src/{stream.rs => stream/input.rs} (61%) create mode 100644 crates/nu-cli/src/stream/mod.rs create mode 100644 crates/nu-cli/src/stream/output.rs diff --git a/crates/nu-cli/src/shell/filesystem_shell.rs b/crates/nu-cli/src/shell/filesystem_shell.rs index fe9143b548..6b10642fb4 100644 --- a/crates/nu-cli/src/shell/filesystem_shell.rs +++ b/crates/nu-cli/src/shell/filesystem_shell.rs @@ -163,23 +163,16 @@ impl Shell for FilesystemShell { // Generated stream: impl Stream let stream = async_stream::try_stream! { for path in paths { - // Handle CTRL+C presence if ctrl_c.load(Ordering::SeqCst) { break; } - // Map GlobError to ShellError and gracefully try to unwrap the path let path = path.map_err(|e| ShellError::from(e.into_error()))?; - // Skip if '--all/-a' flag is present and this path is hidden if !all && is_hidden_dir(&path) { continue; } - // Get metadata from current path, if we don't have enough - // permissions to stat on file don't use any metadata, otherwise - // return the error and gracefully unwrap metadata (which yields - // Option) let metadata = match std::fs::symlink_metadata(&path) { Ok(metadata) => Ok(Some(metadata)), Err(e) => if let PermissionDenied = e.kind() { @@ -189,9 +182,6 @@ impl Shell for FilesystemShell { }, }?; - // Build dict entry for this path and possibly using some metadata. - // Map the possible dict entry into a Value, gracefully unwrap it - // with '?' let entry = dir_entry_dict( &path, metadata.as_ref(), @@ -202,7 +192,6 @@ impl Shell for FilesystemShell { ) .map(|entry| ReturnSuccess::Value(entry.into()))?; - // Finally yield the generated entry that was mapped to Value yield entry; } }; diff --git a/crates/nu-cli/src/stream.rs b/crates/nu-cli/src/stream/input.rs similarity index 61% rename from crates/nu-cli/src/stream.rs rename to crates/nu-cli/src/stream/input.rs index 1d3e179d80..953d55c14c 100644 --- a/crates/nu-cli/src/stream.rs +++ b/crates/nu-cli/src/stream/input.rs @@ -1,7 +1,7 @@ use crate::prelude::*; use futures::stream::iter; use nu_errors::ShellError; -use nu_protocol::{Primitive, ReturnSuccess, ReturnValue, UntaggedValue, Value}; +use nu_protocol::{Primitive, UntaggedValue, Value}; use nu_source::{Tagged, TaggedItem}; pub struct InputStream { @@ -148,106 +148,3 @@ impl From> for InputStream { } } } - -pub struct OutputStream { - pub(crate) values: BoxStream<'static, ReturnValue>, -} - -impl OutputStream { - pub fn new(values: impl Stream + Send + 'static) -> OutputStream { - OutputStream { - values: values.boxed(), - } - } - - pub fn empty() -> OutputStream { - let v: VecDeque = VecDeque::new(); - v.into() - } - - pub fn one(item: impl Into) -> OutputStream { - let mut v: VecDeque = VecDeque::new(); - v.push_back(item.into()); - v.into() - } - - pub fn from_input(input: impl Stream + Send + 'static) -> OutputStream { - OutputStream { - values: input.map(ReturnSuccess::value).boxed(), - } - } - - pub fn drain_vec(&mut self) -> impl Future> { - let mut values: BoxStream<'static, ReturnValue> = iter(VecDeque::new()).boxed(); - std::mem::swap(&mut values, &mut self.values); - - values.collect() - } -} - -impl Stream for OutputStream { - type Item = ReturnValue; - - fn poll_next( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> core::task::Poll> { - Stream::poll_next(std::pin::Pin::new(&mut self.values), cx) - } -} - -impl From for OutputStream { - fn from(input: InputStream) -> OutputStream { - OutputStream { - values: input.values.map(ReturnSuccess::value).boxed(), - } - } -} - -impl From> for OutputStream { - fn from(input: BoxStream<'static, Value>) -> OutputStream { - OutputStream { - values: input.map(ReturnSuccess::value).boxed(), - } - } -} - -impl From> for OutputStream { - fn from(input: BoxStream<'static, ReturnValue>) -> OutputStream { - OutputStream { values: input } - } -} - -impl From> for OutputStream { - fn from(input: VecDeque) -> OutputStream { - OutputStream { - values: futures::stream::iter(input).boxed(), - } - } -} - -impl From> for OutputStream { - fn from(input: VecDeque) -> OutputStream { - let stream = input.into_iter().map(ReturnSuccess::value); - OutputStream { - values: futures::stream::iter(stream).boxed(), - } - } -} - -impl From> for OutputStream { - fn from(input: Vec) -> OutputStream { - OutputStream { - values: futures::stream::iter(input).boxed(), - } - } -} - -impl From> for OutputStream { - fn from(input: Vec) -> OutputStream { - let stream = input.into_iter().map(ReturnSuccess::value); - OutputStream { - values: futures::stream::iter(stream).boxed(), - } - } -} diff --git a/crates/nu-cli/src/stream/mod.rs b/crates/nu-cli/src/stream/mod.rs new file mode 100644 index 0000000000..3daa62395f --- /dev/null +++ b/crates/nu-cli/src/stream/mod.rs @@ -0,0 +1,5 @@ +mod input; +mod output; + +pub use input::*; +pub use output::*; diff --git a/crates/nu-cli/src/stream/output.rs b/crates/nu-cli/src/stream/output.rs new file mode 100644 index 0000000000..7e785546e6 --- /dev/null +++ b/crates/nu-cli/src/stream/output.rs @@ -0,0 +1,106 @@ +use crate::prelude::*; +use futures::stream::iter; +use nu_protocol::{ReturnSuccess, ReturnValue, Value}; + +pub struct OutputStream { + pub(crate) values: BoxStream<'static, ReturnValue>, +} + +impl OutputStream { + pub fn new(values: impl Stream + Send + 'static) -> OutputStream { + OutputStream { + values: values.boxed(), + } + } + + pub fn empty() -> OutputStream { + let v: VecDeque = VecDeque::new(); + v.into() + } + + pub fn one(item: impl Into) -> OutputStream { + let mut v: VecDeque = VecDeque::new(); + v.push_back(item.into()); + v.into() + } + + pub fn from_input(input: impl Stream + Send + 'static) -> OutputStream { + OutputStream { + values: input.map(ReturnSuccess::value).boxed(), + } + } + + pub fn drain_vec(&mut self) -> impl Future> { + let mut values: BoxStream<'static, ReturnValue> = iter(VecDeque::new()).boxed(); + std::mem::swap(&mut values, &mut self.values); + + values.collect() + } +} + +impl Stream for OutputStream { + type Item = ReturnValue; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> core::task::Poll> { + Stream::poll_next(std::pin::Pin::new(&mut self.values), cx) + } +} + +impl From for OutputStream { + fn from(input: InputStream) -> OutputStream { + OutputStream { + values: input.values.map(ReturnSuccess::value).boxed(), + } + } +} + +impl From> for OutputStream { + fn from(input: BoxStream<'static, Value>) -> OutputStream { + OutputStream { + values: input.map(ReturnSuccess::value).boxed(), + } + } +} + +impl From> for OutputStream { + fn from(input: BoxStream<'static, ReturnValue>) -> OutputStream { + OutputStream { values: input } + } +} + +impl From> for OutputStream { + fn from(input: VecDeque) -> OutputStream { + OutputStream { + values: futures::stream::iter(input).boxed(), + } + } +} + +impl From> for OutputStream { + fn from(input: VecDeque) -> OutputStream { + let stream = input.into_iter().map(ReturnSuccess::value); + OutputStream { + values: futures::stream::iter(stream).boxed(), + } + } +} + +impl From> for OutputStream { + fn from(input: Vec) -> OutputStream { + OutputStream { + values: futures::stream::iter(input).boxed(), + } + } +} + +impl From> for OutputStream { + fn from(input: Vec) -> OutputStream { + let stream = input.into_iter().map(ReturnSuccess::value); + OutputStream { + values: futures::stream::iter(stream).boxed(), + } + } +}