diff --git a/crates/nu-command/src/filesystem/watch.rs b/crates/nu-command/src/filesystem/watch.rs index 5599c39c51..44417263f5 100644 --- a/crates/nu-command/src/filesystem/watch.rs +++ b/crates/nu-command/src/filesystem/watch.rs @@ -1,19 +1,21 @@ +use itertools::{Either, Itertools}; use notify_debouncer_full::{ - new_debouncer, + DebouncedEvent, Debouncer, FileIdMap, new_debouncer, notify::{ - EventKind, RecursiveMode, Watcher, + self, EventKind, RecommendedWatcher, RecursiveMode, Watcher, event::{DataChange, ModifyKind, RenameMode}, }, }; use nu_engine::{ClosureEval, command_prelude::*}; use nu_protocol::{ - DeprecationEntry, DeprecationType, ReportMode, engine::Closure, report_shell_error, + DeprecationEntry, DeprecationType, ReportMode, Signals, engine::Closure, report_shell_error, shell_error::io::IoError, }; use std::{ - path::PathBuf, - sync::mpsc::{RecvTimeoutError, channel}, + borrow::Cow, + path::{Path, PathBuf}, + sync::mpsc::{Receiver, RecvTimeoutError, channel}, time::Duration, }; @@ -33,6 +35,10 @@ impl Command for Watch { "Watch for file changes and execute Nu code when they happen." } + fn extra_description(&self) -> &str { + "When run without a closure, `watch` returns a stream of events instead." + } + fn search_terms(&self) -> Vec<&str> { vec!["watcher", "reload", "filesystem"] } @@ -49,12 +55,23 @@ impl Command for Watch { fn signature(&self) -> nu_protocol::Signature { Signature::build("watch") - // actually `watch` never returns normally, but we don't have `noreturn` / `never` type yet - .input_output_types(vec![(Type::Nothing, Type::Nothing)]) + .input_output_types(vec![ + (Type::Nothing, Type::Nothing), + ( + Type::Nothing, + Type::Table(vec![ + ("operation".into(), Type::String), + ("path".into(), Type::String), + ("new_path".into(), Type::String), + ].into_boxed_slice()) + ), + ]) .required("path", SyntaxShape::Filepath, "The path to watch. Can be a file or directory.") - .required("closure", - SyntaxShape::Closure(Some(vec![SyntaxShape::String, SyntaxShape::String, SyntaxShape::String])), - "Some Nu code to run whenever a file changes. The closure will be passed `operation`, `path`, and `new_path` (for renames only) arguments in that order.") + .optional( + "closure", + SyntaxShape::Closure(Some(vec![SyntaxShape::String, SyntaxShape::String, SyntaxShape::String])), + "Some Nu code to run whenever a file changes. The closure will be passed `operation`, `path`, and `new_path` (for renames only) arguments in that order.", + ) .named( "debounce-ms", SyntaxShape::Int, @@ -95,56 +112,42 @@ impl Command for Watch { let cwd = engine_state.cwd_as_string(Some(stack))?; let path_arg: Spanned = call.req(engine_state, stack, 0)?; - let path_no_whitespace = &path_arg + let path_no_whitespace = path_arg .item .trim_end_matches(|x| matches!(x, '\x09'..='\x0d')); - let path = match nu_path::canonicalize_with(path_no_whitespace, cwd) { - Ok(p) => p, - Err(err) => { - return Err(ShellError::Io(IoError::new( - err, - path_arg.span, - PathBuf::from(path_no_whitespace), - ))); - } - }; - - let closure: Closure = call.req(engine_state, stack, 1)?; + let path = nu_path::canonicalize_with(path_no_whitespace, cwd).map_err(|err| { + ShellError::Io(IoError::new( + err, + path_arg.span, + PathBuf::from(path_no_whitespace), + )) + })?; + let closure: Option = call.opt(engine_state, stack, 1)?; let verbose = call.has_flag(engine_state, stack, "verbose")?; - let quiet = call.has_flag(engine_state, stack, "quiet")?; - - let debounce_duration_flag_ms: Option> = - call.get_flag(engine_state, stack, "debounce-ms")?; - - let debounce_duration_flag: Option> = - call.get_flag(engine_state, stack, "debounce")?; - - let debounce_duration: Duration = - resolve_duration_arguments(debounce_duration_flag_ms, debounce_duration_flag)?; + let debounce_duration: Duration = resolve_duration_arguments( + call.get_flag(engine_state, stack, "debounce-ms")?, + call.get_flag(engine_state, stack, "debounce")?, + )?; let glob_flag: Option> = call.get_flag(engine_state, stack, "glob")?; - let glob_pattern = match glob_flag { - Some(glob) => { + let glob_pattern = glob_flag + .map(|glob| { let absolute_path = path.join(glob.item); if verbose { eprintln!("Absolute glob path: {absolute_path:?}"); } - match nu_glob::Pattern::new(&absolute_path.to_string_lossy()) { - Ok(pattern) => Some(pattern), - Err(_) => { - return Err(ShellError::TypeMismatch { - err_message: "Glob pattern is invalid".to_string(), - span: glob.span, - }); + nu_glob::Pattern::new(&absolute_path.to_string_lossy()).map_err(|_| { + ShellError::TypeMismatch { + err_message: "Glob pattern is invalid".to_string(), + span: glob.span, } - } - } - None => None, - }; + }) + }) + .transpose()?; let recursive_flag: Option> = call.get_flag(engine_state, stack, "recursive")?; @@ -161,22 +164,19 @@ impl Command for Watch { let (tx, rx) = channel(); - let mut debouncer = match new_debouncer(debounce_duration, None, tx) { - Ok(d) => d, - Err(e) => { - return Err(ShellError::GenericError { - error: "Failed to create watcher".to_string(), - msg: e.to_string(), - span: Some(call.head), - help: None, - inner: vec![], - }); - } - }; - if let Err(e) = debouncer.watcher().watch(&path, recursive_mode) { + let mut debouncer = + new_debouncer(debounce_duration, None, tx).map_err(|err| ShellError::GenericError { + error: "Failed to create watcher".to_string(), + msg: err.to_string(), + span: Some(call.head), + help: None, + inner: vec![], + })?; + + if let Err(err) = debouncer.watcher().watch(&path, recursive_mode) { return Err(ShellError::GenericError { error: "Failed to create watcher".to_string(), - msg: e.to_string(), + msg: err.to_string(), span: Some(call.head), help: None, inner: vec![], @@ -189,107 +189,62 @@ impl Command for Watch { eprintln!("Now watching files at {path:?}. Press ctrl+c to abort."); } - let mut closure = ClosureEval::new(engine_state, stack, closure); + let iter = WatchIterator::new(debouncer, rx, engine_state.signals().clone()); - let mut event_handler = move |operation: &str, - path: PathBuf, - new_path: Option| - -> Result<(), ShellError> { - let matches_glob = match &glob_pattern { - Some(glob) => glob.matches_path(&path), - None => true, - }; - if verbose && glob_pattern.is_some() { - eprintln!("Matches glob: {matches_glob}"); - } + if let Some(closure) = closure { + let mut closure = ClosureEval::new(engine_state, stack, closure); - if matches_glob { - let result = closure - .add_arg(Value::string(operation, head)) - .add_arg(Value::string(path.to_string_lossy(), head)) - .add_arg(Value::string( - new_path.unwrap_or_else(|| "".into()).to_string_lossy(), - head, - )) - .run_with_input(PipelineData::empty()); - - match result { - Ok(val) => val.print_table(engine_state, stack, false, false)?, - Err(err) => report_shell_error(engine_state, &err), - }; - } - - Ok(()) - }; - - loop { - match rx.recv_timeout(CHECK_CTRL_C_FREQUENCY) { - Ok(Ok(events)) => { - if verbose { - eprintln!("{events:?}"); + for events in iter { + for event in events? { + let matches_glob = match &glob_pattern { + Some(glob) => glob.matches_path(&event.path), + None => true, + }; + if verbose && glob_pattern.is_some() { + eprintln!("Matches glob: {matches_glob}"); } - for mut one_event in events { - let handle_result = match one_event.event.kind { - // only want to handle event if relative path exists. - EventKind::Create(_) => one_event - .paths - .pop() - .map(|path| event_handler("Create", path, None)) - .unwrap_or(Ok(())), - EventKind::Remove(_) => one_event - .paths - .pop() - .map(|path| event_handler("Remove", path, None)) - .unwrap_or(Ok(())), - EventKind::Modify(ModifyKind::Data(DataChange::Content)) - | EventKind::Modify(ModifyKind::Data(DataChange::Any)) - | EventKind::Modify(ModifyKind::Any) => one_event - .paths - .pop() - .map(|path| event_handler("Write", path, None)) - .unwrap_or(Ok(())), - EventKind::Modify(ModifyKind::Name(RenameMode::Both)) => one_event - .paths - .pop() - .map(|to| { - one_event - .paths - .pop() - .map(|from| event_handler("Rename", from, Some(to))) - .unwrap_or(Ok(())) - }) - .unwrap_or(Ok(())), - _ => Ok(()), + + if matches_glob { + let result = closure + .add_arg(event.operation.into_value(head)) + .add_arg(event.path.to_string_lossy().into_value(head)) + .add_arg( + event + .new_path + .as_deref() + .map(Path::to_string_lossy) + .into_value(head), + ) + .run_with_input(PipelineData::empty()); + + match result { + Ok(val) => val.print_table(engine_state, stack, false, false)?, + Err(err) => report_shell_error(engine_state, &err), }; - handle_result?; } } - Ok(Err(_)) => { - return Err(ShellError::GenericError { - error: "Receiving events failed".to_string(), - msg: "Unexpected errors when receiving events".into(), - span: None, - help: None, - inner: vec![], - }); - } - Err(RecvTimeoutError::Disconnected) => { - return Err(ShellError::GenericError { - error: "Disconnected".to_string(), - msg: "Unexpected disconnect from file watcher".into(), - span: None, - help: None, - inner: vec![], - }); - } - Err(RecvTimeoutError::Timeout) => {} } - if engine_state.signals().interrupted() { - break; - } - } - Ok(PipelineData::empty()) + Ok(PipelineData::empty()) + } else { + fn glob_filter(glob: Option<&nu_glob::Pattern>, path: &Path) -> bool { + let Some(glob) = glob else { return true }; + glob.matches_path(path) + } + + let out = iter + .flat_map(|e| match e { + Ok(events) => Either::Right(events.into_iter().map(Ok)), + Err(err) => Either::Left(std::iter::once(Err(err))), + }) + .filter_map(move |e| match e { + Ok(ev) => glob_filter(glob_pattern.as_ref(), &ev.path) + .then(|| WatchEventRecord::from(&ev).into_value(head)), + Err(err) => Some(Value::error(err, head)), + }) + .into_pipeline_data(head, engine_state.signals().clone()); + Ok(out) + } } fn examples(&self) -> Vec { @@ -305,8 +260,13 @@ impl Command for Watch { result: None, }, Example { - description: "Log all changes in a directory", - example: r#"watch /foo/bar { |op, path| $"($op) - ($path)(char nl)" | save --append changes_in_bar.log }"#, + description: "`watch` (when run without a closure) can also emit a stream of events it detects.", + example: r#"watch /foo/bar + | where operation == Create + | first 5 + | each {|e| $"New file!: ($e.path)" } + | to text + | save --append changes_in_bar.log"#, result: None, }, Example { @@ -345,3 +305,130 @@ fn resolve_duration_arguments( (Some(v), None) => Ok(v.item), } } + +struct WatchEvent { + operation: &'static str, + path: PathBuf, + new_path: Option, +} + +#[derive(IntoValue)] +struct WatchEventRecord<'a> { + operation: &'static str, + path: Cow<'a, str>, + new_path: Option>, +} + +impl<'a> From<&'a WatchEvent> for WatchEventRecord<'a> { + fn from(value: &'a WatchEvent) -> Self { + Self { + operation: value.operation, + path: value.path.to_string_lossy(), + new_path: value.new_path.as_deref().map(Path::to_string_lossy), + } + } +} + +impl TryFrom for WatchEvent { + type Error = (); + + fn try_from(mut ev: DebouncedEvent) -> Result { + // TODO: Maybe we should handle all event kinds? + match ev.event.kind { + EventKind::Create(_) => ev.paths.pop().map(|p| WatchEvent { + operation: "Create", + path: p, + new_path: None, + }), + EventKind::Remove(_) => ev.paths.pop().map(|p| WatchEvent { + operation: "Remove", + path: p, + new_path: None, + }), + EventKind::Modify( + ModifyKind::Data(DataChange::Content | DataChange::Any) | ModifyKind::Any, + ) => ev.paths.pop().map(|p| WatchEvent { + operation: "Write", + path: p, + new_path: None, + }), + EventKind::Modify(ModifyKind::Name(RenameMode::Both)) => ev + .paths + .drain(..) + .rev() + .next_array() + .map(|[from, to]| WatchEvent { + operation: "Rename", + path: from, + new_path: Some(to), + }), + _ => None, + } + .ok_or(()) + } +} + +struct WatchIterator { + /// Debouncer needs to be kept alive for `rx` to keep receiving events. + _debouncer: Debouncer, + rx: Option, Vec>>>, + signals: Signals, +} + +impl WatchIterator { + fn new( + debouncer: Debouncer, + rx: Receiver, Vec>>, + signals: Signals, + ) -> Self { + Self { + _debouncer: debouncer, + rx: Some(rx), + signals, + } + } +} + +impl Iterator for WatchIterator { + type Item = Result, ShellError>; + + fn next(&mut self) -> Option { + let rx = self.rx.as_ref()?; + while !self.signals.interrupted() { + let x = match rx.recv_timeout(CHECK_CTRL_C_FREQUENCY) { + Ok(x) => x, + Err(RecvTimeoutError::Timeout) => continue, + Err(RecvTimeoutError::Disconnected) => { + self.rx = None; + return Some(Err(ShellError::GenericError { + error: "Disconnected".to_string(), + msg: "Unexpected disconnect from file watcher".into(), + span: None, + help: None, + inner: vec![], + })); + } + }; + + let Ok(events) = x else { + self.rx = None; + return Some(Err(ShellError::GenericError { + error: "Receiving events failed".to_string(), + msg: "Unexpected errors when receiving events".into(), + span: None, + help: None, + inner: vec![], + })); + }; + + let watch_events = events + .into_iter() + .filter_map(|ev| WatchEvent::try_from(ev).ok()) + .collect::>(); + + return Some(Ok(watch_events)); + } + self.rx = None; + None + } +}