feat: make ctrlc available to plugins (#13181)

# Description

This PR adds a new method to `EngineInterface`, `register_ctrlc_handler`,
which takes a closure to run when the plugin's driving engine receives a
ctrl-c signal. It also adds a mirror of the `signals` attribute from the
main shell's `EngineState`.

Here is an example of how a plugin that makes a long-poll HTTP request
can end the request on ctrl-c:
https://github.com/cablehead/nu_plugin_http/blob/main/src/commands/request.rs#L68-L77
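
For illustration, here is a minimal sketch of that pattern from inside a plugin command. The handler type, the guard returned by `register_ctrlc_handler`, and the error conversions are assumptions about the new API rather than code taken from this PR:

```rust
use std::sync::mpsc;
use std::time::Duration;

use nu_plugin::EngineInterface;
use nu_protocol::{LabeledError, PipelineData};

// Hypothetical command body: block on long-running work until ctrl-c arrives.
// The handler type and the returned guard are assumed, not verbatim API.
fn run_until_ctrlc(engine: &EngineInterface) -> Result<PipelineData, LabeledError> {
    let (tx, rx) = mpsc::channel::<()>();

    // Ask the engine to call this closure when it receives ctrl-c; keeping the
    // returned guard alive keeps the handler registered.
    let _guard = engine.register_ctrlc_handler(Box::new(move || {
        let _ = tx.send(());
    }))?;

    // Drive the long-running work in small steps; a real command would poll
    // its HTTP request here instead of sleeping.
    while rx.try_recv().is_err() {
        std::thread::sleep(Duration::from_millis(100));
    }

    // Ctrl-c was observed: abort the request and return cleanly.
    Ok(PipelineData::Empty)
}
```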

To facilitate the feature, a new attribute has been added to
`EngineState`: `ctrlc_handlers`. This is a `Vec` of closures that are run
when the engine's process receives a ctrl-c signal.
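
As a rough illustration, such a shared list of handlers could look something like the sketch below. The `CtrlcHandlers` name is borrowed from later in this description, while the field layout, locking, and method names are assumptions rather than the PR's actual implementation:

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical shared collection of ctrl-c handlers, cloneable so that the
/// engine, plugins, and the process signal handler can all hold a reference.
#[derive(Clone, Default)]
pub struct CtrlcHandlers {
    handlers: Arc<Mutex<Vec<Box<dyn Fn() + Send + Sync>>>>,
}

impl CtrlcHandlers {
    /// Register a closure to run when the engine's process receives ctrl-c.
    pub fn register(&self, handler: Box<dyn Fn() + Send + Sync>) {
        self.handlers.lock().expect("poisoned").push(handler);
    }

    /// Run every registered handler; called from the process signal handler.
    pub fn run(&self) {
        for handler in self.handlers.lock().expect("poisoned").iter() {
            handler();
        }
    }
}
```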

When plugins are added to an `engine_state` during a `merge_delta`, the
engine passes `ctrlc_handlers` to the plugin's
`.configure_ctrlc_handler` method, which gives the plugin a chance to
register a handler that sends a ctrl-c packet through the
`PluginInterface` if an instance of the plugin is currently running.
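
A sketch of what that registration could look like on the engine side, reusing the hypothetical `CtrlcHandlers` above. `PersistentPlugin`, `configure_ctrlc_handler`, and `PluginInterface` are names mentioned in this description, but the stand-in fields and the `ctrlc` method here are assumptions:

```rust
use std::sync::{Arc, Mutex, Weak};

// Stand-in for the real interface type; in the actual implementation this
// would write a ctrl-c packet to the running plugin over the plugin protocol.
struct PluginInterface;

impl PluginInterface {
    fn ctrlc(&self) { /* send the ctrl-c packet to the plugin process */ }
}

struct PersistentPlugin {
    // `None` while no instance of the plugin is running.
    interface: Mutex<Option<Arc<PluginInterface>>>,
}

impl PersistentPlugin {
    fn configure_ctrlc_handler(self: &Arc<Self>, handlers: &CtrlcHandlers) {
        // Hold a weak reference so the registered handler doesn't keep a
        // garbage-collected plugin alive forever.
        let plugin: Weak<PersistentPlugin> = Arc::downgrade(self);
        handlers.register(Box::new(move || {
            if let Some(plugin) = plugin.upgrade() {
                // Only forward ctrl-c if an instance is currently running.
                let interface = plugin.interface.lock().expect("poisoned").clone();
                if let Some(interface) = interface {
                    interface.ctrlc();
                }
            }
        }));
    }
}
```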

On the plugin side, `EngineInterface` also has a `ctrlc_handlers` `Vec` of
closures. Plugin calls can use `register_ctrlc_handler` to register a
closure that will be called in the plugin process when the
`PluginInput::Ctrlc` message is received.
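
A correspondingly hedged sketch of the plugin-side dispatch, again using the `CtrlcHandlers` shape from above; only the `PluginInput::Ctrlc` variant name comes from this PR, while the message enum and function here are stand-ins:

```rust
// Stand-in for the plugin input protocol; the real `PluginInput` enum has
// many more variants.
enum PluginMessage {
    Ctrlc,
    Other,
}

// Called from the plugin's input-reading loop for each incoming message.
fn dispatch(message: PluginMessage, ctrlc_handlers: &CtrlcHandlers) {
    match message {
        // Run every closure registered via `register_ctrlc_handler`.
        PluginMessage::Ctrlc => ctrlc_handlers.run(),
        // Other messages go through the normal plugin call machinery.
        PluginMessage::Other => {}
    }
}
```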

For future reference, these are some alternative places that were
investigated for tying the ctrl-c trigger to transmitting a `Ctrlc` packet
through the `PluginInterface`:

- Directly from `src/signals.rs`: the handler there would need a
reference to the `Vec<Arc<RegisteredPlugins>>`, which would require wrapping
the plugins in a `Mutex`, which we don't want to do.

- Have `PersistentPlugin.get_plugin` pass the engine's `CtrlcHandlers` down
to `.get` and then to `.spawn` (if the plugin isn't already running). Once
`CtrlcHandlers` is available in `spawn`, we can register a handler that
writes directly to the `PluginInterface`. We don't want to double down on
passing `engine_state` to `spawn` this way, though: whether the handler
gets registered would unpredictably depend on whether the plugin has
already been spawned or not.

- Pass `ctrlc_handlers` to `PersistentPlugin::new` so it can store the
handlers on itself, making them available to `spawn`.

- In `PersistentPlugin.spawn`, create a handler that sends to a clone of
the GC event loop's `tx`. This has the same issues regarding how to get
`CtrlcHandlers` to the `spawn` method, and is more complicated than a
handler that writes directly to the `PluginInterface`.

# User-Facing Changes

No breaking changes

---------

Co-authored-by: Ian Manske <ian.manske@pm.me>
Author: Andy Gayton
Date: 2024-07-30 09:29:18 -04:00
Committed by: GitHub
Parent: e3f78b8793
Commit: 7b82c6b482

17 changed files with 310 additions and 21 deletions

@@ -1,7 +1,10 @@
 //! Implements the stream multiplexing interface for both the plugin side and the engine side.
 use nu_plugin_protocol::{ByteStreamInfo, ListStreamInfo, PipelineDataHeader, StreamMessage};
-use nu_protocol::{ByteStream, IntoSpanned, ListStream, PipelineData, Reader, ShellError, Signals};
+use nu_protocol::{
+    engine::Sequence, ByteStream, IntoSpanned, ListStream, PipelineData, Reader, ShellError,
+    Signals,
+};
 use std::{
     io::{Read, Write},
     sync::Mutex,
@@ -10,7 +13,7 @@ use std::{
 pub mod stream;
-use crate::{util::Sequence, Encoder};
+use crate::Encoder;
 use self::stream::{StreamManager, StreamManagerHandle, StreamWriter, WriteStreamMessage};

@@ -1,5 +1,3 @@
-use crate::util::Sequence;
 use super::{
     stream::{StreamManager, StreamManagerHandle},
     test_util::TestCase,
@@ -10,8 +8,8 @@ use nu_plugin_protocol::{
     StreamMessage,
 };
 use nu_protocol::{
-    ByteStream, ByteStreamSource, ByteStreamType, DataSource, ListStream, PipelineData,
-    PipelineMetadata, ShellError, Signals, Span, Value,
+    engine::Sequence, ByteStream, ByteStreamSource, ByteStreamType, DataSource, ListStream,
+    PipelineData, PipelineMetadata, ShellError, Signals, Span, Value,
 };
 use std::{path::Path, sync::Arc};

@@ -1,7 +1,5 @@
-mod sequence;
 mod waitable;
 mod with_custom_values_in;
-pub use sequence::Sequence;
 pub use waitable::*;
 pub use with_custom_values_in::with_custom_values_in;

@@ -1,64 +0,0 @@
-use nu_protocol::ShellError;
-use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
-
-/// Implements an atomically incrementing sequential series of numbers
-#[derive(Debug, Default)]
-pub struct Sequence(AtomicUsize);
-
-impl Sequence {
-    /// Return the next available id from a sequence, returning an error on overflow
-    #[track_caller]
-    pub fn next(&self) -> Result<usize, ShellError> {
-        // It's totally safe to use Relaxed ordering here, as there aren't other memory operations
-        // that depend on this value having been set for safety
-        //
-        // We're only not using `fetch_add` so that we can check for overflow, as wrapping with the
-        // identifier would lead to a serious bug - however unlikely that is.
-        self.0
-            .fetch_update(Relaxed, Relaxed, |current| current.checked_add(1))
-            .map_err(|_| ShellError::NushellFailedHelp {
-                msg: "an accumulator for identifiers overflowed".into(),
-                help: format!("see {}", std::panic::Location::caller()),
-            })
-    }
-}
-
-#[test]
-fn output_is_sequential() {
-    let sequence = Sequence::default();
-    for (expected, generated) in (0..1000).zip(std::iter::repeat_with(|| sequence.next())) {
-        assert_eq!(expected, generated.expect("error in sequence"));
-    }
-}
-
-#[test]
-fn output_is_unique_even_under_contention() {
-    let sequence = Sequence::default();
-
-    std::thread::scope(|scope| {
-        // Spawn four threads, all advancing the sequence simultaneously
-        let threads = (0..4)
-            .map(|_| {
-                scope.spawn(|| {
-                    (0..100000)
-                        .map(|_| sequence.next())
-                        .collect::<Result<Vec<_>, _>>()
-                })
-            })
-            .collect::<Vec<_>>();
-
-        // Collect all of the results into a single flat vec
-        let mut results = threads
-            .into_iter()
-            .flat_map(|thread| thread.join().expect("panicked").expect("error"))
-            .collect::<Vec<usize>>();
-
-        // Check uniqueness
-        results.sort();
-        let initial_length = results.len();
-        results.dedup();
-        let deduplicated_length = results.len();
-        assert_eq!(initial_length, deduplicated_length);
-    })
-}