Limit mem usage + back-pressure via bounded channels (#4986)

Prior to this change, a pipeline of externals would result in high memory
usage if any of the producers in the chain, produced data faster than
the consumers.

For example a pipeline:

  > fast-producer | slow-consumer

Would cause a build up of `Value::{String,Binary}`'s in the mpsc channels
between each command as values are added to the channels faster than they
are consumed, eventually OOM'ing depnding on system resources, the volume
of data and speed diff. between fast v's slow.

This change replaces the unbounded channels with bounded channels
to limit the number of values that can build up and providing
back-pressure to limit ram usage.
This commit is contained in:
Andrew Barnes 2022-03-27 13:34:34 +11:00 committed by GitHub
parent 56a546e73d
commit 91e17d2f9f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -17,6 +17,7 @@ use pathdiff::diff_paths;
use regex::Regex;
const OUTPUT_BUFFER_SIZE: usize = 1024;
const OUTPUT_BUFFERS_IN_FLIGHT: usize = 3;
#[derive(Clone)]
pub struct External;
@ -187,8 +188,8 @@ impl ExternalCommand {
let redirect_stderr = self.redirect_stderr;
let span = self.name.span;
let output_ctrlc = ctrlc.clone();
let (stdout_tx, stdout_rx) = mpsc::channel();
let (stderr_tx, stderr_rx) = mpsc::channel();
let (stdout_tx, stdout_rx) = mpsc::sync_channel(OUTPUT_BUFFERS_IN_FLIGHT);
let (stderr_tx, stderr_rx) = mpsc::sync_channel(OUTPUT_BUFFERS_IN_FLIGHT);
let (exit_code_tx, exit_code_rx) = mpsc::channel();
std::thread::spawn(move || {