job recv with a timeout now respects ctrl-c

This commit is contained in:
cosineblast 2025-03-22 23:25:27 -03:00
parent 4f1cca8501
commit f73603a44e

View File

@ -1,7 +1,11 @@
use std::{sync::mpsc::RecvTimeoutError, time::Duration};
use std::{sync::mpsc::RecvTimeoutError, time::Duration, time::Instant};
use nu_engine::command_prelude::*;
use nu_protocol::engine::FilterTag;
use nu_protocol::{
engine::{FilterTag, Mailbox},
Signals,
};
#[derive(Clone)]
pub struct JobRecv;
@ -80,33 +84,12 @@ in no particular order, regardless of the specified timeout parameter.
.lock()
.expect("failed to acquire lock");
if let Some(timeout) = timeout {
let value = mailbox
.recv_timeout(tag, timeout)
.map_err(|error| match error {
RecvTimeoutError::Timeout => ShellError::RecvTimeout { span: head },
// if the channel was disconnected, it means this job was removed from the job
// table, so it was killed/interrupted
RecvTimeoutError::Disconnected => ShellError::Interrupted { span: head },
})?;
Ok(value.into_pipeline_data())
return if let Some(timeout) = timeout {
recv_with_time_limit(&mut mailbox, tag, engine_state.signals(), head, timeout)
} else {
loop {
if engine_state.signals().interrupted() {
return Err(ShellError::Interrupted { span: head });
}
match mailbox.recv_timeout(tag, CTRL_C_CHECK_INTERVAL) {
Ok(value) => return Ok(value.into_pipeline_data()),
Err(RecvTimeoutError::Timeout) => {} // try again
Err(RecvTimeoutError::Disconnected) => {
return Err(ShellError::Interrupted { span: head })
}
}
}
recv_without_time_limit(&mut mailbox, tag, engine_state.signals(), head)
}
.map(|value| value.into_pipeline_data());
}
fn examples(&self) -> Vec<Example> {
@ -117,3 +100,51 @@ in no particular order, regardless of the specified timeout parameter.
}]
}
}
fn recv_without_time_limit(
mailbox: &mut Mailbox,
tag: Option<FilterTag>,
signals: &Signals,
span: Span,
) -> Result<Value, ShellError> {
loop {
if signals.interrupted() {
return Err(ShellError::Interrupted { span });
}
match mailbox.recv_timeout(tag, CTRL_C_CHECK_INTERVAL) {
Ok(value) => return Ok(value),
Err(RecvTimeoutError::Timeout) => {} // try again
Err(RecvTimeoutError::Disconnected) => return Err(ShellError::Interrupted { span }),
}
}
}
fn recv_with_time_limit(
mailbox: &mut Mailbox,
tag: Option<FilterTag>,
signals: &Signals,
span: Span,
timeout: Duration,
) -> Result<Value, ShellError> {
let deadline = Instant::now() + timeout;
loop {
if signals.interrupted() {
return Err(ShellError::Interrupted { span });
}
let time_until_deadline = deadline.saturating_duration_since(Instant::now());
let time_to_sleep = time_until_deadline.min(CTRL_C_CHECK_INTERVAL);
match mailbox.recv_timeout(tag, time_to_sleep) {
Ok(value) => return Ok(value),
Err(RecvTimeoutError::Timeout) => {} // try again
Err(RecvTimeoutError::Disconnected) => return Err(ShellError::Interrupted { span }),
}
if time_until_deadline.is_zero() {
return Err(ShellError::RecvTimeout { span });
}
}
}