Added the ability to turn on performance debugging through and env var for the polars plugin (#13191)

This allows performance debugging to be turned on by setting:

```nushell
$env.POLARS_PLUGIN_PERF = "true"
```

Furthermore, this improves the other plugin debugging by allowing the
env variable for debugging to be set at any time versus having to be
available when nushell is launched:

```nushell
$env.POLARS_PLUGIN_DEBUG = "true"
```

This plugin introduces a `perf` function that will output timing
results. This works very similar to the perf function available in
nu_utils::utils::perf. This version prints everything to std error to
not break the plugin stream and uses the engine interface to see if the
env variable is configured.

This pull requests uses this `perf` function when:
* opening csv files as dataframes
* opening json lines files as dataframes

This will hopefully help provide some more fine grained information on
how long it takes polars to open different dataframes. The `perf` can
also be utilized later for other dataframes use cases.
This commit is contained in:
Jack Wright 2024-06-20 16:37:38 -07:00 committed by GitHub
parent 7d2d573eb8
commit 20834c9d47
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 154 additions and 29 deletions

View File

@ -13,7 +13,7 @@ use nu_plugin::{EngineInterface, PluginCommand};
use nu_protocol::{LabeledError, ShellError, Span};
use uuid::Uuid;
use crate::{plugin_debug, values::PolarsPluginObject, PolarsPlugin};
use crate::{plugin_debug, values::PolarsPluginObject, EngineWrapper, PolarsPlugin};
#[derive(Debug, Clone)]
pub struct CacheValue {
@ -47,7 +47,7 @@ impl Cache {
/// * `force` - Delete even if there are multiple references
pub fn remove(
&self,
maybe_engine: Option<&EngineInterface>,
engine: impl EngineWrapper,
key: &Uuid,
force: bool,
) -> Result<Option<CacheValue>, ShellError> {
@ -60,22 +60,23 @@ impl Cache {
let removed = if force || reference_count.unwrap_or_default() < 1 {
let removed = lock.remove(key);
plugin_debug!("PolarsPlugin: removing {key} from cache: {removed:?}");
plugin_debug!(
engine,
"PolarsPlugin: removing {key} from cache: {removed:?}"
);
removed
} else {
plugin_debug!("PolarsPlugin: decrementing reference count for {key}");
plugin_debug!(
engine,
"PolarsPlugin: decrementing reference count for {key}"
);
None
};
// Once there are no more entries in the cache
// we can turn plugin gc back on
match maybe_engine {
Some(engine) if lock.is_empty() => {
plugin_debug!("PolarsPlugin: Cache is empty enabling GC");
engine.set_gc_disabled(false).map_err(LabeledError::from)?;
}
_ => (),
};
plugin_debug!(engine, "PolarsPlugin: Cache is empty enabling GC");
engine.set_gc_disabled(false).map_err(LabeledError::from)?;
drop(lock);
Ok(removed)
}
@ -84,23 +85,21 @@ impl Cache {
/// The maybe_engine parameter is required outside of testing
pub fn insert(
&self,
maybe_engine: Option<&EngineInterface>,
engine: impl EngineWrapper,
uuid: Uuid,
value: PolarsPluginObject,
span: Span,
) -> Result<Option<CacheValue>, ShellError> {
let mut lock = self.lock()?;
plugin_debug!("PolarsPlugin: Inserting {uuid} into cache: {value:?}");
plugin_debug!(
engine,
"PolarsPlugin: Inserting {uuid} into cache: {value:?}"
);
// turn off plugin gc the first time an entry is added to the cache
// as we don't want the plugin to be garbage collected if there
// is any live data
match maybe_engine {
Some(engine) if lock.is_empty() => {
plugin_debug!("PolarsPlugin: Cache has values disabling GC");
engine.set_gc_disabled(true).map_err(LabeledError::from)?;
}
_ => (),
};
plugin_debug!(engine, "PolarsPlugin: Cache has values disabling GC");
engine.set_gc_disabled(true).map_err(LabeledError::from)?;
let cache_value = CacheValue {
uuid,
value,
@ -154,7 +153,7 @@ pub trait Cacheable: Sized + Clone {
span: Span,
) -> Result<Self, ShellError> {
plugin.cache.insert(
Some(engine),
engine,
self.cache_id().to_owned(),
self.to_cache_value()?,
span,

View File

@ -63,7 +63,7 @@ fn remove_cache_entry(
let key = as_uuid(key, span)?;
let msg = plugin
.cache
.remove(Some(engine), &key, true)?
.remove(engine, &key, true)?
.map(|_| format!("Removed: {key}"))
.unwrap_or_else(|| format!("No value found for key: {key}"));
Ok(Value::string(msg, span))

View File

@ -1,5 +1,6 @@
use crate::{
dataframe::values::NuSchema,
perf,
values::{CustomValueSupport, NuLazyFrame},
PolarsPlugin,
};
@ -378,7 +379,10 @@ fn from_jsonl(
.get_flag("schema")?
.map(|schema| NuSchema::try_from(&schema))
.transpose()?;
if call.has_flag("lazy")? {
let start_time = std::time::Instant::now();
let df = LazyJsonLineReader::new(file_path)
.with_infer_schema_length(infer_schema)
.with_schema(maybe_schema.map(|s| s.into()))
@ -390,6 +394,16 @@ fn from_jsonl(
help: None,
inner: vec![],
})?;
perf(
engine,
"Lazy json lines dataframe open",
start_time,
file!(),
line!(),
column!(),
);
let df = NuLazyFrame::new(false, df);
df.cache_and_to_value(plugin, engine, call.head)
} else {
@ -410,6 +424,8 @@ fn from_jsonl(
None => reader,
};
let start_time = std::time::Instant::now();
let df: NuDataFrame = reader
.finish()
.map_err(|e| ShellError::GenericError {
@ -421,6 +437,15 @@ fn from_jsonl(
})?
.into();
perf(
engine,
"Eager json lines dataframe open",
start_time,
file!(),
line!(),
column!(),
);
df.cache_and_to_value(plugin, engine, call.head)
}
}
@ -484,6 +509,7 @@ fn from_csv(
Some(r) => csv_reader.with_skip_rows(r),
};
let start_time = std::time::Instant::now();
let df: NuLazyFrame = csv_reader
.finish()
.map_err(|e| ShellError::GenericError {
@ -495,8 +521,18 @@ fn from_csv(
})?
.into();
perf(
engine,
"Lazy CSV dataframe open",
start_time,
file!(),
line!(),
column!(),
);
df.cache_and_to_value(plugin, engine, call.head)
} else {
let start_time = std::time::Instant::now();
let df = CsvReadOptions::default()
.with_has_header(!no_header)
.with_infer_schema_length(infer_schema)
@ -529,6 +565,16 @@ fn from_csv(
help: None,
inner: vec![],
})?;
perf(
engine,
"Eager CSV dataframe open",
start_time,
file!(),
line!(),
column!(),
);
let df = NuDataFrame::new(false, df);
df.cache_and_to_value(plugin, engine, call.head)
}

View File

@ -8,25 +8,89 @@ use nu_plugin::{EngineInterface, Plugin, PluginCommand};
mod cache;
pub mod dataframe;
pub use dataframe::*;
use nu_protocol::{ast::Operator, CustomValue, LabeledError, Spanned, Value};
use nu_protocol::{ast::Operator, CustomValue, LabeledError, ShellError, Span, Spanned, Value};
use crate::{
eager::eager_commands, expressions::expr_commands, lazy::lazy_commands,
series::series_commands, values::PolarsPluginCustomValue,
};
pub trait EngineWrapper {
fn get_env_var(&self, key: &str) -> Option<String>;
fn use_color(&self) -> bool;
fn set_gc_disabled(&self, disabled: bool) -> Result<(), ShellError>;
}
impl EngineWrapper for &EngineInterface {
fn get_env_var(&self, key: &str) -> Option<String> {
EngineInterface::get_env_var(self, key)
.ok()
.flatten()
.map(|x| match x {
Value::String { val, .. } => val,
_ => "".to_string(),
})
}
fn use_color(&self) -> bool {
self.get_config()
.ok()
.and_then(|config| config.color_config.get("use_color").cloned())
.unwrap_or(Value::bool(false, Span::unknown()))
.is_true()
}
fn set_gc_disabled(&self, disabled: bool) -> Result<(), ShellError> {
EngineInterface::set_gc_disabled(self, disabled)
}
}
#[macro_export]
macro_rules! plugin_debug {
($($arg:tt)*) => {{
if std::env::var("POLARS_PLUGIN_DEBUG")
.ok()
.filter(|x| x == "1" || x == "true")
($env_var_provider:tt, $($arg:tt)*) => {{
if $env_var_provider.get_env_var("POLARS_PLUGIN_DEBUG")
.filter(|s| s == "1" || s == "true")
.is_some() {
eprintln!($($arg)*);
}
}};
}
pub fn perf(
env: impl EngineWrapper,
msg: &str,
dur: std::time::Instant,
file: &str,
line: u32,
column: u32,
) {
if env
.get_env_var("POLARS_PLUGIN_PERF")
.filter(|s| s == "1" || s == "true")
.is_some()
{
if env.use_color() {
eprintln!(
"perf: {}:{}:{} \x1b[32m{}\x1b[0m took \x1b[33m{:?}\x1b[0m",
file,
line,
column,
msg,
dur.elapsed(),
);
} else {
eprintln!(
"perf: {}:{}:{} {} took {:?}",
file,
line,
column,
msg,
dur.elapsed(),
);
}
}
}
#[derive(Default)]
pub struct PolarsPlugin {
pub(crate) cache: Cache,
@ -52,7 +116,7 @@ impl Plugin for PolarsPlugin {
) -> Result<(), LabeledError> {
if !self.disable_cache_drop {
let id = CustomValueType::try_from_custom_value(custom_value)?.id();
let _ = self.cache.remove(Some(engine), &id, false);
let _ = self.cache.remove(engine, &id, false);
}
Ok(())
}
@ -193,6 +257,22 @@ pub mod test {
}
}
struct TestEngineWrapper;
impl EngineWrapper for TestEngineWrapper {
fn get_env_var(&self, key: &str) -> Option<String> {
std::env::var(key).ok()
}
fn use_color(&self) -> bool {
false
}
fn set_gc_disabled(&self, _disabled: bool) -> Result<(), ShellError> {
Ok(())
}
}
pub fn test_polars_plugin_command(command: &impl PluginCommand) -> Result<(), ShellError> {
test_polars_plugin_command_with_decls(command, vec![])
}
@ -212,7 +292,7 @@ pub mod test {
let id = obj.id();
plugin
.cache
.insert(None, id, obj, Span::test_data())
.insert(TestEngineWrapper {}, id, obj, Span::test_data())
.unwrap();
}
}