mirror of
https://github.com/nushell/nushell.git
synced 2025-08-09 08:36:10 +02:00
Added commands for working with the plugin cache. (#12576)
# Description This pull request provides three new commands: `polars store-ls` - moved from `polars ls`. It provides the list of all object stored in the plugin cache `polars store-rm` - deletes a cached object `polars store-get` - gets an object from the cache. The addition of `polars store-get` required adding a reference_count to cached entries. `polars get` is the only command that will increment this value. `polars rm` will remove the value despite it's count. Calls to PolarsPlugin::custom_value_dropped will decrement the value. The prefix store- was chosen due to there already being a `polars cache` command. These commands were not made sub-commands as there isn't a way to display help for sub commands in plugins (e.g. `polars store` displaying help) and I felt the store- seemed fine anyways. The output of `polars store-ls` now shows the reference count for each object. # User-Facing Changes polars ls has now moved to polars store-ls --------- Co-authored-by: Jack Wright <jack.wright@disqo.com>
This commit is contained in:
96
crates/nu_plugin_polars/src/cache/get.rs
vendored
Normal file
96
crates/nu_plugin_polars/src/cache/get.rs
vendored
Normal file
@ -0,0 +1,96 @@
|
||||
use nu_plugin::{EngineInterface, EvaluatedCall, PluginCommand};
|
||||
use nu_protocol::{
|
||||
Category, Example, LabeledError, PipelineData, ShellError, Signature, Span, SyntaxShape, Type,
|
||||
Value,
|
||||
};
|
||||
use polars::{prelude::NamedFrom, series::Series};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::{
|
||||
values::{CustomValueSupport, NuDataFrame},
|
||||
PolarsPlugin,
|
||||
};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct CacheGet;
|
||||
|
||||
impl PluginCommand for CacheGet {
|
||||
type Plugin = PolarsPlugin;
|
||||
|
||||
fn name(&self) -> &str {
|
||||
"polars store-get"
|
||||
}
|
||||
|
||||
fn usage(&self) -> &str {
|
||||
"Gets a Dataframe or other object from the plugin cache."
|
||||
}
|
||||
|
||||
fn signature(&self) -> Signature {
|
||||
Signature::build(self.name())
|
||||
.required("key", SyntaxShape::String, "Key of objects to get")
|
||||
.input_output_types(vec![
|
||||
(Type::Any, Type::Custom("dataframe".into())),
|
||||
(Type::Any, Type::Custom("expression".into())),
|
||||
])
|
||||
.category(Category::Custom("dataframe".into()))
|
||||
}
|
||||
|
||||
fn examples(&self) -> Vec<Example> {
|
||||
vec![Example {
|
||||
description: "Get a stored object",
|
||||
example: r#"let df = ([[a b];[1 2] [3 4]] | polars into-df);
|
||||
polars store-ls | get key | first | polars store-get $in"#,
|
||||
result: Some(
|
||||
NuDataFrame::try_from_series_vec(
|
||||
vec![Series::new("a", &[1_i64, 3]), Series::new("b", &[2_i64, 4])],
|
||||
Span::test_data(),
|
||||
)
|
||||
.expect("could not create dataframe")
|
||||
.into_value(Span::test_data()),
|
||||
),
|
||||
}]
|
||||
}
|
||||
|
||||
fn run(
|
||||
&self,
|
||||
plugin: &Self::Plugin,
|
||||
_engine: &EngineInterface,
|
||||
call: &EvaluatedCall,
|
||||
_input: PipelineData,
|
||||
) -> Result<PipelineData, LabeledError> {
|
||||
let key = call
|
||||
.req::<String>(0)
|
||||
.and_then(|ref k| as_uuid(k, call.head))?;
|
||||
|
||||
let value = if let Some(cache_value) = plugin.cache.get(&key, true)? {
|
||||
let polars_object = cache_value.value;
|
||||
polars_object.into_value(call.head)
|
||||
} else {
|
||||
Value::nothing(call.head)
|
||||
};
|
||||
|
||||
Ok(PipelineData::Value(value, None))
|
||||
}
|
||||
}
|
||||
|
||||
fn as_uuid(s: &str, span: Span) -> Result<Uuid, ShellError> {
|
||||
Uuid::parse_str(s).map_err(|e| ShellError::GenericError {
|
||||
error: format!("Failed to convert key string to UUID: {e}"),
|
||||
msg: "".into(),
|
||||
span: Some(span),
|
||||
help: None,
|
||||
inner: vec![],
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
use crate::test::test_polars_plugin_command_with_decls;
|
||||
use nu_command::{First, Get};
|
||||
|
||||
#[test]
|
||||
fn test_examples() -> Result<(), ShellError> {
|
||||
test_polars_plugin_command_with_decls(&CacheGet, vec![Box::new(Get), Box::new(First)])
|
||||
}
|
||||
}
|
128
crates/nu_plugin_polars/src/cache/list.rs
vendored
Normal file
128
crates/nu_plugin_polars/src/cache/list.rs
vendored
Normal file
@ -0,0 +1,128 @@
|
||||
use nu_plugin::{EngineInterface, EvaluatedCall, PluginCommand};
|
||||
use nu_protocol::{
|
||||
record, Category, Example, IntoPipelineData, LabeledError, PipelineData, Signature, Value,
|
||||
};
|
||||
|
||||
use crate::{values::PolarsPluginObject, PolarsPlugin};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ListDF;
|
||||
|
||||
impl PluginCommand for ListDF {
|
||||
type Plugin = PolarsPlugin;
|
||||
|
||||
fn name(&self) -> &str {
|
||||
"polars store-ls"
|
||||
}
|
||||
|
||||
fn usage(&self) -> &str {
|
||||
"Lists stored dataframes."
|
||||
}
|
||||
|
||||
fn signature(&self) -> Signature {
|
||||
Signature::build(self.name()).category(Category::Custom("dataframe".into()))
|
||||
}
|
||||
|
||||
fn examples(&self) -> Vec<Example> {
|
||||
vec![Example {
|
||||
description: "Creates a new dataframe and shows it in the dataframe list",
|
||||
example: r#"let test = ([[a b];[1 2] [3 4]] | polars into-df);
|
||||
polars store-ls"#,
|
||||
result: None,
|
||||
}]
|
||||
}
|
||||
|
||||
fn run(
|
||||
&self,
|
||||
plugin: &Self::Plugin,
|
||||
engine: &EngineInterface,
|
||||
call: &EvaluatedCall,
|
||||
_input: PipelineData,
|
||||
) -> Result<PipelineData, LabeledError> {
|
||||
let vals = plugin.cache.process_entries(|(key, value)| {
|
||||
let span_contents = engine.get_span_contents(value.span)?;
|
||||
let span_contents = String::from_utf8_lossy(&span_contents);
|
||||
match &value.value {
|
||||
PolarsPluginObject::NuDataFrame(df) => Ok(Some(Value::record(
|
||||
record! {
|
||||
"key" => Value::string(key.to_string(), call.head),
|
||||
"created" => Value::date(value.created, call.head),
|
||||
"columns" => Value::int(df.as_ref().width() as i64, call.head),
|
||||
"rows" => Value::int(df.as_ref().height() as i64, call.head),
|
||||
"type" => Value::string("DataFrame", call.head),
|
||||
"estimated_size" => Value::filesize(df.to_polars().estimated_size() as i64, call.head),
|
||||
"span_contents" => Value::string(span_contents, value.span),
|
||||
"span_start" => Value::int(value.span.start as i64, call.head),
|
||||
"span_end" => Value::int(value.span.end as i64, call.head),
|
||||
"reference_count" => Value::int(value.reference_count as i64, call.head),
|
||||
},
|
||||
call.head,
|
||||
))),
|
||||
PolarsPluginObject::NuLazyFrame(lf) => {
|
||||
let lf = lf.clone().collect(call.head)?;
|
||||
Ok(Some(Value::record(
|
||||
record! {
|
||||
"key" => Value::string(key.to_string(), call.head),
|
||||
"created" => Value::date(value.created, call.head),
|
||||
"columns" => Value::int(lf.as_ref().width() as i64, call.head),
|
||||
"rows" => Value::int(lf.as_ref().height() as i64, call.head),
|
||||
"type" => Value::string("LazyFrame", call.head),
|
||||
"estimated_size" => Value::filesize(lf.to_polars().estimated_size() as i64, call.head),
|
||||
"span_contents" => Value::string(span_contents, value.span),
|
||||
"span_start" => Value::int(value.span.start as i64, call.head),
|
||||
"span_end" => Value::int(value.span.end as i64, call.head),
|
||||
"reference_count" => Value::int(value.reference_count as i64, call.head),
|
||||
},
|
||||
call.head,
|
||||
)))
|
||||
}
|
||||
PolarsPluginObject::NuExpression(_) => Ok(Some(Value::record(
|
||||
record! {
|
||||
"key" => Value::string(key.to_string(), call.head),
|
||||
"created" => Value::date(value.created, call.head),
|
||||
"columns" => Value::nothing(call.head),
|
||||
"rows" => Value::nothing(call.head),
|
||||
"type" => Value::string("Expression", call.head),
|
||||
"estimated_size" => Value::nothing(call.head),
|
||||
"span_contents" => Value::string(span_contents, value.span),
|
||||
"span_start" => Value::int(value.span.start as i64, call.head),
|
||||
"span_end" => Value::int(value.span.end as i64, call.head),
|
||||
"reference_count" => Value::int(value.reference_count as i64, call.head),
|
||||
},
|
||||
call.head,
|
||||
))),
|
||||
PolarsPluginObject::NuLazyGroupBy(_) => Ok(Some(Value::record(
|
||||
record! {
|
||||
"key" => Value::string(key.to_string(), call.head),
|
||||
"columns" => Value::nothing(call.head),
|
||||
"rows" => Value::nothing(call.head),
|
||||
"type" => Value::string("LazyGroupBy", call.head),
|
||||
"estimated_size" => Value::nothing(call.head),
|
||||
"span_contents" => Value::string(span_contents, call.head),
|
||||
"span_start" => Value::int(call.head.start as i64, call.head),
|
||||
"span_end" => Value::int(call.head.end as i64, call.head),
|
||||
"reference_count" => Value::int(value.reference_count as i64, call.head),
|
||||
},
|
||||
call.head,
|
||||
))),
|
||||
PolarsPluginObject::NuWhen(_) => Ok(Some(Value::record(
|
||||
record! {
|
||||
"key" => Value::string(key.to_string(), call.head),
|
||||
"columns" => Value::nothing(call.head),
|
||||
"rows" => Value::nothing(call.head),
|
||||
"type" => Value::string("When", call.head),
|
||||
"estimated_size" => Value::nothing(call.head),
|
||||
"span_contents" => Value::string(span_contents.to_string(), call.head),
|
||||
"span_start" => Value::int(call.head.start as i64, call.head),
|
||||
"span_end" => Value::int(call.head.end as i64, call.head),
|
||||
"reference_count" => Value::int(value.reference_count as i64, call.head),
|
||||
},
|
||||
call.head,
|
||||
))),
|
||||
}
|
||||
})?;
|
||||
let vals = vals.into_iter().flatten().collect();
|
||||
let list = Value::list(vals, call.head);
|
||||
Ok(list.into_pipeline_data())
|
||||
}
|
||||
}
|
180
crates/nu_plugin_polars/src/cache/mod.rs
vendored
Normal file
180
crates/nu_plugin_polars/src/cache/mod.rs
vendored
Normal file
@ -0,0 +1,180 @@
|
||||
mod get;
|
||||
mod list;
|
||||
mod rm;
|
||||
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
sync::{Mutex, MutexGuard},
|
||||
};
|
||||
|
||||
use chrono::{DateTime, FixedOffset, Local};
|
||||
pub use list::ListDF;
|
||||
use nu_plugin::{EngineInterface, PluginCommand};
|
||||
use nu_protocol::{LabeledError, ShellError, Span};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::{plugin_debug, values::PolarsPluginObject, PolarsPlugin};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct CacheValue {
|
||||
pub uuid: Uuid,
|
||||
pub value: PolarsPluginObject,
|
||||
pub created: DateTime<FixedOffset>,
|
||||
pub span: Span,
|
||||
pub reference_count: i16,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct Cache {
|
||||
cache: Mutex<HashMap<Uuid, CacheValue>>,
|
||||
}
|
||||
|
||||
impl Cache {
|
||||
fn lock(&self) -> Result<MutexGuard<HashMap<Uuid, CacheValue>>, ShellError> {
|
||||
self.cache.lock().map_err(|e| ShellError::GenericError {
|
||||
error: format!("error acquiring cache lock: {e}"),
|
||||
msg: "".into(),
|
||||
span: None,
|
||||
help: None,
|
||||
inner: vec![],
|
||||
})
|
||||
}
|
||||
|
||||
/// Removes an item from the plugin cache.
|
||||
///
|
||||
/// * `maybe_engine` - Current EngineInterface reference. Required outside of testing
|
||||
/// * `key` - The key of the cache entry to remove.
|
||||
/// * `force` - Delete even if there are multiple references
|
||||
pub fn remove(
|
||||
&self,
|
||||
maybe_engine: Option<&EngineInterface>,
|
||||
key: &Uuid,
|
||||
force: bool,
|
||||
) -> Result<Option<CacheValue>, ShellError> {
|
||||
let mut lock = self.lock()?;
|
||||
|
||||
let reference_count = lock.get_mut(key).map(|cache_value| {
|
||||
cache_value.reference_count -= 1;
|
||||
cache_value.reference_count
|
||||
});
|
||||
|
||||
let removed = if force || reference_count.unwrap_or_default() < 1 {
|
||||
let removed = lock.remove(key);
|
||||
plugin_debug!("PolarsPlugin: removing {key} from cache: {removed:?}");
|
||||
removed
|
||||
} else {
|
||||
plugin_debug!("PolarsPlugin: decrementing reference count for {key}");
|
||||
None
|
||||
};
|
||||
|
||||
// Once there are no more entries in the cache
|
||||
// we can turn plugin gc back on
|
||||
match maybe_engine {
|
||||
Some(engine) if lock.is_empty() => {
|
||||
plugin_debug!("PolarsPlugin: Cache is empty enabling GC");
|
||||
engine.set_gc_disabled(false).map_err(LabeledError::from)?;
|
||||
}
|
||||
_ => (),
|
||||
};
|
||||
drop(lock);
|
||||
Ok(removed)
|
||||
}
|
||||
|
||||
/// Inserts an item into the plugin cache.
|
||||
/// The maybe_engine parameter is required outside of testing
|
||||
pub fn insert(
|
||||
&self,
|
||||
maybe_engine: Option<&EngineInterface>,
|
||||
uuid: Uuid,
|
||||
value: PolarsPluginObject,
|
||||
span: Span,
|
||||
) -> Result<Option<CacheValue>, ShellError> {
|
||||
let mut lock = self.lock()?;
|
||||
plugin_debug!("PolarsPlugin: Inserting {uuid} into cache: {value:?}");
|
||||
// turn off plugin gc the first time an entry is added to the cache
|
||||
// as we don't want the plugin to be garbage collected if there
|
||||
// is any live data
|
||||
match maybe_engine {
|
||||
Some(engine) if lock.is_empty() => {
|
||||
plugin_debug!("PolarsPlugin: Cache has values disabling GC");
|
||||
engine.set_gc_disabled(true).map_err(LabeledError::from)?;
|
||||
}
|
||||
_ => (),
|
||||
};
|
||||
let cache_value = CacheValue {
|
||||
uuid,
|
||||
value,
|
||||
created: Local::now().into(),
|
||||
span,
|
||||
reference_count: 1,
|
||||
};
|
||||
let result = lock.insert(uuid, cache_value);
|
||||
drop(lock);
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
pub fn get(&self, uuid: &Uuid, increment: bool) -> Result<Option<CacheValue>, ShellError> {
|
||||
let mut lock = self.lock()?;
|
||||
let result = lock.get_mut(uuid).map(|cv| {
|
||||
if increment {
|
||||
cv.reference_count += 1;
|
||||
}
|
||||
cv.clone()
|
||||
});
|
||||
drop(lock);
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
pub fn process_entries<F, T>(&self, mut func: F) -> Result<Vec<T>, ShellError>
|
||||
where
|
||||
F: FnMut((&Uuid, &CacheValue)) -> Result<T, ShellError>,
|
||||
{
|
||||
let lock = self.lock()?;
|
||||
let mut vals: Vec<T> = Vec::new();
|
||||
for entry in lock.iter() {
|
||||
let val = func(entry)?;
|
||||
vals.push(val);
|
||||
}
|
||||
drop(lock);
|
||||
Ok(vals)
|
||||
}
|
||||
}
|
||||
|
||||
pub trait Cacheable: Sized + Clone {
|
||||
fn cache_id(&self) -> &Uuid;
|
||||
|
||||
fn to_cache_value(&self) -> Result<PolarsPluginObject, ShellError>;
|
||||
|
||||
fn from_cache_value(cv: PolarsPluginObject) -> Result<Self, ShellError>;
|
||||
|
||||
fn cache(
|
||||
self,
|
||||
plugin: &PolarsPlugin,
|
||||
engine: &EngineInterface,
|
||||
span: Span,
|
||||
) -> Result<Self, ShellError> {
|
||||
plugin.cache.insert(
|
||||
Some(engine),
|
||||
self.cache_id().to_owned(),
|
||||
self.to_cache_value()?,
|
||||
span,
|
||||
)?;
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
fn get_cached(plugin: &PolarsPlugin, id: &Uuid) -> Result<Option<Self>, ShellError> {
|
||||
if let Some(cache_value) = plugin.cache.get(id, false)? {
|
||||
Ok(Some(Self::from_cache_value(cache_value.value)?))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn cache_commands() -> Vec<Box<dyn PluginCommand<Plugin = PolarsPlugin>>> {
|
||||
vec![
|
||||
Box::new(ListDF),
|
||||
Box::new(rm::CacheRemove),
|
||||
Box::new(get::CacheGet),
|
||||
]
|
||||
}
|
106
crates/nu_plugin_polars/src/cache/rm.rs
vendored
Normal file
106
crates/nu_plugin_polars/src/cache/rm.rs
vendored
Normal file
@ -0,0 +1,106 @@
|
||||
use nu_plugin::{EngineInterface, EvaluatedCall, PluginCommand};
|
||||
use nu_protocol::{
|
||||
Category, Example, LabeledError, PipelineData, ShellError, Signature, Span, SyntaxShape, Type,
|
||||
Value,
|
||||
};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::PolarsPlugin;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct CacheRemove;
|
||||
|
||||
impl PluginCommand for CacheRemove {
|
||||
type Plugin = PolarsPlugin;
|
||||
|
||||
fn name(&self) -> &str {
|
||||
"polars store-rm"
|
||||
}
|
||||
|
||||
fn usage(&self) -> &str {
|
||||
"Removes a stored Dataframe or other object from the plugin cache."
|
||||
}
|
||||
|
||||
fn signature(&self) -> Signature {
|
||||
Signature::build(self.name())
|
||||
.rest("keys", SyntaxShape::String, "Keys of objects to remove")
|
||||
.input_output_type(Type::Any, Type::List(Box::new(Type::String)))
|
||||
.category(Category::Custom("dataframe".into()))
|
||||
}
|
||||
|
||||
fn examples(&self) -> Vec<Example> {
|
||||
vec![Example {
|
||||
description: "Removes a stored ",
|
||||
example: r#"let df = ([[a b];[1 2] [3 4]] | polars into-df);
|
||||
polars store-ls | get key | first | polars store-rm $in"#,
|
||||
result: None,
|
||||
}]
|
||||
}
|
||||
|
||||
fn run(
|
||||
&self,
|
||||
plugin: &Self::Plugin,
|
||||
engine: &EngineInterface,
|
||||
call: &EvaluatedCall,
|
||||
_input: PipelineData,
|
||||
) -> Result<PipelineData, LabeledError> {
|
||||
let msgs: Vec<Value> = call
|
||||
.rest::<String>(0)?
|
||||
.into_iter()
|
||||
.map(|ref key| remove_cache_entry(plugin, engine, key, call.head))
|
||||
.collect::<Result<Vec<Value>, ShellError>>()?;
|
||||
|
||||
Ok(PipelineData::Value(Value::list(msgs, call.head), None))
|
||||
}
|
||||
}
|
||||
|
||||
fn remove_cache_entry(
|
||||
plugin: &PolarsPlugin,
|
||||
engine: &EngineInterface,
|
||||
key: &str,
|
||||
span: Span,
|
||||
) -> Result<Value, ShellError> {
|
||||
let key = as_uuid(key, span)?;
|
||||
let msg = plugin
|
||||
.cache
|
||||
.remove(Some(engine), &key, true)?
|
||||
.map(|_| format!("Removed: {key}"))
|
||||
.unwrap_or_else(|| format!("No value found for key: {key}"));
|
||||
Ok(Value::string(msg, span))
|
||||
}
|
||||
|
||||
fn as_uuid(s: &str, span: Span) -> Result<Uuid, ShellError> {
|
||||
Uuid::parse_str(s).map_err(|e| ShellError::GenericError {
|
||||
error: format!("Failed to convert key string to UUID: {e}"),
|
||||
msg: "".into(),
|
||||
span: Some(span),
|
||||
help: None,
|
||||
inner: vec![],
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use nu_command::{First, Get};
|
||||
use nu_plugin_test_support::PluginTest;
|
||||
use nu_protocol::Span;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_remove() -> Result<(), ShellError> {
|
||||
let plugin = PolarsPlugin::new_test_mode().into();
|
||||
let pipeline_data = PluginTest::new("polars", plugin)?
|
||||
.add_decl(Box::new(First))?
|
||||
.add_decl(Box::new(Get))?
|
||||
.eval("let df = ([[a b];[1 2] [3 4]] | polars into-df); polars store-ls | get key | first | polars store-rm $in")?;
|
||||
let value = pipeline_data.into_value(Span::test_data());
|
||||
let msg = value
|
||||
.as_list()?
|
||||
.first()
|
||||
.expect("there should be a first entry")
|
||||
.as_str()?;
|
||||
assert!(msg.contains("Removed"));
|
||||
Ok(())
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user