Files
nushell/crates/nu-command/src/filesystem/watch.rs
Ian Manske b6c7656194 IO and redirection overhaul (#11934)
# Description
The PR overhauls how IO redirection is handled, allowing more explicit
and fine-grain control over `stdout` and `stderr` output as well as more
efficient IO and piping.

To summarize the changes in this PR:
- Added a new `IoStream` type to indicate the intended destination for a
pipeline element's `stdout` and `stderr`.
- The `stdout` and `stderr` `IoStream`s are stored in the `Stack` and to
avoid adding 6 additional arguments to every eval function and
`Command::run`. The `stdout` and `stderr` streams can be temporarily
overwritten through functions on `Stack` and these functions will return
a guard that restores the original `stdout` and `stderr` when dropped.
- In the AST, redirections are now directly part of a `PipelineElement`
as a `Option<Redirection>` field instead of having multiple different
`PipelineElement` enum variants for each kind of redirection. This
required changes to the parser, mainly in `lite_parser.rs`.
- `Command`s can also set a `IoStream` override/redirection which will
apply to the previous command in the pipeline. This is used, for
example, in `ignore` to allow the previous external command to have its
stdout redirected to `Stdio::null()` at spawn time. In contrast, the
current implementation has to create an os pipe and manually consume the
output on nushell's side. File and pipe redirections (`o>`, `e>`, `e>|`,
etc.) have precedence over overrides from commands.

This PR improves piping and IO speed, partially addressing #10763. Using
the `throughput` command from that issue, this PR gives the following
speedup on my setup for the commands below:
| Command | Before (MB/s) | After (MB/s) | Bash (MB/s) |
| --------------------------- | -------------:| ------------:|
-----------:|
| `throughput o> /dev/null` | 1169 | 52938 | 54305 |
| `throughput \| ignore` | 840 | 55438 | N/A |
| `throughput \| null` | Error | 53617 | N/A |
| `throughput \| rg 'x'` | 1165 | 3049 | 3736 |
| `(throughput) \| rg 'x'` | 810 | 3085 | 3815 |

(Numbers above are the median samples for throughput)

This PR also paves the way to refactor our `ExternalStream` handling in
the various commands. For example, this PR already fixes the following
code:
```nushell
^sh -c 'echo -n "hello "; sleep 0; echo "world"' | find "hello world"
```
This returns an empty list on 0.90.1 and returns a highlighted "hello
world" on this PR.

Since the `stdout` and `stderr` `IoStream`s are available to commands
when they are run, then this unlocks the potential for more convenient
behavior. E.g., the `find` command can disable its ansi highlighting if
it detects that the output `IoStream` is not the terminal. Knowing the
output streams will also allow background job output to be redirected
more easily and efficiently.

# User-Facing Changes
- External commands returned from closures will be collected (in most
cases):
  ```nushell
  1..2 | each {|_| nu -c "print a" }
  ```
This gives `["a", "a"]` on this PR, whereas this used to print "a\na\n"
and then return an empty list.

  ```nushell
  1..2 | each {|_| nu -c "print -e a" }
  ```
This gives `["", ""]` and prints "a\na\n" to stderr, whereas this used
to return an empty list and print "a\na\n" to stderr.

- Trailing new lines are always trimmed for external commands when
piping into internal commands or collecting it as a value. (Failure to
decode the output as utf-8 will keep the trailing newline for the last
binary value.) In the current nushell version, the following three code
snippets differ only in parenthesis placement, but they all also have
different outputs:

  1. `1..2 | each { ^echo a }`
     ```
     a
     a
     ╭────────────╮
     │ empty list │
     ╰────────────╯
     ```
  2. `1..2 | each { (^echo a) }`
     ```
     ╭───┬───╮
     │ 0 │ a │
     │ 1 │ a │
     ╰───┴───╯
     ```
  3. `1..2 | (each { ^echo a })`
     ```
     ╭───┬───╮
     │ 0 │ a │
     │   │   │
     │ 1 │ a │
     │   │   │
     ╰───┴───╯
     ```

  But in this PR, the above snippets will all have the same output:
  ```
  ╭───┬───╮
  │ 0 │ a │
  │ 1 │ a │
  ╰───┴───╯
  ```

- All existing flags on `run-external` are now deprecated.

- File redirections now apply to all commands inside a code block:
  ```nushell
  (nu -c "print -e a"; nu -c "print -e b") e> test.out
  ```
This gives "a\nb\n" in `test.out` and prints nothing. The same result
would happen when printing to stdout and using a `o>` file redirection.

- External command output will (almost) never be ignored, and ignoring
output must be explicit now:
  ```nushell
  (^echo a; ^echo b)
  ```
This prints "a\nb\n", whereas this used to print only "b\n". This only
applies to external commands; values and internal commands not in return
position will not print anything (e.g., `(echo a; echo b)` still only
prints "b").

- `complete` now always captures stderr (`do` is not necessary).

# After Submitting
The language guide and other documentation will need to be updated.
2024-03-14 15:51:55 -05:00

323 lines
12 KiB
Rust

use std::path::PathBuf;
use std::sync::mpsc::{channel, RecvTimeoutError};
use std::time::Duration;
use notify_debouncer_full::{
new_debouncer,
notify::{
event::{DataChange, ModifyKind, RenameMode},
EventKind, RecursiveMode, Watcher,
},
};
use nu_engine::{current_dir, get_eval_block, CallExt};
use nu_protocol::ast::Call;
use nu_protocol::engine::{Closure, Command, EngineState, Stack, StateWorkingSet};
use nu_protocol::{
format_error, Category, Example, IntoPipelineData, PipelineData, ShellError, Signature,
Spanned, SyntaxShape, Type, Value,
};
// durations chosen mostly arbitrarily
const CHECK_CTRL_C_FREQUENCY: Duration = Duration::from_millis(100);
const DEFAULT_WATCH_DEBOUNCE_DURATION: Duration = Duration::from_millis(100);
#[derive(Clone)]
pub struct Watch;
impl Command for Watch {
fn name(&self) -> &str {
"watch"
}
fn usage(&self) -> &str {
"Watch for file changes and execute Nu code when they happen."
}
fn search_terms(&self) -> Vec<&str> {
vec!["watcher", "reload", "filesystem"]
}
fn signature(&self) -> nu_protocol::Signature {
Signature::build("watch")
.input_output_types(vec![(Type::Nothing, Type::Table(vec![]))])
.required("path", SyntaxShape::Filepath, "The path to watch. Can be a file or directory.")
.required("closure",
SyntaxShape::Closure(Some(vec![SyntaxShape::String, SyntaxShape::String, SyntaxShape::String])),
"Some Nu code to run whenever a file changes. The closure will be passed `operation`, `path`, and `new_path` (for renames only) arguments in that order.")
.named(
"debounce-ms",
SyntaxShape::Int,
"Debounce changes for this many milliseconds (default: 100). Adjust if you find that single writes are reported as multiple events",
Some('d'),
)
.named(
"glob",
SyntaxShape::String, // SyntaxShape::GlobPattern gets interpreted relative to cwd, so use String instead
"Only report changes for files that match this glob pattern (default: all files)",
Some('g'),
)
.named(
"recursive",
SyntaxShape::Boolean,
"Watch all directories under `<path>` recursively. Will be ignored if `<path>` is a file (default: true)",
Some('r'),
)
.switch("verbose", "Operate in verbose mode (default: false)", Some('v'))
.category(Category::FileSystem)
}
fn run(
&self,
engine_state: &EngineState,
stack: &mut Stack,
call: &Call,
_input: PipelineData,
) -> Result<PipelineData, ShellError> {
let cwd = current_dir(engine_state, stack)?;
let path_arg: Spanned<String> = call.req(engine_state, stack, 0)?;
let path_no_whitespace = &path_arg
.item
.trim_end_matches(|x| matches!(x, '\x09'..='\x0d'));
let path = match nu_path::canonicalize_with(path_no_whitespace, cwd) {
Ok(p) => p,
Err(_) => {
return Err(ShellError::DirectoryNotFound {
dir: path_no_whitespace.to_string(),
span: path_arg.span,
})
}
};
let capture_block: Closure = call.req(engine_state, stack, 1)?;
let block = engine_state
.clone()
.get_block(capture_block.block_id)
.clone();
let verbose = call.has_flag(engine_state, stack, "verbose")?;
let debounce_duration_flag: Option<Spanned<i64>> =
call.get_flag(engine_state, stack, "debounce-ms")?;
let debounce_duration = match debounce_duration_flag {
Some(val) => match u64::try_from(val.item) {
Ok(val) => Duration::from_millis(val),
Err(_) => {
return Err(ShellError::TypeMismatch {
err_message: "Debounce duration is invalid".to_string(),
span: val.span,
})
}
},
None => DEFAULT_WATCH_DEBOUNCE_DURATION,
};
let glob_flag: Option<Spanned<String>> = call.get_flag(engine_state, stack, "glob")?;
let glob_pattern = match glob_flag {
Some(glob) => {
let absolute_path = path.join(glob.item);
if verbose {
eprintln!("Absolute glob path: {absolute_path:?}");
}
match nu_glob::Pattern::new(&absolute_path.to_string_lossy()) {
Ok(pattern) => Some(pattern),
Err(_) => {
return Err(ShellError::TypeMismatch {
err_message: "Glob pattern is invalid".to_string(),
span: glob.span,
})
}
}
}
None => None,
};
let recursive_flag: Option<Spanned<bool>> =
call.get_flag(engine_state, stack, "recursive")?;
let recursive_mode = match recursive_flag {
Some(recursive) => {
if recursive.item {
RecursiveMode::Recursive
} else {
RecursiveMode::NonRecursive
}
}
None => RecursiveMode::Recursive,
};
let ctrlc_ref = &engine_state.ctrlc.clone();
let (tx, rx) = channel();
let mut debouncer = match new_debouncer(debounce_duration, None, tx) {
Ok(d) => d,
Err(e) => {
return Err(ShellError::IOError {
msg: format!("Failed to create watcher: {e}"),
})
}
};
if let Err(e) = debouncer.watcher().watch(&path, recursive_mode) {
return Err(ShellError::IOError {
msg: format!("Failed to create watcher: {e}"),
});
}
// need to cache to make sure that rename event works.
debouncer.cache().add_root(&path, recursive_mode);
eprintln!("Now watching files at {path:?}. Press ctrl+c to abort.");
let eval_block = get_eval_block(engine_state);
let event_handler =
|operation: &str, path: PathBuf, new_path: Option<PathBuf>| -> Result<(), ShellError> {
let glob_pattern = glob_pattern.clone();
let matches_glob = match glob_pattern.clone() {
Some(glob) => glob.matches_path(&path),
None => true,
};
if verbose && glob_pattern.is_some() {
eprintln!("Matches glob: {matches_glob}");
}
if matches_glob {
let stack = &mut stack.clone();
if let Some(position) = block.signature.get_positional(0) {
if let Some(position_id) = &position.var_id {
stack.add_var(*position_id, Value::string(operation, call.span()));
}
}
if let Some(position) = block.signature.get_positional(1) {
if let Some(position_id) = &position.var_id {
stack.add_var(
*position_id,
Value::string(path.to_string_lossy(), call.span()),
);
}
}
if let Some(position) = block.signature.get_positional(2) {
if let Some(position_id) = &position.var_id {
stack.add_var(
*position_id,
Value::string(
new_path.unwrap_or_else(|| "".into()).to_string_lossy(),
call.span(),
),
);
}
}
let eval_result = eval_block(
engine_state,
stack,
&block,
Value::nothing(call.span()).into_pipeline_data(),
);
match eval_result {
Ok(val) => {
val.print(engine_state, stack, false, false)?;
}
Err(err) => {
let working_set = StateWorkingSet::new(engine_state);
eprintln!("{}", format_error(&working_set, &err));
}
}
}
Ok(())
};
loop {
match rx.recv_timeout(CHECK_CTRL_C_FREQUENCY) {
Ok(Ok(events)) => {
if verbose {
eprintln!("{events:?}");
}
for mut one_event in events {
let handle_result = match one_event.event.kind {
// only want to handle event if relative path exists.
EventKind::Create(_) => one_event
.paths
.pop()
.map(|path| event_handler("Create", path, None))
.unwrap_or(Ok(())),
EventKind::Remove(_) => one_event
.paths
.pop()
.map(|path| event_handler("Remove", path, None))
.unwrap_or(Ok(())),
EventKind::Modify(ModifyKind::Data(DataChange::Content))
| EventKind::Modify(ModifyKind::Data(DataChange::Any))
| EventKind::Modify(ModifyKind::Any) => one_event
.paths
.pop()
.map(|path| event_handler("Write", path, None))
.unwrap_or(Ok(())),
EventKind::Modify(ModifyKind::Name(RenameMode::Both)) => one_event
.paths
.pop()
.map(|to| {
one_event
.paths
.pop()
.map(|from| event_handler("Rename", from, Some(to)))
.unwrap_or(Ok(()))
})
.unwrap_or(Ok(())),
_ => Ok(()),
};
handle_result?;
}
}
Ok(Err(_)) => {
return Err(ShellError::IOError {
msg: "Unexpected errors when receiving events".into(),
})
}
Err(RecvTimeoutError::Disconnected) => {
return Err(ShellError::IOError {
msg: "Unexpected disconnect from file watcher".into(),
});
}
Err(RecvTimeoutError::Timeout) => {}
}
if nu_utils::ctrl_c::was_pressed(ctrlc_ref) {
break;
}
}
Ok(PipelineData::empty())
}
fn examples(&self) -> Vec<Example> {
vec![
Example {
description: "Run `cargo test` whenever a Rust file changes",
example: r#"watch . --glob=**/*.rs {|| cargo test }"#,
result: None,
},
Example {
description: "Watch all changes in the current directory",
example: r#"watch . { |op, path, new_path| $"($op) ($path) ($new_path)"}"#,
result: None,
},
Example {
description: "Log all changes in a directory",
example: r#"watch /foo/bar { |op, path| $"($op) - ($path)(char nl)" | save --append changes_in_bar.log }"#,
result: None,
},
Example {
description: "Note: if you are looking to run a command every N units of time, this can be accomplished with a loop and sleep",
example: r#"loop { command; sleep duration }"#,
result: None,
},
]
}
}