Make the sleep command respond to Ctrl+C (#2550)

This commit is contained in:
Radek Vít 2020-09-16 18:14:33 +02:00 committed by GitHub
parent f5fad393d0
commit d05f9b3b1e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -4,8 +4,17 @@ use crate::prelude::*;
use nu_errors::ShellError;
use nu_protocol::{Signature, SyntaxShape};
use nu_source::Tagged;
use parking_lot::Mutex;
use std::{
future::Future,
pin::Pin,
sync::atomic::Ordering,
task::{Poll, Waker},
thread,
time::Duration,
};
use std::{thread, time};
const CTRL_C_CHECK_INTERVAL: Duration = Duration::from_millis(100);
pub struct Sleep;
@ -36,7 +45,15 @@ impl WholeStreamCommand for Sleep {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
sleep(args, registry).await
let registry = registry.clone();
let ctrl_c = args.ctrl_c().clone();
let (SleepArgs { dur, rest }, ..) = args.process(&registry).await?;
let total_dur = dur.item + rest.iter().map(|val| val.item).sum::<u64>();
let total_dur = Duration::from_nanos(total_dur);
SleepFuture::new(total_dur, ctrl_c).await
}
fn examples(&self) -> Vec<Example> {
@ -55,16 +72,88 @@ impl WholeStreamCommand for Sleep {
}
}
async fn sleep(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
struct SleepFuture {
shared_state: Arc<Mutex<SharedState>>,
}
let (SleepArgs { dur, rest }, ..) = args.process(&registry).await?;
impl SleepFuture {
/// Create a new `SleepFuture` which will complete after the provided
/// timeout and check for Ctrl+C periodically.
pub fn new(duration: Duration, ctrl_c: Arc<AtomicBool>) -> Self {
let shared_state = Arc::new(Mutex::new(SharedState {
done: false,
waker: None,
}));
let total_dur = dur.item + rest.iter().map(|val| val.item).sum::<u64>();
let total_dur = time::Duration::from_nanos(total_dur);
thread::sleep(total_dur);
// Spawn the main sleep thread
let thread_shared_state = shared_state.clone();
thread::spawn(move || {
thread::sleep(duration);
let mut shared_state = thread_shared_state.lock();
// Signal that the timer has completed and wake up the last
// task on which the future was polled, if one exists.
if !shared_state.done {
shared_state.done = true;
if let Some(waker) = shared_state.waker.take() {
waker.wake()
}
}
});
Ok(OutputStream::empty())
// Spawn the Ctrl+C-watching polling thread
let thread_shared_state = shared_state.clone();
thread::spawn(move || {
loop {
{
let mut shared_state = thread_shared_state.lock();
// exit if the main thread is done
if shared_state.done {
return;
}
// finish the future prematurely if Ctrl+C has been pressed
if ctrl_c.load(Ordering::SeqCst) {
shared_state.done = true;
if let Some(waker) = shared_state.waker.take() {
waker.wake()
}
return;
}
}
// sleep for a short time
thread::sleep(CTRL_C_CHECK_INTERVAL);
}
});
SleepFuture { shared_state }
}
}
struct SharedState {
done: bool,
waker: Option<Waker>,
}
impl Future for SleepFuture {
type Output = Result<OutputStream, ShellError>;
fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
// Look at the shared state to see if the timer has already completed.
let mut shared_state = self.shared_state.lock();
if shared_state.done {
Poll::Ready(Ok(OutputStream::empty()))
} else {
// Set the waker if necessary
if shared_state
.waker
.as_ref()
.map(|waker| !waker.will_wake(&cx.waker()))
.unwrap_or(true)
{
shared_state.waker = Some(cx.waker().clone());
}
Poll::Pending
}
}
}
#[cfg(test)]