Plugin StreamReader: fuse the iterator after an error (#12027)

# Description

This patches `StreamReader`'s iterator implementation to not return any
values after an I/O error has been encountered.

Without this, it's possible for a protocol error to cause the channel to
disconnect, in which case every call to `recv()` returns an error, which
causes the iterator to produce error values infinitely. There are some
commands that don't immediately stop after receiving an error so it's
possible that they just get stuck in an infinite error. This fixes that
so the error is only produced once, and then the stream ends
artificially.
This commit is contained in:
Devyn Cairns 2024-02-29 14:39:17 -08:00 committed by GitHub
parent 65e5abaa3e
commit 4c4609d646
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 23 additions and 2 deletions

View File

@ -113,8 +113,14 @@ where
fn next(&mut self) -> Option<T> {
// Converting the error to the value here makes the implementation a lot easier
self.recv()
.unwrap_or_else(|err| Some(T::from_shell_error(err)))
match self.recv() {
Ok(option) => option,
Err(err) => {
// Drop the receiver so we don't keep returning errors
self.receiver = None;
Some(T::from_shell_error(err))
}
}
}
}

View File

@ -147,6 +147,21 @@ fn reader_recv_end_of_stream() -> Result<(), ShellError> {
Ok(())
}
#[test]
fn reader_iter_fuse_on_error() -> Result<(), ShellError> {
let (tx, rx) = mpsc::channel();
let mut reader = StreamReader::<Value, _>::new(0, rx, TestSink::default());
drop(tx); // should cause error, because we didn't explicitly signal the end
assert!(
reader.next().is_some_and(|e| e.is_error()),
"should be error the first time"
);
assert!(reader.next().is_none(), "should be closed the second time");
Ok(())
}
#[test]
fn reader_drop() {
let (_tx, rx) = mpsc::channel();