Make use of interruptible stream in various places

This commit is contained in:
Jason Gedge 2020-03-29 12:30:36 -04:00 committed by Jason Gedge
parent 2a54ee0c54
commit 35dc7438a5
2 changed files with 23 additions and 35 deletions

View File

@ -9,7 +9,6 @@ use nu_errors::ShellError;
use nu_protocol::{CallInfo, ReturnSuccess, Signature, SyntaxShape, UntaggedValue, Value}; use nu_protocol::{CallInfo, ReturnSuccess, Signature, SyntaxShape, UntaggedValue, Value};
use nu_source::Tagged; use nu_source::Tagged;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::atomic::Ordering;
const NAME: &str = "du"; const NAME: &str = "du";
const GLOB_PARAMS: MatchOptions = MatchOptions { const GLOB_PARAMS: MatchOptions = MatchOptions {
@ -142,7 +141,6 @@ fn du(args: DuArgs, ctx: &RunnablePerItemContext) -> Result<OutputStream, ShellE
let max_depth = args.max_depth.map(|f| f.item); let max_depth = args.max_depth.map(|f| f.item);
let min_size = args.min_size.map(|f| f.item); let min_size = args.min_size.map(|f| f.item);
let stream = async_stream! {
let params = DirBuilder { let params = DirBuilder {
tag: tag.clone(), tag: tag.clone(),
min: min_size, min: min_size,
@ -150,27 +148,22 @@ fn du(args: DuArgs, ctx: &RunnablePerItemContext) -> Result<OutputStream, ShellE
ex: exclude, ex: exclude,
all, all,
}; };
for path in paths {
if ctrl_c.load(Ordering::SeqCst) { let stream = futures::stream::iter(paths)
break; .interruptible(ctrl_c)
} .map(move |path| match path {
match path {
Ok(p) => { Ok(p) => {
if p.is_dir() { if p.is_dir() {
yield Ok(ReturnSuccess::Value( Ok(ReturnSuccess::Value(
DirInfo::new(p, &params, max_depth).into(), DirInfo::new(p, &params, max_depth).into(),
)); ))
} else { } else {
match FileInfo::new(p, deref, tag.clone()) { FileInfo::new(p, deref, tag.clone()).map(|v| ReturnSuccess::Value(v.into()))
Ok(f) => yield Ok(ReturnSuccess::Value(f.into())),
Err(e) => yield Err(e)
} }
} }
} Err(e) => Err(e),
Err(e) => yield Err(e), });
}
}
};
Ok(stream.to_output_stream()) Ok(stream.to_output_stream())
} }

View File

@ -16,7 +16,6 @@ use rustyline::completion::FilenameCompleter;
use rustyline::hint::{Hinter, HistoryHinter}; use rustyline::hint::{Hinter, HistoryHinter};
use std::collections::HashMap; use std::collections::HashMap;
use std::path::{Component, Path, PathBuf}; use std::path::{Component, Path, PathBuf};
use std::sync::atomic::Ordering;
use trash as SendToTrash; use trash as SendToTrash;
#[cfg(unix)] #[cfg(unix)]
@ -163,10 +162,6 @@ impl Shell for FilesystemShell {
// Generated stream: impl Stream<Item = Result<ReturnSuccess, ShellError> // Generated stream: impl Stream<Item = Result<ReturnSuccess, ShellError>
let stream = async_stream::try_stream! { let stream = async_stream::try_stream! {
for path in paths { for path in paths {
if ctrl_c.load(Ordering::SeqCst) {
break;
}
let path = path.map_err(|e| ShellError::from(e.into_error()))?; let path = path.map_err(|e| ShellError::from(e.into_error()))?;
if !all && is_hidden_dir(&path) { if !all && is_hidden_dir(&path) {
@ -196,7 +191,7 @@ impl Shell for FilesystemShell {
} }
}; };
Ok(stream.to_output_stream()) Ok(stream.interruptible(ctrl_c).to_output_stream())
} }
fn cd(&self, args: EvaluatedWholeStreamCommandArgs) -> Result<OutputStream, ShellError> { fn cd(&self, args: EvaluatedWholeStreamCommandArgs) -> Result<OutputStream, ShellError> {