mirror of
https://github.com/nushell/nushell.git
synced 2025-08-24 09:45:58 +02:00
feat: watch
returns a stream of events as a table when called without a closure (#16428)
`watch` command can now be used to _return a stream_ of detected events instead of calling a closure with it's information, though using a closure is still possible and existing uses won't break. In addition to this: ```nushell watch . {|operation, path, new_path| if $operation == "Write" and $path like "*.md" { md-lint $path } } ``` Now this is also possible: ```nushell watch . | where operation == Write and path like "*.md" | each { md-lint $in.path } ```
This commit is contained in:
@@ -1,19 +1,21 @@
|
||||
use itertools::{Either, Itertools};
|
||||
use notify_debouncer_full::{
|
||||
new_debouncer,
|
||||
DebouncedEvent, Debouncer, FileIdMap, new_debouncer,
|
||||
notify::{
|
||||
EventKind, RecursiveMode, Watcher,
|
||||
self, EventKind, RecommendedWatcher, RecursiveMode, Watcher,
|
||||
event::{DataChange, ModifyKind, RenameMode},
|
||||
},
|
||||
};
|
||||
use nu_engine::{ClosureEval, command_prelude::*};
|
||||
use nu_protocol::{
|
||||
DeprecationEntry, DeprecationType, ReportMode, engine::Closure, report_shell_error,
|
||||
DeprecationEntry, DeprecationType, ReportMode, Signals, engine::Closure, report_shell_error,
|
||||
shell_error::io::IoError,
|
||||
};
|
||||
|
||||
use std::{
|
||||
path::PathBuf,
|
||||
sync::mpsc::{RecvTimeoutError, channel},
|
||||
borrow::Cow,
|
||||
path::{Path, PathBuf},
|
||||
sync::mpsc::{Receiver, RecvTimeoutError, channel},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
@@ -33,6 +35,10 @@ impl Command for Watch {
|
||||
"Watch for file changes and execute Nu code when they happen."
|
||||
}
|
||||
|
||||
fn extra_description(&self) -> &str {
|
||||
"When run without a closure, `watch` returns a stream of events instead."
|
||||
}
|
||||
|
||||
fn search_terms(&self) -> Vec<&str> {
|
||||
vec!["watcher", "reload", "filesystem"]
|
||||
}
|
||||
@@ -49,12 +55,23 @@ impl Command for Watch {
|
||||
|
||||
fn signature(&self) -> nu_protocol::Signature {
|
||||
Signature::build("watch")
|
||||
// actually `watch` never returns normally, but we don't have `noreturn` / `never` type yet
|
||||
.input_output_types(vec![(Type::Nothing, Type::Nothing)])
|
||||
.input_output_types(vec![
|
||||
(Type::Nothing, Type::Nothing),
|
||||
(
|
||||
Type::Nothing,
|
||||
Type::Table(vec![
|
||||
("operation".into(), Type::String),
|
||||
("path".into(), Type::String),
|
||||
("new_path".into(), Type::String),
|
||||
].into_boxed_slice())
|
||||
),
|
||||
])
|
||||
.required("path", SyntaxShape::Filepath, "The path to watch. Can be a file or directory.")
|
||||
.required("closure",
|
||||
SyntaxShape::Closure(Some(vec![SyntaxShape::String, SyntaxShape::String, SyntaxShape::String])),
|
||||
"Some Nu code to run whenever a file changes. The closure will be passed `operation`, `path`, and `new_path` (for renames only) arguments in that order.")
|
||||
.optional(
|
||||
"closure",
|
||||
SyntaxShape::Closure(Some(vec![SyntaxShape::String, SyntaxShape::String, SyntaxShape::String])),
|
||||
"Some Nu code to run whenever a file changes. The closure will be passed `operation`, `path`, and `new_path` (for renames only) arguments in that order.",
|
||||
)
|
||||
.named(
|
||||
"debounce-ms",
|
||||
SyntaxShape::Int,
|
||||
@@ -95,56 +112,42 @@ impl Command for Watch {
|
||||
let cwd = engine_state.cwd_as_string(Some(stack))?;
|
||||
let path_arg: Spanned<String> = call.req(engine_state, stack, 0)?;
|
||||
|
||||
let path_no_whitespace = &path_arg
|
||||
let path_no_whitespace = path_arg
|
||||
.item
|
||||
.trim_end_matches(|x| matches!(x, '\x09'..='\x0d'));
|
||||
|
||||
let path = match nu_path::canonicalize_with(path_no_whitespace, cwd) {
|
||||
Ok(p) => p,
|
||||
Err(err) => {
|
||||
return Err(ShellError::Io(IoError::new(
|
||||
err,
|
||||
path_arg.span,
|
||||
PathBuf::from(path_no_whitespace),
|
||||
)));
|
||||
}
|
||||
};
|
||||
|
||||
let closure: Closure = call.req(engine_state, stack, 1)?;
|
||||
let path = nu_path::canonicalize_with(path_no_whitespace, cwd).map_err(|err| {
|
||||
ShellError::Io(IoError::new(
|
||||
err,
|
||||
path_arg.span,
|
||||
PathBuf::from(path_no_whitespace),
|
||||
))
|
||||
})?;
|
||||
|
||||
let closure: Option<Closure> = call.opt(engine_state, stack, 1)?;
|
||||
let verbose = call.has_flag(engine_state, stack, "verbose")?;
|
||||
|
||||
let quiet = call.has_flag(engine_state, stack, "quiet")?;
|
||||
|
||||
let debounce_duration_flag_ms: Option<Spanned<i64>> =
|
||||
call.get_flag(engine_state, stack, "debounce-ms")?;
|
||||
|
||||
let debounce_duration_flag: Option<Spanned<Duration>> =
|
||||
call.get_flag(engine_state, stack, "debounce")?;
|
||||
|
||||
let debounce_duration: Duration =
|
||||
resolve_duration_arguments(debounce_duration_flag_ms, debounce_duration_flag)?;
|
||||
let debounce_duration: Duration = resolve_duration_arguments(
|
||||
call.get_flag(engine_state, stack, "debounce-ms")?,
|
||||
call.get_flag(engine_state, stack, "debounce")?,
|
||||
)?;
|
||||
|
||||
let glob_flag: Option<Spanned<String>> = call.get_flag(engine_state, stack, "glob")?;
|
||||
let glob_pattern = match glob_flag {
|
||||
Some(glob) => {
|
||||
let glob_pattern = glob_flag
|
||||
.map(|glob| {
|
||||
let absolute_path = path.join(glob.item);
|
||||
if verbose {
|
||||
eprintln!("Absolute glob path: {absolute_path:?}");
|
||||
}
|
||||
|
||||
match nu_glob::Pattern::new(&absolute_path.to_string_lossy()) {
|
||||
Ok(pattern) => Some(pattern),
|
||||
Err(_) => {
|
||||
return Err(ShellError::TypeMismatch {
|
||||
err_message: "Glob pattern is invalid".to_string(),
|
||||
span: glob.span,
|
||||
});
|
||||
nu_glob::Pattern::new(&absolute_path.to_string_lossy()).map_err(|_| {
|
||||
ShellError::TypeMismatch {
|
||||
err_message: "Glob pattern is invalid".to_string(),
|
||||
span: glob.span,
|
||||
}
|
||||
}
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
})
|
||||
})
|
||||
.transpose()?;
|
||||
|
||||
let recursive_flag: Option<Spanned<bool>> =
|
||||
call.get_flag(engine_state, stack, "recursive")?;
|
||||
@@ -161,22 +164,19 @@ impl Command for Watch {
|
||||
|
||||
let (tx, rx) = channel();
|
||||
|
||||
let mut debouncer = match new_debouncer(debounce_duration, None, tx) {
|
||||
Ok(d) => d,
|
||||
Err(e) => {
|
||||
return Err(ShellError::GenericError {
|
||||
error: "Failed to create watcher".to_string(),
|
||||
msg: e.to_string(),
|
||||
span: Some(call.head),
|
||||
help: None,
|
||||
inner: vec![],
|
||||
});
|
||||
}
|
||||
};
|
||||
if let Err(e) = debouncer.watcher().watch(&path, recursive_mode) {
|
||||
let mut debouncer =
|
||||
new_debouncer(debounce_duration, None, tx).map_err(|err| ShellError::GenericError {
|
||||
error: "Failed to create watcher".to_string(),
|
||||
msg: err.to_string(),
|
||||
span: Some(call.head),
|
||||
help: None,
|
||||
inner: vec![],
|
||||
})?;
|
||||
|
||||
if let Err(err) = debouncer.watcher().watch(&path, recursive_mode) {
|
||||
return Err(ShellError::GenericError {
|
||||
error: "Failed to create watcher".to_string(),
|
||||
msg: e.to_string(),
|
||||
msg: err.to_string(),
|
||||
span: Some(call.head),
|
||||
help: None,
|
||||
inner: vec![],
|
||||
@@ -189,107 +189,62 @@ impl Command for Watch {
|
||||
eprintln!("Now watching files at {path:?}. Press ctrl+c to abort.");
|
||||
}
|
||||
|
||||
let mut closure = ClosureEval::new(engine_state, stack, closure);
|
||||
let iter = WatchIterator::new(debouncer, rx, engine_state.signals().clone());
|
||||
|
||||
let mut event_handler = move |operation: &str,
|
||||
path: PathBuf,
|
||||
new_path: Option<PathBuf>|
|
||||
-> Result<(), ShellError> {
|
||||
let matches_glob = match &glob_pattern {
|
||||
Some(glob) => glob.matches_path(&path),
|
||||
None => true,
|
||||
};
|
||||
if verbose && glob_pattern.is_some() {
|
||||
eprintln!("Matches glob: {matches_glob}");
|
||||
}
|
||||
if let Some(closure) = closure {
|
||||
let mut closure = ClosureEval::new(engine_state, stack, closure);
|
||||
|
||||
if matches_glob {
|
||||
let result = closure
|
||||
.add_arg(Value::string(operation, head))
|
||||
.add_arg(Value::string(path.to_string_lossy(), head))
|
||||
.add_arg(Value::string(
|
||||
new_path.unwrap_or_else(|| "".into()).to_string_lossy(),
|
||||
head,
|
||||
))
|
||||
.run_with_input(PipelineData::empty());
|
||||
|
||||
match result {
|
||||
Ok(val) => val.print_table(engine_state, stack, false, false)?,
|
||||
Err(err) => report_shell_error(engine_state, &err),
|
||||
};
|
||||
}
|
||||
|
||||
Ok(())
|
||||
};
|
||||
|
||||
loop {
|
||||
match rx.recv_timeout(CHECK_CTRL_C_FREQUENCY) {
|
||||
Ok(Ok(events)) => {
|
||||
if verbose {
|
||||
eprintln!("{events:?}");
|
||||
for events in iter {
|
||||
for event in events? {
|
||||
let matches_glob = match &glob_pattern {
|
||||
Some(glob) => glob.matches_path(&event.path),
|
||||
None => true,
|
||||
};
|
||||
if verbose && glob_pattern.is_some() {
|
||||
eprintln!("Matches glob: {matches_glob}");
|
||||
}
|
||||
for mut one_event in events {
|
||||
let handle_result = match one_event.event.kind {
|
||||
// only want to handle event if relative path exists.
|
||||
EventKind::Create(_) => one_event
|
||||
.paths
|
||||
.pop()
|
||||
.map(|path| event_handler("Create", path, None))
|
||||
.unwrap_or(Ok(())),
|
||||
EventKind::Remove(_) => one_event
|
||||
.paths
|
||||
.pop()
|
||||
.map(|path| event_handler("Remove", path, None))
|
||||
.unwrap_or(Ok(())),
|
||||
EventKind::Modify(ModifyKind::Data(DataChange::Content))
|
||||
| EventKind::Modify(ModifyKind::Data(DataChange::Any))
|
||||
| EventKind::Modify(ModifyKind::Any) => one_event
|
||||
.paths
|
||||
.pop()
|
||||
.map(|path| event_handler("Write", path, None))
|
||||
.unwrap_or(Ok(())),
|
||||
EventKind::Modify(ModifyKind::Name(RenameMode::Both)) => one_event
|
||||
.paths
|
||||
.pop()
|
||||
.map(|to| {
|
||||
one_event
|
||||
.paths
|
||||
.pop()
|
||||
.map(|from| event_handler("Rename", from, Some(to)))
|
||||
.unwrap_or(Ok(()))
|
||||
})
|
||||
.unwrap_or(Ok(())),
|
||||
_ => Ok(()),
|
||||
|
||||
if matches_glob {
|
||||
let result = closure
|
||||
.add_arg(event.operation.into_value(head))
|
||||
.add_arg(event.path.to_string_lossy().into_value(head))
|
||||
.add_arg(
|
||||
event
|
||||
.new_path
|
||||
.as_deref()
|
||||
.map(Path::to_string_lossy)
|
||||
.into_value(head),
|
||||
)
|
||||
.run_with_input(PipelineData::empty());
|
||||
|
||||
match result {
|
||||
Ok(val) => val.print_table(engine_state, stack, false, false)?,
|
||||
Err(err) => report_shell_error(engine_state, &err),
|
||||
};
|
||||
handle_result?;
|
||||
}
|
||||
}
|
||||
Ok(Err(_)) => {
|
||||
return Err(ShellError::GenericError {
|
||||
error: "Receiving events failed".to_string(),
|
||||
msg: "Unexpected errors when receiving events".into(),
|
||||
span: None,
|
||||
help: None,
|
||||
inner: vec![],
|
||||
});
|
||||
}
|
||||
Err(RecvTimeoutError::Disconnected) => {
|
||||
return Err(ShellError::GenericError {
|
||||
error: "Disconnected".to_string(),
|
||||
msg: "Unexpected disconnect from file watcher".into(),
|
||||
span: None,
|
||||
help: None,
|
||||
inner: vec![],
|
||||
});
|
||||
}
|
||||
Err(RecvTimeoutError::Timeout) => {}
|
||||
}
|
||||
if engine_state.signals().interrupted() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(PipelineData::empty())
|
||||
Ok(PipelineData::empty())
|
||||
} else {
|
||||
fn glob_filter(glob: Option<&nu_glob::Pattern>, path: &Path) -> bool {
|
||||
let Some(glob) = glob else { return true };
|
||||
glob.matches_path(path)
|
||||
}
|
||||
|
||||
let out = iter
|
||||
.flat_map(|e| match e {
|
||||
Ok(events) => Either::Right(events.into_iter().map(Ok)),
|
||||
Err(err) => Either::Left(std::iter::once(Err(err))),
|
||||
})
|
||||
.filter_map(move |e| match e {
|
||||
Ok(ev) => glob_filter(glob_pattern.as_ref(), &ev.path)
|
||||
.then(|| WatchEventRecord::from(&ev).into_value(head)),
|
||||
Err(err) => Some(Value::error(err, head)),
|
||||
})
|
||||
.into_pipeline_data(head, engine_state.signals().clone());
|
||||
Ok(out)
|
||||
}
|
||||
}
|
||||
|
||||
fn examples(&self) -> Vec<Example> {
|
||||
@@ -305,8 +260,13 @@ impl Command for Watch {
|
||||
result: None,
|
||||
},
|
||||
Example {
|
||||
description: "Log all changes in a directory",
|
||||
example: r#"watch /foo/bar { |op, path| $"($op) - ($path)(char nl)" | save --append changes_in_bar.log }"#,
|
||||
description: "`watch` (when run without a closure) can also emit a stream of events it detects.",
|
||||
example: r#"watch /foo/bar
|
||||
| where operation == Create
|
||||
| first 5
|
||||
| each {|e| $"New file!: ($e.path)" }
|
||||
| to text
|
||||
| save --append changes_in_bar.log"#,
|
||||
result: None,
|
||||
},
|
||||
Example {
|
||||
@@ -345,3 +305,130 @@ fn resolve_duration_arguments(
|
||||
(Some(v), None) => Ok(v.item),
|
||||
}
|
||||
}
|
||||
|
||||
struct WatchEvent {
|
||||
operation: &'static str,
|
||||
path: PathBuf,
|
||||
new_path: Option<PathBuf>,
|
||||
}
|
||||
|
||||
#[derive(IntoValue)]
|
||||
struct WatchEventRecord<'a> {
|
||||
operation: &'static str,
|
||||
path: Cow<'a, str>,
|
||||
new_path: Option<Cow<'a, str>>,
|
||||
}
|
||||
|
||||
impl<'a> From<&'a WatchEvent> for WatchEventRecord<'a> {
|
||||
fn from(value: &'a WatchEvent) -> Self {
|
||||
Self {
|
||||
operation: value.operation,
|
||||
path: value.path.to_string_lossy(),
|
||||
new_path: value.new_path.as_deref().map(Path::to_string_lossy),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<DebouncedEvent> for WatchEvent {
|
||||
type Error = ();
|
||||
|
||||
fn try_from(mut ev: DebouncedEvent) -> Result<Self, Self::Error> {
|
||||
// TODO: Maybe we should handle all event kinds?
|
||||
match ev.event.kind {
|
||||
EventKind::Create(_) => ev.paths.pop().map(|p| WatchEvent {
|
||||
operation: "Create",
|
||||
path: p,
|
||||
new_path: None,
|
||||
}),
|
||||
EventKind::Remove(_) => ev.paths.pop().map(|p| WatchEvent {
|
||||
operation: "Remove",
|
||||
path: p,
|
||||
new_path: None,
|
||||
}),
|
||||
EventKind::Modify(
|
||||
ModifyKind::Data(DataChange::Content | DataChange::Any) | ModifyKind::Any,
|
||||
) => ev.paths.pop().map(|p| WatchEvent {
|
||||
operation: "Write",
|
||||
path: p,
|
||||
new_path: None,
|
||||
}),
|
||||
EventKind::Modify(ModifyKind::Name(RenameMode::Both)) => ev
|
||||
.paths
|
||||
.drain(..)
|
||||
.rev()
|
||||
.next_array()
|
||||
.map(|[from, to]| WatchEvent {
|
||||
operation: "Rename",
|
||||
path: from,
|
||||
new_path: Some(to),
|
||||
}),
|
||||
_ => None,
|
||||
}
|
||||
.ok_or(())
|
||||
}
|
||||
}
|
||||
|
||||
struct WatchIterator {
|
||||
/// Debouncer needs to be kept alive for `rx` to keep receiving events.
|
||||
_debouncer: Debouncer<RecommendedWatcher, FileIdMap>,
|
||||
rx: Option<Receiver<Result<Vec<DebouncedEvent>, Vec<notify::Error>>>>,
|
||||
signals: Signals,
|
||||
}
|
||||
|
||||
impl WatchIterator {
|
||||
fn new(
|
||||
debouncer: Debouncer<RecommendedWatcher, FileIdMap>,
|
||||
rx: Receiver<Result<Vec<DebouncedEvent>, Vec<notify::Error>>>,
|
||||
signals: Signals,
|
||||
) -> Self {
|
||||
Self {
|
||||
_debouncer: debouncer,
|
||||
rx: Some(rx),
|
||||
signals,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Iterator for WatchIterator {
|
||||
type Item = Result<Vec<WatchEvent>, ShellError>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
let rx = self.rx.as_ref()?;
|
||||
while !self.signals.interrupted() {
|
||||
let x = match rx.recv_timeout(CHECK_CTRL_C_FREQUENCY) {
|
||||
Ok(x) => x,
|
||||
Err(RecvTimeoutError::Timeout) => continue,
|
||||
Err(RecvTimeoutError::Disconnected) => {
|
||||
self.rx = None;
|
||||
return Some(Err(ShellError::GenericError {
|
||||
error: "Disconnected".to_string(),
|
||||
msg: "Unexpected disconnect from file watcher".into(),
|
||||
span: None,
|
||||
help: None,
|
||||
inner: vec![],
|
||||
}));
|
||||
}
|
||||
};
|
||||
|
||||
let Ok(events) = x else {
|
||||
self.rx = None;
|
||||
return Some(Err(ShellError::GenericError {
|
||||
error: "Receiving events failed".to_string(),
|
||||
msg: "Unexpected errors when receiving events".into(),
|
||||
span: None,
|
||||
help: None,
|
||||
inner: vec![],
|
||||
}));
|
||||
};
|
||||
|
||||
let watch_events = events
|
||||
.into_iter()
|
||||
.filter_map(|ev| WatchEvent::try_from(ev).ok())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
return Some(Ok(watch_events));
|
||||
}
|
||||
self.rx = None;
|
||||
None
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user