Flush on every plugin Data message (#12728)

# Description

This helps to ensure data produced on a stream is immediately available
to the consumer of the stream. The BufWriter introduced for performance
reasons in 0.93 exposed the behavior that data messages wouldn't make it
to the other side until they filled the buffer in @cablehead's
[`nu_plugin_from_sse`](https://github.com/cablehead/nu_plugin_from_sse).

I had originally not flushed on every `Data` message because I figured
that it isn't really critical that the other side sees those messages
immediately, since they're not used for control and they are flushed
when waiting for acknowledgement or when the buffer is too full anyway.

Increasing the amount of data that can be sent with a single underlying
write increases performance, but this interferes with some plugins that
want to use streams in a more real-time way. In the future I would like
to make this configurable, maybe even per-command, so that a command can
decide what the priority is. But for now I think this is reasonable.

In the worst case, this decreases performance by about 40%, when sending
very small values (just numbers). But for larger values, this PR
actually increases performance by about 20%, because I've increased the
buffer size about 2x to 16,384 bytes. The previous value of 8,192 bytes
was too small to fit a full buffer coming from an external command, so
doubling it makes sense, and now a write of a buffer from an external
command can be done in exactly one write call, which I think makes
sense. I'm doing this at the same time because flushing each data
message would make it very likely that each individual data message from
an external stream would require exactly two writes rather than
approximately one (amortized).

Again, hopefully the tradeoff isn't too bad, and if it is I'll just make
it configurable.

# User-Facing Changes

- Performance of plugin streams will be a bit different
- Plugins that expect to send streams in real-time will work again

# Tests + Formatting

- 🟢 `toolkit fmt`
- 🟢 `toolkit clippy`
- 🟢 `toolkit test`
- 🟢 `toolkit test stdlib`
This commit is contained in:
Devyn Cairns 2024-05-02 16:51:16 -07:00 committed by GitHub
parent be6137d136
commit ad6deadf24
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 13 additions and 4 deletions

View File

@ -202,10 +202,13 @@ where
if !self.ended {
self.writer
.write_stream_message(StreamMessage::Data(self.id, data.into()))?;
// Flush after each data message to ensure they do predictably appear on the other side
// when they're generated
//
// TODO: make the buffering configurable, as this is a factor for performance
self.writer.flush()?;
// This implements flow control, so we don't write too many messages:
if !self.signal.notify_sent()? {
// Flush the output, and then wait for acknowledgements
self.writer.flush()?;
self.signal.wait_for_drain()
} else {
Ok(())

View File

@ -24,7 +24,10 @@ use crate::{
PluginSource,
};
pub(crate) const OUTPUT_BUFFER_SIZE: usize = 8192;
/// This should be larger than the largest commonly sent message to avoid excessive fragmentation.
///
/// The buffers coming from external streams are typically each 8192 bytes, so double that.
pub(crate) const OUTPUT_BUFFER_SIZE: usize = 16384;
/// Spawn the command for a plugin, in the given `mode`. After spawning, it can be passed to
/// [`make_plugin_interface()`] to get a [`PluginInterface`].

View File

@ -28,8 +28,11 @@ mod interface;
pub use command::{create_plugin_signature, PluginCommand, SimplePluginCommand};
pub use interface::{EngineInterface, EngineInterfaceManager};
/// This should be larger than the largest commonly sent message to avoid excessive fragmentation.
///
/// The buffers coming from external streams are typically each 8192 bytes, so double that.
#[allow(dead_code)]
pub(crate) const OUTPUT_BUFFER_SIZE: usize = 8192;
pub(crate) const OUTPUT_BUFFER_SIZE: usize = 16384;
/// The API for a Nushell plugin
///