Upgrade to interprocess 2.0.0 (#12729)

# Description

This fixes #12724. NetBSD confirmed to work with this change.

The update also behaves a bit better in some ways - it automatically
unlinks and reclaims sockets on Unix, and doesn't try to flush/sync the
socket on Windows, so I was able to remove that platform-specific logic.

They also have a way to split the socket so I could just use one socket
now, but I haven't tried to do that yet. That would be more of a
breaking change but I think it's more straightforward.

# User-Facing Changes

- Hopefully more platforms work

# Tests + Formatting
- 🟢 `toolkit fmt`
- 🟢 `toolkit clippy`
- 🟢 `toolkit test`
- 🟢 `toolkit test stdlib`
This commit is contained in:
Devyn Cairns
2024-05-02 22:31:33 -07:00
committed by GitHub
parent bc6d934fa1
commit 72f3942c37
5 changed files with 79 additions and 260 deletions

View File

@ -1,4 +1,4 @@
use std::ffi::OsString;
use std::ffi::{OsStr, OsString};
#[cfg(test)]
pub(crate) mod tests;
@ -23,6 +23,16 @@ pub fn make_local_socket_name(unique_id: &str) -> OsString {
base.into()
}
/// Interpret a local socket name for use with `interprocess`.
#[cfg(unix)]
pub fn interpret_local_socket_name(
name: &OsStr,
) -> Result<interprocess::local_socket::Name, std::io::Error> {
use interprocess::local_socket::{GenericFilePath, ToFsName};
name.to_fs_name::<GenericFilePath>()
}
/// Generate a name to be used for a local socket specific to this `nu` process, described by the
/// given `unique_id`, which should be unique to the purpose of the socket.
///
@ -33,6 +43,16 @@ pub fn make_local_socket_name(unique_id: &str) -> OsString {
format!("nu.{}.{}", std::process::id(), unique_id).into()
}
/// Interpret a local socket name for use with `interprocess`.
#[cfg(windows)]
pub fn interpret_local_socket_name(
name: &OsStr,
) -> Result<interprocess::local_socket::Name, std::io::Error> {
use interprocess::local_socket::{GenericNamespaced, ToNsName};
name.to_ns_name::<GenericNamespaced>()
}
/// Determine if the error is just due to the listener not being ready yet in asynchronous mode
#[cfg(not(windows))]
pub fn is_would_block_err(err: &std::io::Error) -> bool {
@ -48,37 +68,3 @@ pub fn is_would_block_err(err: &std::io::Error) -> bool {
e as i64 == windows::Win32::Foundation::ERROR_PIPE_LISTENING.0 as i64
})
}
/// Wraps the `interprocess` local socket stream for greater compatibility
#[derive(Debug)]
pub struct LocalSocketStream(pub interprocess::local_socket::LocalSocketStream);
impl From<interprocess::local_socket::LocalSocketStream> for LocalSocketStream {
fn from(value: interprocess::local_socket::LocalSocketStream) -> Self {
LocalSocketStream(value)
}
}
impl std::io::Read for LocalSocketStream {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
self.0.read(buf)
}
}
impl std::io::Write for LocalSocketStream {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.0.write(buf)
}
fn flush(&mut self) -> std::io::Result<()> {
// We don't actually flush the underlying socket on Windows. The flush operation on a
// Windows named pipe actually synchronizes with read on the other side, and won't finish
// until the other side is empty. This isn't how most of our other I/O methods work, so we
// just won't do it. The BufWriter above this will have still made a write call with the
// contents of the buffer, which should be good enough.
if cfg!(not(windows)) {
self.0.flush()?;
}
Ok(())
}
}

View File

@ -4,9 +4,6 @@ use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio};
use nu_protocol::ShellError;
#[cfg(feature = "local-socket")]
use interprocess::local_socket::LocalSocketListener;
#[cfg(feature = "local-socket")]
mod local_socket;
@ -83,15 +80,14 @@ impl CommunicationMode {
// For sockets: we need to create the server so that the child won't fail to connect.
#[cfg(feature = "local-socket")]
CommunicationMode::LocalSocket(name) => {
let listener = LocalSocketListener::bind(name.as_os_str()).map_err(|err| {
ShellError::IOError {
use interprocess::local_socket::ListenerOptions;
let listener = interpret_local_socket_name(name)
.and_then(|name| ListenerOptions::new().name(name).create_sync())
.map_err(|err| ShellError::IOError {
msg: format!("failed to open socket for plugin: {err}"),
}
})?;
Ok(PreparedServerCommunication::LocalSocket {
name: name.clone(),
listener,
})
})?;
Ok(PreparedServerCommunication::LocalSocket { listener })
}
}
}
@ -107,11 +103,13 @@ impl CommunicationMode {
// Connect to the specified socket.
let get_socket = || {
use interprocess::local_socket as ls;
ls::LocalSocketStream::connect(name.as_os_str())
use ls::traits::Stream;
interpret_local_socket_name(name)
.and_then(|name| ls::Stream::connect(name))
.map_err(|err| ShellError::IOError {
msg: format!("failed to connect to socket: {err}"),
})
.map(LocalSocketStream::from)
};
// Reverse order from the server: read in, write out
let read_in = get_socket()?;
@ -133,9 +131,7 @@ pub enum PreparedServerCommunication {
/// Contains the listener to accept connections on. On Unix, the socket is unlinked on `Drop`.
#[cfg(feature = "local-socket")]
LocalSocket {
#[cfg_attr(windows, allow(dead_code))] // not used on Windows
name: std::ffi::OsString,
listener: LocalSocketListener,
listener: interprocess::local_socket::Listener,
},
}
@ -161,6 +157,9 @@ impl PreparedServerCommunication {
}
#[cfg(feature = "local-socket")]
PreparedServerCommunication::LocalSocket { listener, .. } => {
use interprocess::local_socket::traits::{
Listener, ListenerNonblockingMode, Stream,
};
use std::time::{Duration, Instant};
const RETRY_PERIOD: Duration = Duration::from_millis(1);
@ -170,13 +169,16 @@ impl PreparedServerCommunication {
// Use a loop to try to get two clients from the listener: one for read (the plugin
// output) and one for write (the plugin input)
listener.set_nonblocking(true)?;
//
// Be non-blocking on Accept only, so we can timeout.
listener.set_nonblocking(ListenerNonblockingMode::Accept)?;
let mut get_socket = || {
let mut result = None;
while let Ok(None) = child.try_wait() {
match listener.accept() {
Ok(stream) => {
// Success! But make sure the stream is in blocking mode.
// Success! Ensure the stream is in nonblocking mode though, for
// good measure. Had an issue without this on macOS.
stream.set_nonblocking(false)?;
result = Some(stream);
break;
@ -198,7 +200,7 @@ impl PreparedServerCommunication {
}
}
if let Some(stream) = result {
Ok(LocalSocketStream(stream))
Ok(stream)
} else {
// The process may have exited
Err(ShellError::PluginFailedToLoad {
@ -215,26 +217,13 @@ impl PreparedServerCommunication {
}
}
impl Drop for PreparedServerCommunication {
fn drop(&mut self) {
match self {
#[cfg(all(unix, feature = "local-socket"))]
PreparedServerCommunication::LocalSocket { name: path, .. } => {
// Just try to remove the socket file, it's ok if this fails
let _ = std::fs::remove_file(path);
}
_ => (),
}
}
}
/// The required streams for communication from the engine side, i.e. the server in socket terms.
pub enum ServerCommunicationIo {
Stdio(ChildStdin, ChildStdout),
#[cfg(feature = "local-socket")]
LocalSocket {
read_out: LocalSocketStream,
write_in: LocalSocketStream,
read_out: interprocess::local_socket::Stream,
write_in: interprocess::local_socket::Stream,
},
}
@ -243,7 +232,7 @@ pub enum ClientCommunicationIo {
Stdio(Stdin, Stdout),
#[cfg(feature = "local-socket")]
LocalSocket {
read_in: LocalSocketStream,
write_out: LocalSocketStream,
read_in: interprocess::local_socket::Stream,
write_out: interprocess::local_socket::Stream,
},
}

View File

@ -1,9 +1,12 @@
use std::{
error::Error,
ffi::OsStr,
io::{BufRead, BufReader, Write},
};
use interprocess::local_socket::LocalSocketStream;
use interprocess::local_socket::{
self, traits::Stream, GenericFilePath, GenericNamespaced, ToFsName, ToNsName,
};
use serde::Deserialize;
use serde_json::{json, Value};
@ -35,9 +38,6 @@ pub fn main() -> Result<(), Box<dyn Error>> {
local_socket_path: None,
};
#[allow(unused_mut)]
let mut should_flush = true;
let (mut input, mut output): (Box<dyn BufRead>, Box<dyn Write>) =
match args.get(1).map(|s| s.as_str()) {
Some("--stdio") => (
@ -49,14 +49,13 @@ pub fn main() -> Result<(), Box<dyn Error>> {
if opts.refuse_local_socket {
std::process::exit(1)
} else {
let in_socket = LocalSocketStream::connect(args[2].as_str())?;
let out_socket = LocalSocketStream::connect(args[2].as_str())?;
#[cfg(windows)]
{
// Flushing on a socket on Windows is weird and waits for the other side
should_flush = false;
}
let name = if cfg!(windows) {
OsStr::new(&args[2]).to_ns_name::<GenericNamespaced>()?
} else {
OsStr::new(&args[2]).to_fs_name::<GenericFilePath>()?
};
let in_socket = local_socket::Stream::connect(name.clone())?;
let out_socket = local_socket::Stream::connect(name)?;
(Box::new(BufReader::new(in_socket)), Box::new(out_socket))
}
@ -73,9 +72,7 @@ pub fn main() -> Result<(), Box<dyn Error>> {
// Send encoding format
output.write_all(b"\x04json")?;
if should_flush {
output.flush()?;
}
output.flush()?;
// Test exiting without `Hello`
if opts.exit_before_hello {
@ -91,7 +88,6 @@ pub fn main() -> Result<(), Box<dyn Error>> {
// Send `Hello` message
write(
&mut output,
should_flush,
&json!({
"Hello": {
"protocol": "nu-plugin",
@ -117,7 +113,7 @@ pub fn main() -> Result<(), Box<dyn Error>> {
// Parse incoming messages
loop {
match Value::deserialize(&mut de) {
Ok(message) => handle_message(&mut output, should_flush, &opts, &message)?,
Ok(message) => handle_message(&mut output, &opts, &message)?,
Err(err) => {
if err.is_eof() {
break;
@ -135,7 +131,6 @@ pub fn main() -> Result<(), Box<dyn Error>> {
fn handle_message(
output: &mut impl Write,
should_flush: bool,
opts: &Options,
message: &Value,
) -> Result<(), Box<dyn Error>> {
@ -144,7 +139,6 @@ fn handle_message(
if plugin_call.as_str() == Some("Signature") {
write(
output,
should_flush,
&json!({
"CallResponse": [
id,
@ -165,7 +159,6 @@ fn handle_message(
});
write(
output,
should_flush,
&json!({
"CallResponse": [
id,
@ -212,11 +205,9 @@ fn signatures() -> Vec<Value> {
})]
}
fn write(output: &mut impl Write, should_flush: bool, value: &Value) -> Result<(), Box<dyn Error>> {
fn write(output: &mut impl Write, value: &Value) -> Result<(), Box<dyn Error>> {
serde_json::to_writer(&mut *output, value)?;
output.write_all(b"\n")?;
if should_flush {
output.flush()?;
}
output.flush()?;
Ok(())
}