# Description

This is an attempt to improve the nushell situation with regard to issue
#247.

This PR implements:
- [X] spawning jobs: `job spawn { do_background_thing }`
Jobs will be implemented as threads and not forks, to maintain a
consistent behavior between unix and windows.

- [X] listing running jobs: `job list`
This should allow users to list what background tasks they currently
have running.

- [X] killing jobs: `job kill <id>`
- [X] interupting nushell code in the job's background thread
- [X] interrupting the job's currently-running process, if any.

Things that should be taken into consideration for implementation:
- [X] (unix-only) Handling `TSTP` signals while executing code and
turning the current program into a background job, and unfreezing them
in foreground `job unfreeze`.

- [X] Ensuring processes spawned by background jobs get distinct process
groups from the nushell shell itself

This PR originally aimed to implement some of the following, but it is
probably ideal to be left for another PR (scope creep)
- Disowning external process jobs (`job dispatch`)
- Inter job communication (`job send/recv`)

Roadblocks encountered so far:
- Nushell does some weird terminal sequence magics which make so that
when a background process or thread prints something to stderr and the
prompt is idle, the stderr output ends up showing up weirdly
This commit is contained in:
Renan Ribeiro
2025-02-25 14:09:52 -03:00
committed by GitHub
parent 9521b209d1
commit 9bb7f0c7dc
26 changed files with 1391 additions and 163 deletions

View File

@ -448,8 +448,17 @@ pub fn add_shell_command_context(mut engine_state: EngineState) -> EngineState {
// Experimental
bind_command! {
IsAdmin,
JobSpawn,
JobList,
JobKill,
Job,
};
#[cfg(unix)]
bind_command! {
JobUnfreeze,
}
// Removed
bind_command! {
LetEnv,

View File

@ -106,6 +106,7 @@ pub(super) fn start_editor(
let child = ForegroundChild::spawn(
command,
engine_state.is_interactive,
engine_state.is_background_job(),
&engine_state.pipeline_externals_state,
);
@ -119,7 +120,7 @@ pub(super) fn start_editor(
})?;
// Wrap the output into a `PipelineData::ByteStream`.
let child = nu_protocol::process::ChildProcess::new(child, None, false, call.head)?;
let child = nu_protocol::process::ChildProcess::new(child, None, false, call.head, None)?;
Ok(PipelineData::ByteStream(
ByteStream::child(child, call.head),
None,

View File

@ -0,0 +1,34 @@
use nu_engine::{command_prelude::*, get_full_help};
#[derive(Clone)]
pub struct Job;
impl Command for Job {
fn name(&self) -> &str {
"job"
}
fn signature(&self) -> Signature {
Signature::build("job")
.category(Category::Strings)
.input_output_types(vec![(Type::Nothing, Type::String)])
}
fn description(&self) -> &str {
"Various commands for working with background jobs."
}
fn extra_description(&self) -> &str {
"You must use one of the following subcommands. Using this command as-is will only produce this help message."
}
fn run(
&self,
engine_state: &EngineState,
stack: &mut Stack,
call: &Call,
_input: PipelineData,
) -> Result<PipelineData, ShellError> {
Ok(Value::string(get_full_help(self, engine_state, stack), call.head).into_pipeline_data())
}
}

View File

@ -0,0 +1,72 @@
use nu_engine::command_prelude::*;
use nu_protocol::JobId;
#[derive(Clone)]
pub struct JobKill;
impl Command for JobKill {
fn name(&self) -> &str {
"job kill"
}
fn description(&self) -> &str {
"Kill a background job."
}
fn signature(&self) -> nu_protocol::Signature {
Signature::build("job kill")
.category(Category::Experimental)
.required("id", SyntaxShape::Int, "The id of the job to kill.")
.input_output_types(vec![(Type::Nothing, Type::Nothing)])
.allow_variants_without_examples(true)
}
fn search_terms(&self) -> Vec<&str> {
vec!["halt", "stop", "end", "close"]
}
fn run(
&self,
engine_state: &EngineState,
stack: &mut Stack,
call: &Call,
_input: PipelineData,
) -> Result<PipelineData, ShellError> {
let head = call.head;
let id_arg: Spanned<i64> = call.req(engine_state, stack, 0)?;
if id_arg.item < 0 {
return Err(ShellError::NeedsPositiveValue { span: id_arg.span });
}
let id: JobId = JobId::new(id_arg.item as usize);
let mut jobs = engine_state.jobs.lock().expect("jobs lock is poisoned!");
if jobs.lookup(id).is_none() {
return Err(ShellError::JobNotFound {
id: id.get(),
span: head,
});
}
jobs.kill_and_remove(id).map_err(|err| {
ShellError::Io(IoError::new_internal(
err.kind(),
"Failed to kill the requested job",
nu_protocol::location!(),
))
})?;
Ok(Value::nothing(head).into_pipeline_data())
}
fn examples(&self) -> Vec<Example> {
vec![Example {
example: "let id = job spawn { sleep 10sec }; job kill $id",
description: "Kill a newly spawned job",
result: None,
}]
}
}

View File

@ -0,0 +1,75 @@
use nu_engine::command_prelude::*;
use nu_protocol::engine::{FrozenJob, Job};
#[derive(Clone)]
pub struct JobList;
impl Command for JobList {
fn name(&self) -> &str {
"job list"
}
fn description(&self) -> &str {
"List background jobs."
}
fn signature(&self) -> nu_protocol::Signature {
Signature::build("job list")
.category(Category::Experimental)
.input_output_types(vec![(Type::Nothing, Type::table())])
}
fn search_terms(&self) -> Vec<&str> {
vec!["background", "jobs"]
}
fn run(
&self,
engine_state: &EngineState,
_stack: &mut Stack,
call: &Call,
_input: PipelineData,
) -> Result<PipelineData, ShellError> {
let head = call.head;
let jobs = engine_state.jobs.lock().expect("jobs lock is poisoned!");
let values = jobs
.iter()
.map(|(id, job)| {
let record = record! {
"id" => Value::int(id.get() as i64, head),
"type" => match job {
Job::Thread(_) => Value::string("thread", head),
Job::Frozen(_) => Value::string("frozen", head),
},
"pids" => match job {
Job::Thread(job) => Value::list(
job.collect_pids()
.into_iter()
.map(|it| Value::int(it as i64, head))
.collect::<Vec<Value>>(),
head,
),
Job::Frozen(FrozenJob { unfreeze }) => {
Value::list(vec![ Value::int(unfreeze.pid() as i64, head) ], head)
}
}
};
Value::record(record, head)
})
.collect::<Vec<Value>>();
Ok(Value::list(values, head).into_pipeline_data())
}
fn examples(&self) -> Vec<Example> {
vec![Example {
example: "job list",
description: "List all background jobs",
result: None,
}]
}
}

View File

@ -0,0 +1,126 @@
use std::{
sync::{
atomic::{AtomicBool, AtomicU32},
Arc,
},
thread,
};
use nu_engine::{command_prelude::*, ClosureEvalOnce};
use nu_protocol::{
engine::{Closure, Job, ThreadJob},
report_shell_error, Signals,
};
#[derive(Clone)]
pub struct JobSpawn;
impl Command for JobSpawn {
fn name(&self) -> &str {
"job spawn"
}
fn description(&self) -> &str {
"Spawn a background job and retrieve its ID."
}
fn signature(&self) -> nu_protocol::Signature {
Signature::build("job spawn")
.category(Category::Experimental)
.input_output_types(vec![(Type::Nothing, Type::Int)])
.required(
"closure",
SyntaxShape::Closure(Some(vec![SyntaxShape::Any])),
"The closure to run in another thread.",
)
}
fn search_terms(&self) -> Vec<&str> {
vec!["background", "bg", "&"]
}
fn run(
&self,
engine_state: &EngineState,
stack: &mut Stack,
call: &Call,
_input: PipelineData,
) -> Result<PipelineData, ShellError> {
let head = call.head;
let closure: Closure = call.req(engine_state, stack, 0)?;
let mut job_state = engine_state.clone();
job_state.is_interactive = false;
let job_stack = stack.clone();
// the new job should have its ctrl-c independent of foreground
let job_signals = Signals::new(Arc::new(AtomicBool::new(false)));
job_state.set_signals(job_signals.clone());
// the new job has a separate process group state for its processes
job_state.pipeline_externals_state = Arc::new((AtomicU32::new(0), AtomicU32::new(0)));
job_state.exit_warning_given = Arc::new(AtomicBool::new(false));
let jobs = job_state.jobs.clone();
let mut jobs = jobs.lock().expect("jobs lock is poisoned!");
let id = {
let thread_job = ThreadJob::new(job_signals);
job_state.current_thread_job = Some(thread_job.clone());
jobs.add_job(Job::Thread(thread_job))
};
let result = thread::Builder::new()
.name(format!("background job {}", id.get()))
.spawn(move || {
ClosureEvalOnce::new(&job_state, &job_stack, closure)
.run_with_input(Value::nothing(head).into_pipeline_data())
.and_then(|data| data.into_value(head))
.unwrap_or_else(|err| {
if !job_state.signals().interrupted() {
report_shell_error(&job_state, &err);
}
Value::nothing(head)
});
{
let mut jobs = job_state.jobs.lock().expect("jobs lock is poisoned!");
jobs.remove_job(id);
}
});
match result {
Ok(_) => Ok(Value::int(id.get() as i64, head).into_pipeline_data()),
Err(err) => {
jobs.remove_job(id);
Err(ShellError::Io(IoError::new_with_additional_context(
err.kind(),
call.head,
None,
"Failed to spawn thread for job",
)))
}
}
}
fn examples(&self) -> Vec<Example> {
vec![Example {
example: "job spawn { sleep 5sec; rm evidence.pdf }",
description: "Spawn a background job to do some time consuming work",
result: None,
}]
}
fn extra_description(&self) -> &str {
r#"Executes the provided closure in a background thread
and registers this task in the background job table, which can be retrieved with `job list`.
This command returns the job id of the newly created job.
"#
}
}

View File

@ -0,0 +1,160 @@
use nu_engine::command_prelude::*;
use nu_protocol::{
engine::{FrozenJob, Job, ThreadJob},
process::check_ok,
shell_error, JobId,
};
use nu_system::{kill_by_pid, ForegroundWaitStatus};
#[derive(Clone)]
pub struct JobUnfreeze;
impl Command for JobUnfreeze {
fn name(&self) -> &str {
"job unfreeze"
}
fn description(&self) -> &str {
"Unfreeze a frozen process job in foreground."
}
fn signature(&self) -> nu_protocol::Signature {
Signature::build("job unfreeze")
.category(Category::Experimental)
.optional("id", SyntaxShape::Int, "The process id to unfreeze.")
.input_output_types(vec![(Type::Nothing, Type::Nothing)])
.allow_variants_without_examples(true)
}
fn search_terms(&self) -> Vec<&str> {
vec!["fg"]
}
fn run(
&self,
engine_state: &EngineState,
stack: &mut Stack,
call: &Call,
_input: PipelineData,
) -> Result<PipelineData, ShellError> {
let head = call.head;
let option_id: Option<Spanned<i64>> = call.opt(engine_state, stack, 0)?;
let mut jobs = engine_state.jobs.lock().expect("jobs lock is poisoned!");
if let Some(id_arg) = option_id {
if id_arg.item < 0 {
return Err(ShellError::NeedsPositiveValue { span: id_arg.span });
}
}
let id = option_id
.map(|it| JobId::new(it.item as usize))
.or_else(|| jobs.most_recent_frozen_job_id())
.ok_or_else(|| ShellError::NoFrozenJob { span: head })?;
let job = match jobs.lookup(id) {
None => {
return Err(ShellError::JobNotFound {
id: id.get(),
span: head,
})
}
Some(Job::Thread(ThreadJob { .. })) => {
return Err(ShellError::JobNotFrozen {
id: id.get(),
span: head,
})
}
Some(Job::Frozen(FrozenJob { .. })) => jobs
.remove_job(id)
.expect("job was supposed to be in job list"),
};
drop(jobs);
unfreeze_job(engine_state, id, job, head)?;
Ok(Value::nothing(head).into_pipeline_data())
}
fn examples(&self) -> Vec<Example> {
vec![
Example {
example: "job unfreeze",
description: "Unfreeze the latest frozen job",
result: None,
},
Example {
example: "job unfreeze 4",
description: "Unfreeze a specific frozen job by its PID",
result: None,
},
]
}
fn extra_description(&self) -> &str {
r#"When a running process is frozen (with the SIGTSTP signal or with the Ctrl-Z key on unix),
a background job gets registered for this process, which can then be resumed using this command."#
}
}
fn unfreeze_job(
state: &EngineState,
old_id: JobId,
job: Job,
span: Span,
) -> Result<(), ShellError> {
match job {
Job::Thread(ThreadJob { .. }) => Err(ShellError::JobNotFrozen {
id: old_id.get(),
span,
}),
Job::Frozen(FrozenJob { unfreeze: handle }) => {
let pid = handle.pid();
if let Some(thread_job) = &state.current_thread_job {
if !thread_job.try_add_pid(pid) {
kill_by_pid(pid.into()).map_err(|err| {
ShellError::Io(IoError::new_internal(
err.kind(),
"job was interrupted; could not kill foreground process",
nu_protocol::location!(),
))
})?;
}
}
let result = handle.unfreeze(
state
.is_interactive
.then(|| state.pipeline_externals_state.clone()),
);
if let Some(thread_job) = &state.current_thread_job {
thread_job.remove_pid(pid);
}
match result {
Ok(ForegroundWaitStatus::Frozen(handle)) => {
let mut jobs = state.jobs.lock().expect("jobs lock is poisoned!");
jobs.add_job_with_id(old_id, Job::Frozen(FrozenJob { unfreeze: handle }))
.expect("job was supposed to be removed");
Ok(())
}
Ok(ForegroundWaitStatus::Finished(status)) => check_ok(status, false, span),
Err(err) => Err(ShellError::Io(IoError::new_internal(
shell_error::io::ErrorKind::Std(err.kind()),
"Failed to unfreeze foreground process",
nu_protocol::location!(),
))),
}
}
}
}

View File

@ -1,3 +1,18 @@
mod is_admin;
mod job;
mod job_kill;
mod job_list;
mod job_spawn;
#[cfg(unix)]
mod job_unfreeze;
pub use is_admin::IsAdmin;
pub use job::Job;
pub use job_kill::JobKill;
pub use job_list::JobList;
pub use job_spawn::JobSpawn;
#[cfg(unix)]
pub use job_unfreeze::JobUnfreeze;

View File

@ -1,5 +1,6 @@
use nu_engine::command_prelude::*;
use std::process::{Command as CommandSys, Stdio};
use nu_system::build_kill_command;
use std::process::Stdio;
#[derive(Clone)]
pub struct Kill;
@ -56,70 +57,36 @@ impl Command for Kill {
let signal: Option<Spanned<i64>> = call.get_flag(engine_state, stack, "signal")?;
let quiet: bool = call.has_flag(engine_state, stack, "quiet")?;
let mut cmd = if cfg!(windows) {
let mut cmd = CommandSys::new("taskkill");
if force {
cmd.arg("/F");
}
cmd.arg("/PID");
cmd.arg(pid.to_string());
// each pid must written as `/PID 0` otherwise
// taskkill will act as `killall` unix command
for id in &rest {
cmd.arg("/PID");
cmd.arg(id.to_string());
}
cmd
} else {
let mut cmd = CommandSys::new("kill");
if force {
if let Some(Spanned {
if cfg!(unix) {
if let (
true,
Some(Spanned {
item: _,
span: signal_span,
}) = signal
{
return Err(ShellError::IncompatibleParameters {
left_message: "force".to_string(),
left_span: call.get_flag_span(stack, "force").ok_or_else(|| {
ShellError::GenericError {
error: "Flag error".into(),
msg: "flag force not found".into(),
span: Some(call.head),
help: None,
inner: vec![],
}
})?,
right_message: "signal".to_string(),
right_span: Span::merge(
call.get_flag_span(stack, "signal").ok_or_else(|| {
ShellError::GenericError {
error: "Flag error".into(),
msg: "flag signal not found".into(),
span: Some(call.head),
help: None,
inner: vec![],
}
})?,
signal_span,
),
});
}
cmd.arg("-9");
} else if let Some(signal_value) = signal {
cmd.arg(format!("-{}", signal_value.item));
}),
) = (force, signal)
{
return Err(ShellError::IncompatibleParameters {
left_message: "force".to_string(),
left_span: call
.get_flag_span(stack, "force")
.expect("Had flag force, but didn't have span for flag"),
right_message: "signal".to_string(),
right_span: Span::merge(
call.get_flag_span(stack, "signal")
.expect("Had flag signal, but didn't have span for flag"),
signal_span,
),
});
}
cmd.arg(pid.to_string());
cmd.args(rest.iter().map(move |id| id.to_string()));
cmd
};
let mut cmd = build_kill_command(
force,
std::iter::once(pid).chain(rest),
signal.map(|spanned| spanned.item as u32),
);
// pipe everything to null
if quiet {
cmd.stdin(Stdio::null())

View File

@ -1,4 +1,4 @@
use nu_engine::command_prelude::*;
use nu_engine::{command_prelude::*, exit::cleanup_exit};
#[derive(Clone)]
pub struct Exit;
@ -36,11 +36,11 @@ impl Command for Exit {
) -> Result<PipelineData, ShellError> {
let exit_code: Option<i64> = call.opt(engine_state, stack, 0)?;
if let Some(exit_code) = exit_code {
std::process::exit(exit_code as i32);
}
let exit_code = exit_code.map_or(0, |it| it as i32);
std::process::exit(0);
cleanup_exit((), engine_state, exit_code);
Ok(Value::nothing(call.head).into_pipeline_data())
}
fn examples(&self) -> Vec<Example> {

View File

@ -2,10 +2,13 @@ use nu_cmd_base::hook::eval_hook;
use nu_engine::{command_prelude::*, env_to_strings};
use nu_path::{dots::expand_ndots_safe, expand_tilde, AbsolutePath};
use nu_protocol::{
did_you_mean, process::ChildProcess, shell_error::io::IoError, ByteStream, NuGlob, OutDest,
Signals, UseAnsiColoring,
did_you_mean,
engine::{FrozenJob, Job},
process::{ChildProcess, PostWaitCallback},
shell_error::io::IoError,
ByteStream, NuGlob, OutDest, Signals, UseAnsiColoring,
};
use nu_system::ForegroundChild;
use nu_system::{kill_by_pid, ForegroundChild, ForegroundWaitStatus};
use nu_utils::IgnoreCaseExt;
use pathdiff::diff_paths;
#[cfg(windows)]
@ -204,12 +207,28 @@ impl Command for External {
command.stderr(writer);
Some(reader)
} else {
command.stdout(
Stdio::try_from(stdout).map_err(|err| IoError::new(err.kind(), call.head, None))?,
);
command.stderr(
Stdio::try_from(stderr).map_err(|err| IoError::new(err.kind(), call.head, None))?,
);
if engine_state.is_background_job()
&& matches!(stdout, OutDest::Inherit | OutDest::Print)
{
command.stdout(Stdio::null());
} else {
command.stdout(
Stdio::try_from(stdout)
.map_err(|err| IoError::new(err.kind(), call.head, None))?,
);
}
if engine_state.is_background_job()
&& matches!(stderr, OutDest::Inherit | OutDest::Print)
{
command.stderr(Stdio::null());
} else {
command.stderr(
Stdio::try_from(stderr)
.map_err(|err| IoError::new(err.kind(), call.head, None))?,
);
}
None
};
@ -248,6 +267,7 @@ impl Command for External {
let child = ForegroundChild::spawn(
command,
engine_state.is_interactive,
engine_state.is_background_job(),
&engine_state.pipeline_externals_state,
);
@ -259,6 +279,18 @@ impl Command for External {
)
})?;
if let Some(thread_job) = &engine_state.current_thread_job {
if !thread_job.try_add_pid(child.pid()) {
kill_by_pid(child.pid().into()).map_err(|err| {
ShellError::Io(IoError::new_internal(
err.kind(),
"Could not spawn external stdin worker",
nu_protocol::location!(),
))
})?;
}
}
// If we need to copy data into the child process, do it now.
if let Some(data) = data_to_copy_into_stdin {
let stdin = child.as_mut().stdin.take().expect("stdin is piped");
@ -279,12 +311,28 @@ impl Command for External {
})?;
}
let jobs = engine_state.jobs.clone();
let this_job = engine_state.current_thread_job.clone();
let child_pid = child.pid();
// Wrap the output into a `PipelineData::ByteStream`.
let mut child = ChildProcess::new(
child,
merged_stream,
matches!(stderr, OutDest::Pipe),
call.head,
// handle wait statuses for job control
Some(PostWaitCallback(Box::new(move |status| {
if let Some(this_job) = this_job {
this_job.remove_pid(child_pid);
}
if let ForegroundWaitStatus::Frozen(unfreeze) = status {
let mut jobs = jobs.lock().expect("jobs lock is poisoned!");
jobs.add_job(Job::Frozen(FrozenJob { unfreeze }));
}
}))),
)?;
if matches!(stdout, OutDest::Pipe | OutDest::PipeSeparate)