refactor all write_alls to ensure flushing (#5567)

This commit is contained in:
Darren Schroeder 2022-05-17 13:28:18 -05:00 committed by GitHub
parent f26d3bf8d7
commit f0cb2f38df
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 85 additions and 85 deletions

3
Cargo.lock generated
View File

@ -2434,6 +2434,7 @@ dependencies = [
"nu-table", "nu-table",
"nu-term-grid", "nu-term-grid",
"nu-test-support", "nu-test-support",
"nu-utils",
"openssl", "openssl",
"pretty_assertions", "pretty_assertions",
"pretty_env_logger", "pretty_env_logger",
@ -2582,6 +2583,7 @@ dependencies = [
"nu-glob", "nu-glob",
"nu-path", "nu-path",
"nu-protocol", "nu-protocol",
"nu-utils",
"sysinfo", "sysinfo",
] ]
@ -2659,6 +2661,7 @@ dependencies = [
"indexmap", "indexmap",
"miette 4.5.0", "miette 4.5.0",
"nu-json", "nu-json",
"nu-utils",
"num-format", "num-format",
"regex", "regex",
"serde", "serde",

View File

@ -51,6 +51,7 @@ nu-protocol = { path = "./crates/nu-protocol", version = "0.62.1" }
nu-system = { path = "./crates/nu-system", version = "0.62.1" } nu-system = { path = "./crates/nu-system", version = "0.62.1" }
nu-table = { path = "./crates/nu-table", version = "0.62.1" } nu-table = { path = "./crates/nu-table", version = "0.62.1" }
nu-term-grid = { path = "./crates/nu-term-grid", version = "0.62.1" } nu-term-grid = { path = "./crates/nu-term-grid", version = "0.62.1" }
nu-utils = { path = "./crates/nu-utils", version = "0.62.1" }
openssl = { version = "0.10.38", features = ["vendored"], optional = true } openssl = { version = "0.10.38", features = ["vendored"], optional = true }
pretty_env_logger = "0.4.0" pretty_env_logger = "0.4.0"
rayon = "1.5.1" rayon = "1.5.1"

View File

@ -9,7 +9,7 @@ use nu_protocol::{
engine::{EngineState, Stack, StateWorkingSet}, engine::{EngineState, Stack, StateWorkingSet},
Config, PipelineData, Span, Value, Config, PipelineData, Span, Value,
}; };
use std::io::Write; use nu_utils::stdout_write_all_and_flush;
/// Main function used when a file path is found as argument for nu /// Main function used when a file path is found as argument for nu
pub fn evaluate_file( pub fn evaluate_file(
@ -86,8 +86,6 @@ pub fn print_table_or_error(
match table { match table {
Ok(table) => { Ok(table) => {
for item in table { for item in table {
let stdout = std::io::stdout();
if let Value::Error { error } = item { if let Value::Error { error } = item {
let working_set = StateWorkingSet::new(engine_state); let working_set = StateWorkingSet::new(engine_state);
@ -99,10 +97,7 @@ pub fn print_table_or_error(
let mut out = item.into_string("\n", config); let mut out = item.into_string("\n", config);
out.push('\n'); out.push('\n');
match stdout.lock().write_all(out.as_bytes()) { let _ = stdout_write_all_and_flush(out).map_err(|err| eprintln!("{}", err));
Ok(_) => (),
Err(err) => eprintln!("{}", err),
};
} }
} }
Err(error) => { Err(error) => {
@ -116,8 +111,6 @@ pub fn print_table_or_error(
} }
None => { None => {
for item in pipeline_data { for item in pipeline_data {
let stdout = std::io::stdout();
if let Value::Error { error } = item { if let Value::Error { error } = item {
let working_set = StateWorkingSet::new(engine_state); let working_set = StateWorkingSet::new(engine_state);
@ -129,10 +122,7 @@ pub fn print_table_or_error(
let mut out = item.into_string("\n", config); let mut out = item.into_string("\n", config);
out.push('\n'); out.push('\n');
match stdout.lock().write_all(out.as_bytes()) { let _ = stdout_write_all_and_flush(out).map_err(|err| eprintln!("{}", err));
Ok(_) => (),
Err(err) => eprintln!("{}", err),
};
} }
} }
}; };

View File

@ -5,7 +5,6 @@ use nu_protocol::{
Category, Example, PipelineData, ShellError, Signature, Spanned, SyntaxShape, Value, Category, Example, PipelineData, ShellError, Signature, Spanned, SyntaxShape, Value,
}; };
use std::io::{BufWriter, Write}; use std::io::{BufWriter, Write};
use std::path::Path; use std::path::Path;
#[derive(Clone)] #[derive(Clone)]
@ -100,6 +99,8 @@ impl Command for Save {
Value::String { val, .. } => { Value::String { val, .. } => {
if let Err(err) = file.write_all(val.as_bytes()) { if let Err(err) = file.write_all(val.as_bytes()) {
return Err(ShellError::IOError(err.to_string())); return Err(ShellError::IOError(err.to_string()));
} else {
file.flush()?
} }
Ok(PipelineData::new(span)) Ok(PipelineData::new(span))
@ -107,6 +108,8 @@ impl Command for Save {
Value::Binary { val, .. } => { Value::Binary { val, .. } => {
if let Err(err) = file.write_all(&val) { if let Err(err) = file.write_all(&val) {
return Err(ShellError::IOError(err.to_string())); return Err(ShellError::IOError(err.to_string()));
} else {
file.flush()?
} }
Ok(PipelineData::new(span)) Ok(PipelineData::new(span))
@ -121,6 +124,8 @@ impl Command for Save {
if let Err(err) = file.write_all(val.as_bytes()) { if let Err(err) = file.write_all(val.as_bytes()) {
return Err(ShellError::IOError(err.to_string())); return Err(ShellError::IOError(err.to_string()));
} else {
file.flush()?
} }
Ok(PipelineData::new(span)) Ok(PipelineData::new(span))
@ -166,6 +171,8 @@ impl Command for Save {
Value::String { val, .. } => { Value::String { val, .. } => {
if let Err(err) = file.write_all(val.as_bytes()) { if let Err(err) = file.write_all(val.as_bytes()) {
return Err(ShellError::IOError(err.to_string())); return Err(ShellError::IOError(err.to_string()));
} else {
file.flush()?
} }
Ok(PipelineData::new(span)) Ok(PipelineData::new(span))
@ -173,6 +180,8 @@ impl Command for Save {
Value::Binary { val, .. } => { Value::Binary { val, .. } => {
if let Err(err) = file.write_all(&val) { if let Err(err) = file.write_all(&val) {
return Err(ShellError::IOError(err.to_string())); return Err(ShellError::IOError(err.to_string()));
} else {
file.flush()?
} }
Ok(PipelineData::new(span)) Ok(PipelineData::new(span))
@ -187,6 +196,8 @@ impl Command for Save {
if let Err(err) = file.write_all(val.as_bytes()) { if let Err(err) = file.write_all(val.as_bytes()) {
return Err(ShellError::IOError(err.to_string())); return Err(ShellError::IOError(err.to_string()));
} else {
file.flush()?
} }
Ok(PipelineData::new(span)) Ok(PipelineData::new(span))

View File

@ -68,7 +68,8 @@ fn save_append_will_not_overwrite_content() {
let mut file = let mut file =
std::fs::File::create(&expected_file).expect("Failed to create test file"); std::fs::File::create(&expected_file).expect("Failed to create test file");
file.write_all("hello ".as_bytes()) file.write_all("hello ".as_bytes())
.expect("Failed to write to test file") .expect("Failed to write to test file");
file.flush().expect("Failed to flush io")
} }
nu!( nu!(

View File

@ -10,6 +10,7 @@ version = "0.62.1"
nu-protocol = { path = "../nu-protocol", features = ["plugin"], version = "0.62.1" } nu-protocol = { path = "../nu-protocol", features = ["plugin"], version = "0.62.1" }
nu-path = { path = "../nu-path", version = "0.62.1" } nu-path = { path = "../nu-path", version = "0.62.1" }
nu-glob = { path = "../nu-glob", version = "0.62.1" } nu-glob = { path = "../nu-glob", version = "0.62.1" }
nu-utils = { path = "../nu-utils", version = "0.62.1" }
chrono = { version="0.4.19", features=["serde"] } chrono = { version="0.4.19", features=["serde"] }
sysinfo = "0.23.10" sysinfo = "0.23.10"

View File

@ -1,14 +1,14 @@
use crate::{current_dir_str, get_full_help}; use crate::{current_dir_str, get_full_help};
use nu_path::expand_path_with; use nu_path::expand_path_with;
use nu_protocol::ast::{Block, Call, Expr, Expression, Operator};
use nu_protocol::engine::{EngineState, Stack, Visibility};
use nu_protocol::{ use nu_protocol::{
ast::{Block, Call, Expr, Expression, Operator},
engine::{EngineState, Stack, Visibility},
IntoInterruptiblePipelineData, IntoPipelineData, PipelineData, Range, ShellError, Span, IntoInterruptiblePipelineData, IntoPipelineData, PipelineData, Range, ShellError, Span,
Spanned, SyntaxShape, Unit, Value, VarId, ENV_VARIABLE_ID, Spanned, SyntaxShape, Unit, Value, VarId, ENV_VARIABLE_ID,
}; };
use nu_utils::stdout_write_all_and_flush;
use std::cmp::Ordering; use std::cmp::Ordering;
use std::collections::HashMap; use std::collections::HashMap;
use std::io::Write;
use sysinfo::SystemExt; use sysinfo::SystemExt;
pub fn eval_operator(op: &Expression) -> Result<Operator, ShellError> { pub fn eval_operator(op: &Expression) -> Result<Operator, ShellError> {
@ -668,8 +668,6 @@ pub fn eval_block(
)?; )?;
for item in table { for item in table {
let stdout = std::io::stdout();
if let Value::Error { error } = item { if let Value::Error { error } = item {
return Err(error); return Err(error);
} }
@ -677,16 +675,11 @@ pub fn eval_block(
let mut out = item.into_string("\n", config); let mut out = item.into_string("\n", config);
out.push('\n'); out.push('\n');
match stdout.lock().write_all(out.as_bytes()) { stdout_write_all_and_flush(out)?
Ok(_) => (),
Err(err) => eprintln!("{}", err),
};
} }
} }
None => { None => {
for item in input { for item in input {
let stdout = std::io::stdout();
if let Value::Error { error } = item { if let Value::Error { error } = item {
return Err(error); return Err(error);
} }
@ -694,10 +687,7 @@ pub fn eval_block(
let mut out = item.into_string("\n", config); let mut out = item.into_string("\n", config);
out.push('\n'); out.push('\n');
match stdout.lock().write_all(out.as_bytes()) { stdout_write_all_and_flush(out)?
Ok(_) => (),
Err(err) => eprintln!("{}", err),
};
} }
} }
}; };
@ -724,8 +714,6 @@ pub fn eval_block(
)?; )?;
for item in table { for item in table {
let stdout = std::io::stdout();
if let Value::Error { error } = item { if let Value::Error { error } = item {
return Err(error); return Err(error);
} }
@ -733,16 +721,11 @@ pub fn eval_block(
let mut out = item.into_string("\n", config); let mut out = item.into_string("\n", config);
out.push('\n'); out.push('\n');
match stdout.lock().write_all(out.as_bytes()) { stdout_write_all_and_flush(out)?
Ok(_) => (),
Err(err) => eprintln!("{}", err),
};
} }
} }
None => { None => {
for item in input { for item in input {
let stdout = std::io::stdout();
if let Value::Error { error } = item { if let Value::Error { error } = item {
return Err(error); return Err(error);
} }
@ -750,10 +733,7 @@ pub fn eval_block(
let mut out = item.into_string("\n", config); let mut out = item.into_string("\n", config);
out.push('\n'); out.push('\n');
match stdout.lock().write_all(out.as_bytes()) { stdout_write_all_and_flush(out)?
Ok(_) => (),
Err(err) => eprintln!("{}", err),
};
} }
} }
}; };

View File

@ -9,6 +9,7 @@ version = "0.62.1"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
nu-utils = { path = "../nu-utils", version = "0.62.1" }
thiserror = "1.0.29" thiserror = "1.0.29"
miette = { version = "4.5.0", features = ["fancy"] } miette = { version = "4.5.0", features = ["fancy"] }
serde = {version = "1.0.130", features = ["derive"]} serde = {version = "1.0.130", features = ["derive"]}

View File

@ -1,23 +1,20 @@
use super::{Command, EnvVars, Stack}; use super::{Command, EnvVars, Stack};
use crate::Value;
use crate::{ use crate::{
ast::Block, AliasId, BlockId, Config, DeclId, Example, Module, ModuleId, OverlayId, ShellError, ast::Block, AliasId, BlockId, Config, DeclId, Example, Module, ModuleId, OverlayId, ShellError,
Signature, Span, Type, VarId, Variable, Signature, Span, Type, VarId, Variable,
}; };
use core::panic; use core::panic;
use std::borrow::Borrow;
use std::collections::HashSet;
use std::path::Path;
#[cfg(feature = "plugin")]
use std::path::PathBuf;
use std::{ use std::{
collections::HashMap, collections::HashMap,
sync::{atomic::AtomicBool, Arc}, sync::{atomic::AtomicBool, Arc},
}; };
use crate::Value;
use std::borrow::Borrow;
use std::collections::HashSet;
use std::path::Path;
#[cfg(feature = "plugin")]
use std::path::PathBuf;
static PWD_ENV: &str = "PWD"; static PWD_ENV: &str = "PWD";
pub static DEFAULT_OVERLAY_NAME: &str = "zero"; pub static DEFAULT_OVERLAY_NAME: &str = "zero";
@ -651,6 +648,17 @@ impl EngineState {
.write_all(line.as_bytes()) .write_all(line.as_bytes())
.map_err(|err| ShellError::PluginFailedToLoad(err.to_string())) .map_err(|err| ShellError::PluginFailedToLoad(err.to_string()))
}) })
.and_then(|_| {
plugin_file.flush().map_err(|err| {
ShellError::GenericError(
"Error flushing plugin file".to_string(),
format! {"{}", err},
None,
None,
Vec::new(),
)
})
})
}) })
}) })
} }

View File

@ -1,13 +1,10 @@
use std::{
io::Write,
sync::{atomic::AtomicBool, Arc},
};
use crate::{ use crate::{
ast::{Call, PathMember}, ast::{Call, PathMember},
engine::{EngineState, Stack, StateWorkingSet}, engine::{EngineState, Stack, StateWorkingSet},
format_error, Config, ListStream, RawStream, ShellError, Span, Value, format_error, Config, ListStream, RawStream, ShellError, Span, Value,
}; };
use nu_utils::{stdout_write_all_and_flush, stdout_write_all_as_binary_and_flush};
use std::sync::{atomic::AtomicBool, Arc};
/// The foundational abstraction for input and output to commands /// The foundational abstraction for input and output to commands
/// ///
@ -426,7 +423,7 @@ impl PipelineData {
// to create the table value that will be printed in the terminal // to create the table value that will be printed in the terminal
let config = engine_state.get_config(); let config = engine_state.get_config();
let stdout = std::io::stdout(); // let stdout = std::io::stdout();
if let PipelineData::ExternalStream { if let PipelineData::ExternalStream {
stdout: stream, stdout: stream,
@ -436,8 +433,9 @@ impl PipelineData {
{ {
if let Some(stream) = stream { if let Some(stream) = stream {
for s in stream { for s in stream {
let _ = stdout.lock().write_all(s?.as_binary()?); let s_live = s?;
let _ = stdout.lock().flush()?; let bin_output = s_live.as_binary()?;
stdout_write_all_as_binary_and_flush(bin_output)?
} }
} }
@ -459,8 +457,6 @@ impl PipelineData {
)?; )?;
for item in table { for item in table {
let stdout = std::io::stdout();
let mut out = if let Value::Error { error } = item { let mut out = if let Value::Error { error } = item {
let working_set = StateWorkingSet::new(engine_state); let working_set = StateWorkingSet::new(engine_state);
@ -475,17 +471,11 @@ impl PipelineData {
out.push('\n'); out.push('\n');
} }
match stdout.lock().write_all(out.as_bytes()) { stdout_write_all_and_flush(out)?
Ok(_) => {
let _ = stdout.lock().flush()?;
}
Err(err) => eprintln!("{}", err),
};
} }
} }
None => { None => {
for item in self { for item in self {
let stdout = std::io::stdout();
let mut out = if let Value::Error { error } = item { let mut out = if let Value::Error { error } = item {
let working_set = StateWorkingSet::new(engine_state); let working_set = StateWorkingSet::new(engine_state);
@ -500,12 +490,7 @@ impl PipelineData {
out.push('\n'); out.push('\n');
} }
match stdout.lock().write_all(out.as_bytes()) { stdout_write_all_and_flush(out)?
Ok(_) => {
let _ = stdout.lock().flush()?;
}
Err(err) => eprintln!("{}", err),
};
} }
} }
}; };

View File

@ -166,6 +166,8 @@ macro_rules! nu_with_plugins {
.write_all(commands.as_bytes()) .write_all(commands.as_bytes())
.expect("couldn't write to stdin"); .expect("couldn't write to stdin");
stdin.flush()?
let output = process let output = process
.wait_with_output() .wait_with_output()
.expect("couldn't read from stdout/stderr"); .expect("couldn't read from stdout/stderr");

View File

@ -1,3 +1,5 @@
pub mod utils; pub mod utils;
pub use utils::enable_vt_processing; pub use utils::{
enable_vt_processing, stdout_write_all_and_flush, stdout_write_all_as_binary_and_flush,
};

View File

@ -1,4 +1,4 @@
use std::io::Result; use std::io::{Result, Write};
pub fn enable_vt_processing() -> Result<()> { pub fn enable_vt_processing() -> Result<()> {
#[cfg(windows)] #[cfg(windows)]
@ -23,3 +23,23 @@ pub fn enable_vt_processing() -> Result<()> {
} }
Ok(()) Ok(())
} }
pub fn stdout_write_all_and_flush(output: String) -> Result<()> {
let stdout = std::io::stdout();
let ret = match stdout.lock().write_all(output.as_bytes()) {
Ok(_) => Ok(stdout.lock().flush()?),
Err(err) => Err(err),
};
ret
}
pub fn stdout_write_all_as_binary_and_flush(output: &[u8]) -> Result<()> {
let stdout = std::io::stdout();
let ret = match stdout.lock().write_all(output) {
Ok(_) => Ok(stdout.lock().flush()?),
Err(err) => Err(err),
};
ret
}

View File

@ -24,9 +24,10 @@ use nu_protocol::{
Category, Example, IntoPipelineData, PipelineData, RawStream, ShellError, Signature, Span, Category, Example, IntoPipelineData, PipelineData, RawStream, ShellError, Signature, Span,
Spanned, SyntaxShape, Value, Spanned, SyntaxShape, Value,
}; };
use nu_utils::stdout_write_all_and_flush;
use std::cell::RefCell; use std::cell::RefCell;
use std::{ use std::{
io::{BufReader, Write}, io::BufReader,
path::Path, path::Path,
sync::{ sync::{
atomic::{AtomicBool, Ordering}, atomic::{AtomicBool, Ordering},
@ -362,11 +363,7 @@ fn parse_commandline_args(
let full_help = let full_help =
get_full_help(&Nu.signature(), &Nu.examples(), engine_state, &mut stack); get_full_help(&Nu.signature(), &Nu.examples(), engine_state, &mut stack);
let _ = std::panic::catch_unwind(move || { let _ = std::panic::catch_unwind(move || stdout_write_all_and_flush(full_help));
let stdout = std::io::stdout();
let mut stdout = stdout.lock();
let _ = stdout.write_all(full_help.as_bytes());
});
std::process::exit(1); std::process::exit(1);
} }
@ -374,9 +371,7 @@ fn parse_commandline_args(
if call.has_flag("version") { if call.has_flag("version") {
let version = env!("CARGO_PKG_VERSION").to_string(); let version = env!("CARGO_PKG_VERSION").to_string();
let _ = std::panic::catch_unwind(move || { let _ = std::panic::catch_unwind(move || {
let stdout = std::io::stdout(); stdout_write_all_and_flush(format!("{}\n", version))
let mut stdout = stdout.lock();
let _ = stdout.write_all(format!("{}\n", version).as_bytes());
}); });
std::process::exit(0); std::process::exit(0);