Table paging (Draft PR) (#3058)

* This adds table paging, relying on minus to perform the paging functionality
This is gated behind the table-pager feature

* fix problem with long running InputStreams blocking table() returning

* some comments regarding Arc clones, and callback from minus
This commit is contained in:
rezural 2021-03-01 12:59:33 +11:00 committed by GitHub
parent 6b2327f231
commit 079e575cac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 109 additions and 39 deletions

View File

@ -147,6 +147,8 @@ start = ["nu_plugin_start"]
trash-support = ["nu-cli/trash-support", "nu-command/trash-support"] trash-support = ["nu-cli/trash-support", "nu-command/trash-support"]
tree = ["nu_plugin_tree"] tree = ["nu_plugin_tree"]
xpath = ["nu_plugin_xpath"] xpath = ["nu_plugin_xpath"]
#This is disabled in extra for now
table-pager = ["nu-command/table-pager"]
[profile.release] [profile.release]
#strip = "symbols" #Couldn't get working +nightly #strip = "symbols" #Couldn't get working +nightly

View File

@ -62,6 +62,7 @@ itertools = "0.10.0"
lazy_static = "1.*" lazy_static = "1.*"
log = "0.4.14" log = "0.4.14"
meval = "0.2.0" meval = "0.2.0"
minus = { optional = true, features = ["async_std_lib", "search"], git = "https://github.com/rezural/minus", branch = "add-finished-callback" }
num-bigint = { version = "0.3.1", features = ["serde"] } num-bigint = { version = "0.3.1", features = ["serde"] }
num-format = { version = "0.4.0", features = ["with-num-bigint"] } num-format = { version = "0.4.0", features = ["with-num-bigint"] }
num-traits = "0.2.14" num-traits = "0.2.14"
@ -130,3 +131,5 @@ stable = []
trash-support = ["trash"] trash-support = ["trash"]
directories = ["directories-next"] directories = ["directories-next"]
dirs = ["dirs-next"] dirs = ["dirs-next"]
table-pager = ["minus"]

View File

@ -8,6 +8,12 @@ use nu_protocol::{Primitive, Signature, SyntaxShape, UntaggedValue, Value};
use nu_table::{draw_table, Alignment, StyledString, TextStyle}; use nu_table::{draw_table, Alignment, StyledString, TextStyle};
use std::collections::HashMap; use std::collections::HashMap;
use std::time::Instant; use std::time::Instant;
#[cfg(feature = "table-pager")]
use {
futures::future::join,
minus::{ExitStrategy, Pager},
std::fmt::Write,
};
const STREAM_PAGE_SIZE: usize = 1000; const STREAM_PAGE_SIZE: usize = 1000;
const STREAM_TIMEOUT_CHECK_INTERVAL: usize = 100; const STREAM_TIMEOUT_CHECK_INTERVAL: usize = 100;
@ -156,7 +162,7 @@ async fn table(
args: CommandArgs, args: CommandArgs,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
let mut args = args.evaluate_once().await?; let mut args = args.evaluate_once().await?;
let mut finished = false;
// Ideally, get_color_config would get all the colors configured in the config.toml // Ideally, get_color_config would get all the colors configured in the config.toml
// and create a style based on those settings. However, there are few places where // and create a style based on those settings. However, there are few places where
// this just won't work right now, like header styling, because a style needs to know // this just won't work right now, like header styling, because a style needs to know
@ -187,64 +193,123 @@ async fn table(
let term_width = args.host.lock().width(); let term_width = args.host.lock().width();
while !finished { #[cfg(feature = "table-pager")]
let mut new_input: VecDeque<Value> = VecDeque::new(); let pager = Pager::new()
.set_exit_strategy(ExitStrategy::PagerQuit)
.set_searchable(true)
.set_page_if_havent_overflowed(false)
.finish();
let start_time = Instant::now(); let stream_data = async {
for idx in 0..STREAM_PAGE_SIZE { let finished = Arc::new(AtomicBool::new(false));
if let Some(val) = delay_slot { // we are required to clone finished, for use within the callback, otherwise we get borrow errors
new_input.push_back(val); #[cfg(feature = "table-pager")]
delay_slot = None; let finished_within_callback = finished.clone();
} else { #[cfg(feature = "table-pager")]
match args.input.next().await { {
Some(a) => { // This is called when the pager finishes, to indicate to the
if !new_input.is_empty() { // while loop below to finish, in case of long running InputStream consumer
if let Some(descs) = new_input.get(0) { // that doesnt finish by the time the user quits out of the pager
let descs = descs.data_descriptors(); pager
let compare = a.data_descriptors(); .lock()
if descs != compare { .await
delay_slot = Some(a); .on_finished_callbacks
break; .push(Box::new(move || {
finished_within_callback.store(true, Ordering::Relaxed);
}));
}
while !finished.clone().load(Ordering::Relaxed) {
let mut new_input: VecDeque<Value> = VecDeque::new();
let start_time = Instant::now();
for idx in 0..STREAM_PAGE_SIZE {
if let Some(val) = delay_slot {
new_input.push_back(val);
delay_slot = None;
} else {
match args.input.next().await {
Some(a) => {
if !new_input.is_empty() {
if let Some(descs) = new_input.get(0) {
let descs = descs.data_descriptors();
let compare = a.data_descriptors();
if descs != compare {
delay_slot = Some(a);
break;
} else {
new_input.push_back(a);
}
} else { } else {
new_input.push_back(a); new_input.push_back(a);
} }
} else { } else {
new_input.push_back(a); new_input.push_back(a);
} }
} else { }
new_input.push_back(a); _ => {
finished.store(true, Ordering::Relaxed);
break;
} }
} }
_ => {
finished = true;
break;
}
}
// Check if we've gone over our buffering threshold // Check if we've gone over our buffering threshold
if (idx + 1) % STREAM_TIMEOUT_CHECK_INTERVAL == 0 { if (idx + 1) % STREAM_TIMEOUT_CHECK_INTERVAL == 0 {
let end_time = Instant::now(); let end_time = Instant::now();
// If we've been buffering over a second, go ahead and send out what we have so far // If we've been buffering over a second, go ahead and send out what we have so far
if (end_time - start_time).as_secs() >= 1 { if (end_time - start_time).as_secs() >= 1 {
break; break;
}
if finished.load(Ordering::Relaxed) {
break;
}
} }
} }
} }
let input: Vec<Value> = new_input.into();
if !input.is_empty() {
let t = from_list(&input, &configuration, start_number, &color_hm);
let output = draw_table(&t, term_width, &color_hm);
#[cfg(feature = "table-pager")]
{
let mut pager = pager.lock().await;
writeln!(pager.lines, "{}", output).map_err(|_| {
ShellError::untagged_runtime_error("Error writing to pager")
})?;
}
#[cfg(not(feature = "table-pager"))]
println!("{}", output);
}
start_number += input.len();
} }
let input: Vec<Value> = new_input.into(); #[cfg(feature = "table-pager")]
{
if !input.is_empty() { let mut pager_lock = pager.lock().await;
let t = from_list(&input, &configuration, start_number, &color_hm); pager_lock.data_finished();
let output = draw_table(&t, term_width, &color_hm);
println!("{}", output);
} }
start_number += input.len(); Result::<_, ShellError>::Ok(())
};
#[cfg(feature = "table-pager")]
{
let (minus_result, streaming_result) =
join(minus::async_std_updating(pager.clone()), stream_data).await;
minus_result.map_err(|_| ShellError::untagged_runtime_error("Error paging data"))?;
streaming_result?;
} }
#[cfg(not(feature = "table-pager"))]
stream_data
.await
.map_err(|_| ShellError::untagged_runtime_error("Error streaming data"))?;
Ok(OutputStream::empty()) Ok(OutputStream::empty())
} }