diff --git a/crates/nu_plugin_polars/src/dataframe/command/core/mod.rs b/crates/nu_plugin_polars/src/dataframe/command/core/mod.rs index f71a3dfdbd..fecbba2d69 100644 --- a/crates/nu_plugin_polars/src/dataframe/command/core/mod.rs +++ b/crates/nu_plugin_polars/src/dataframe/command/core/mod.rs @@ -3,6 +3,7 @@ mod columns; mod fetch; mod open; mod profile; +mod resource; mod save; mod schema; mod shape; @@ -12,11 +13,10 @@ mod to_lazy; mod to_nu; mod to_repr; -use crate::PolarsPlugin; -use nu_plugin::PluginCommand; - pub use self::open::OpenDataFrame; +use crate::PolarsPlugin; use fetch::LazyFetch; +use nu_plugin::PluginCommand; pub use schema::SchemaCmd; pub use shape::ShapeDF; pub use summary::Summary; diff --git a/crates/nu_plugin_polars/src/dataframe/command/core/open.rs b/crates/nu_plugin_polars/src/dataframe/command/core/open.rs index b218530e64..f9620290db 100644 --- a/crates/nu_plugin_polars/src/dataframe/command/core/open.rs +++ b/crates/nu_plugin_polars/src/dataframe/command/core/open.rs @@ -1,13 +1,11 @@ use crate::{ - cloud::build_cloud_options, + command::core::resource::Resource, dataframe::values::NuSchema, values::{CustomValueSupport, NuDataFrame, NuLazyFrame, PolarsFileType}, EngineWrapper, PolarsPlugin, }; use log::debug; -use nu_path::expand_path_with; use nu_utils::perf; -use url::Url; use nu_plugin::PluginCommand; use nu_protocol::{ @@ -15,7 +13,7 @@ use nu_protocol::{ Span, Spanned, SyntaxShape, Type, Value, }; -use std::{fmt::Debug, fs::File, io::BufReader, num::NonZeroUsize, path::PathBuf, sync::Arc}; +use std::{fs::File, io::BufReader, num::NonZeroUsize, path::PathBuf, sync::Arc}; use polars::{ lazy::frame::LazyJsonLineReader, @@ -25,7 +23,7 @@ use polars::{ }, }; -use polars_io::{avro::AvroReader, cloud::CloudOptions, csv::read::CsvReadOptions, HiveOptions}; +use polars_io::{avro::AvroReader, csv::read::CsvReadOptions, HiveOptions}; const DEFAULT_INFER_SCHEMA: usize = 100; @@ -116,61 +114,6 @@ impl PluginCommand for OpenDataFrame { } } -struct Resource { - path: String, - extension: Option, - cloud_options: Option, - span: Span, -} - -impl Debug for Resource { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - // We can't print out the cloud options as it might have - // secrets in it.. So just print whether or not it was defined - f.debug_struct("Resource") - .field("path", &self.path) - .field("extension", &self.extension) - .field("has_cloud_options", &self.cloud_options.is_some()) - .field("span", &self.span) - .finish() - } -} - -impl Resource { - fn new( - plugin: &PolarsPlugin, - engine: &nu_plugin::EngineInterface, - spanned_path: &Spanned, - ) -> Result { - let mut path = spanned_path.item.clone(); - let (path_buf, cloud_options) = if let Ok(url) = path.parse::() { - let cloud_options = - build_cloud_options(plugin, &url)?.ok_or(ShellError::GenericError { - error: format!("Could not determine a supported cloud type from url: {url}"), - msg: "".into(), - span: None, - help: None, - inner: vec![], - })?; - let p: PathBuf = url.path().into(); - (p, Some(cloud_options)) - } else { - let new_path = expand_path_with(path, engine.get_current_dir()?, true); - path = new_path.to_string_lossy().to_string(); - (new_path, None) - }; - let extension = path_buf - .extension() - .and_then(|s| s.to_str().map(|s| s.to_string())); - Ok(Self { - path, - extension, - cloud_options, - span: spanned_path.span, - }) - } -} - fn command( plugin: &PolarsPlugin, engine: &nu_plugin::EngineInterface, @@ -242,7 +185,7 @@ fn from_parquet( let file_span = resource.span; if !is_eager { let args = ScanArgsParquet { - cloud_options: resource.cloud_options.clone(), + cloud_options: resource.cloud_options, ..Default::default() }; let df: NuLazyFrame = LazyFrame::scan_parquet(file_path, args) @@ -345,7 +288,7 @@ fn from_arrow( cache: true, rechunk: false, row_index: None, - cloud_options: resource.cloud_options.clone(), + cloud_options: resource.cloud_options, include_file_paths: None, hive_options: HiveOptions::default(), }; @@ -544,8 +487,7 @@ fn from_csv( let truncate_ragged_lines: bool = call.has_flag("truncate-ragged-lines")?; if !is_eager { - let csv_reader = - LazyCsvReader::new(file_path).with_cloud_options(resource.cloud_options.clone()); + let csv_reader = LazyCsvReader::new(file_path).with_cloud_options(resource.cloud_options); let csv_reader = match delimiter { None => csv_reader, diff --git a/crates/nu_plugin_polars/src/dataframe/command/core/resource.rs b/crates/nu_plugin_polars/src/dataframe/command/core/resource.rs new file mode 100644 index 0000000000..e5cbaf7cb4 --- /dev/null +++ b/crates/nu_plugin_polars/src/dataframe/command/core/resource.rs @@ -0,0 +1,85 @@ +use std::path::{Component, Path, PathBuf}; + +use crate::{cloud::build_cloud_options, PolarsPlugin}; +use nu_path::expand_path_with; +use nu_protocol::{ShellError, Span, Spanned}; +use polars_io::cloud::CloudOptions; +use url::Url; + +pub(crate) struct Resource { + pub(crate) path: String, + pub(crate) extension: Option, + pub(crate) cloud_options: Option, + pub(crate) span: Span, +} + +impl std::fmt::Debug for Resource { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + // We can't print out the cloud options as it might have + // secrets in it.. So just print whether or not it was defined + f.debug_struct("Resource") + .field("path", &self.path) + .field("extension", &self.extension) + .field("has_cloud_options", &self.cloud_options.is_some()) + .field("span", &self.span) + .finish() + } +} + +impl Resource { + pub(crate) fn new( + plugin: &PolarsPlugin, + engine: &nu_plugin::EngineInterface, + spanned_path: &Spanned, + ) -> Result { + let mut path = spanned_path.item.clone(); + + let (path_buf, cloud_options) = match path.parse::() { + Ok(url) if !is_windows_path(&path) => { + let cloud_options = + build_cloud_options(plugin, &url)?.ok_or(ShellError::GenericError { + error: format!( + "Could not determine a supported cloud type from url: {url}" + ), + msg: "".into(), + span: None, + help: None, + inner: vec![], + })?; + let p: PathBuf = url.path().into(); + (p, Some(cloud_options)) + } + _ => { + let new_path = expand_path_with(path, engine.get_current_dir()?, true); + path = new_path.to_string_lossy().to_string(); + (new_path, None) + } + }; + + let extension = path_buf + .extension() + .and_then(|s| s.to_str().map(|s| s.to_string())); + Ok(Self { + path, + extension, + cloud_options, + span: spanned_path.span, + }) + } +} + +// This is needed because Url parses windows paths as +// valid URLs. +fn is_windows_path(path: &str) -> bool { + // Only window spath will + if path.contains('\\') { + return true; + } + + let path = Path::new(path); + match path.components().next() { + // This will only occur if their is a drive prefix + Some(Component::Prefix(_)) => true, + _ => false, + } +} diff --git a/crates/nu_plugin_polars/src/dataframe/command/core/save/arrow.rs b/crates/nu_plugin_polars/src/dataframe/command/core/save/arrow.rs index 2a9d1dd947..35b21968d1 100644 --- a/crates/nu_plugin_polars/src/dataframe/command/core/save/arrow.rs +++ b/crates/nu_plugin_polars/src/dataframe/command/core/save/arrow.rs @@ -1,31 +1,36 @@ -use std::{fs::File, path::Path}; +use std::fs::File; use nu_plugin::EvaluatedCall; -use nu_protocol::{ShellError, Span}; +use nu_protocol::ShellError; use polars::prelude::{IpcWriter, SerWriter}; use polars_io::ipc::IpcWriterOptions; -use crate::values::{NuDataFrame, NuLazyFrame}; +use crate::{ + command::core::resource::Resource, + values::{NuDataFrame, NuLazyFrame}, +}; use super::polars_file_save_error; pub(crate) fn command_lazy( _call: &EvaluatedCall, lazy: &NuLazyFrame, - file_path: &Path, - file_span: Span, + resource: Resource, ) -> Result<(), ShellError> { + let file_path = resource.path; + let file_span = resource.span; lazy.to_polars() - // todo - add cloud options - .sink_ipc(file_path, IpcWriterOptions::default(), None) + .sink_ipc( + file_path, + IpcWriterOptions::default(), + resource.cloud_options, + ) .map_err(|e| polars_file_save_error(e, file_span)) } -pub(crate) fn command_eager( - df: &NuDataFrame, - file_path: &Path, - file_span: Span, -) -> Result<(), ShellError> { +pub(crate) fn command_eager(df: &NuDataFrame, resource: Resource) -> Result<(), ShellError> { + let file_path = resource.path; + let file_span = resource.span; let mut file = File::create(file_path).map_err(|e| ShellError::GenericError { error: format!("Error with file name: {e}"), msg: "".into(), diff --git a/crates/nu_plugin_polars/src/dataframe/command/core/save/avro.rs b/crates/nu_plugin_polars/src/dataframe/command/core/save/avro.rs index ef5c0cdca2..d3012aa685 100644 --- a/crates/nu_plugin_polars/src/dataframe/command/core/save/avro.rs +++ b/crates/nu_plugin_polars/src/dataframe/command/core/save/avro.rs @@ -1,11 +1,11 @@ use std::fs::File; -use std::path::Path; use nu_plugin::EvaluatedCall; -use nu_protocol::{ShellError, Span}; +use nu_protocol::ShellError; use polars_io::avro::{AvroCompression, AvroWriter}; use polars_io::SerWriter; +use crate::command::core::resource::Resource; use crate::values::NuDataFrame; fn get_compression(call: &EvaluatedCall) -> Result, ShellError> { @@ -31,9 +31,10 @@ fn get_compression(call: &EvaluatedCall) -> Result, Shel pub(crate) fn command_eager( call: &EvaluatedCall, df: &NuDataFrame, - file_path: &Path, - file_span: Span, + resource: Resource, ) -> Result<(), ShellError> { + let file_path = resource.path; + let file_span = resource.span; let compression = get_compression(call)?; let file = File::create(file_path).map_err(|e| ShellError::GenericError { diff --git a/crates/nu_plugin_polars/src/dataframe/command/core/save/csv.rs b/crates/nu_plugin_polars/src/dataframe/command/core/save/csv.rs index 0a4b79f0b3..c2c6a37ce0 100644 --- a/crates/nu_plugin_polars/src/dataframe/command/core/save/csv.rs +++ b/crates/nu_plugin_polars/src/dataframe/command/core/save/csv.rs @@ -1,20 +1,24 @@ -use std::{fs::File, path::Path}; +use std::fs::File; use nu_plugin::EvaluatedCall; -use nu_protocol::{ShellError, Span, Spanned}; +use nu_protocol::{ShellError, Spanned}; use polars::prelude::{CsvWriter, SerWriter}; use polars_io::csv::write::{CsvWriterOptions, SerializeOptions}; -use crate::values::{NuDataFrame, NuLazyFrame}; +use crate::{ + command::core::resource::Resource, + values::{NuDataFrame, NuLazyFrame}, +}; use super::polars_file_save_error; pub(crate) fn command_lazy( call: &EvaluatedCall, lazy: &NuLazyFrame, - file_path: &Path, - file_span: Span, + resource: Resource, ) -> Result<(), ShellError> { + let file_path = resource.path; + let file_span = resource.span; let delimiter: Option> = call.get_flag("csv-delimiter")?; let separator = delimiter .and_then(|d| d.item.chars().next().map(|c| c as u8)) @@ -32,17 +36,17 @@ pub(crate) fn command_lazy( }; lazy.to_polars() - // todo - add cloud options - .sink_csv(file_path, options, None) + .sink_csv(file_path, options, resource.cloud_options) .map_err(|e| polars_file_save_error(e, file_span)) } pub(crate) fn command_eager( call: &EvaluatedCall, df: &NuDataFrame, - file_path: &Path, - file_span: Span, + resource: Resource, ) -> Result<(), ShellError> { + let file_path = resource.path; + let file_span = resource.span; let delimiter: Option> = call.get_flag("csv-delimiter")?; let no_header: bool = call.has_flag("csv-no-header")?; diff --git a/crates/nu_plugin_polars/src/dataframe/command/core/save/mod.rs b/crates/nu_plugin_polars/src/dataframe/command/core/save/mod.rs index 775baca990..f5fad3b495 100644 --- a/crates/nu_plugin_polars/src/dataframe/command/core/save/mod.rs +++ b/crates/nu_plugin_polars/src/dataframe/command/core/save/mod.rs @@ -7,11 +7,12 @@ mod parquet; use std::path::PathBuf; use crate::{ + command::core::resource::Resource, values::{cant_convert_err, PolarsFileType, PolarsPluginObject, PolarsPluginType}, PolarsPlugin, }; -use nu_path::expand_path_with; +use log::debug; use nu_plugin::{EngineInterface, EvaluatedCall, PluginCommand}; use nu_protocol::{ shell_error::io::IoError, Category, Example, LabeledError, PipelineData, ShellError, Signature, @@ -35,7 +36,7 @@ impl PluginCommand for SaveDF { fn signature(&self) -> Signature { Signature::build(self.name()) - .required("path", SyntaxShape::Filepath, "Path to write to.") + .required("path", SyntaxShape::String, "Path or cloud url to write to") .named( "type", SyntaxShape::String, @@ -127,70 +128,77 @@ impl PluginCommand for SaveDF { } fn command( - _plugin: &PolarsPlugin, + plugin: &PolarsPlugin, engine: &EngineInterface, call: &EvaluatedCall, polars_object: PolarsPluginObject, ) -> Result { - let spanned_file: Spanned = call.req(0)?; - let file_path = expand_path_with(&spanned_file.item, engine.get_current_dir()?, true); - let file_span = spanned_file.span; + let spanned_file: Spanned = call.req(0)?; + debug!("file: {}", spanned_file.item); + + let resource = Resource::new(plugin, engine, &spanned_file)?; let type_option: Option<(String, Span)> = call .get_flag("type")? .map(|t: Spanned| (t.item, t.span)) - .or_else(|| { - file_path - .extension() - .map(|e| (e.to_string_lossy().into_owned(), spanned_file.span)) - }); + .or_else(|| resource.extension.clone().map(|e| (e, resource.span))); + debug!("resource: {resource:?}"); match type_option { Some((ext, blamed)) => match PolarsFileType::from(ext.as_str()) { PolarsFileType::Parquet => match polars_object { PolarsPluginObject::NuLazyFrame(ref lazy) => { - parquet::command_lazy(call, lazy, &file_path, file_span) + parquet::command_lazy(call, lazy, resource) } - PolarsPluginObject::NuDataFrame(ref df) => { - parquet::command_eager(df, &file_path, file_span) + PolarsPluginObject::NuDataFrame(ref df) if resource.cloud_options.is_some() => { + parquet::command_lazy(call, &df.lazy(), resource) } - _ => Err(unknown_file_save_error(file_span)), + PolarsPluginObject::NuDataFrame(ref df) => parquet::command_eager(df, resource), + _ => Err(unknown_file_save_error(resource.span)), }, PolarsFileType::Arrow => match polars_object { PolarsPluginObject::NuLazyFrame(ref lazy) => { - arrow::command_lazy(call, lazy, &file_path, file_span) + arrow::command_lazy(call, lazy, resource) } - PolarsPluginObject::NuDataFrame(ref df) => { - arrow::command_eager(df, &file_path, file_span) + PolarsPluginObject::NuDataFrame(ref df) if resource.cloud_options.is_some() => { + arrow::command_lazy(call, &df.lazy(), resource) } - _ => Err(unknown_file_save_error(file_span)), + PolarsPluginObject::NuDataFrame(ref df) => arrow::command_eager(df, resource), + _ => Err(unknown_file_save_error(resource.span)), }, PolarsFileType::NdJson => match polars_object { PolarsPluginObject::NuLazyFrame(ref lazy) => { - ndjson::command_lazy(call, lazy, &file_path, file_span) + ndjson::command_lazy(call, lazy, resource) } - PolarsPluginObject::NuDataFrame(ref df) => { - ndjson::command_eager(df, &file_path, file_span) + PolarsPluginObject::NuDataFrame(ref df) if resource.cloud_options.is_some() => { + ndjson::command_lazy(call, &df.lazy(), resource) } - _ => Err(unknown_file_save_error(file_span)), + PolarsPluginObject::NuDataFrame(ref df) => ndjson::command_eager(df, resource), + _ => Err(unknown_file_save_error(resource.span)), }, PolarsFileType::Avro => match polars_object { + _ if resource.cloud_options.is_some() => Err(ShellError::GenericError { + error: "Cloud URLS are not supported with Avro".into(), + msg: "".into(), + span: call.get_flag_span("eager"), + help: Some("Remove flag".into()), + inner: vec![], + }), PolarsPluginObject::NuLazyFrame(lazy) => { let df = lazy.collect(call.head)?; - avro::command_eager(call, &df, &file_path, file_span) + avro::command_eager(call, &df, resource) } - PolarsPluginObject::NuDataFrame(ref df) => { - avro::command_eager(call, df, &file_path, file_span) - } - _ => Err(unknown_file_save_error(file_span)), + PolarsPluginObject::NuDataFrame(ref df) => avro::command_eager(call, df, resource), + _ => Err(unknown_file_save_error(resource.span)), }, PolarsFileType::Csv => match polars_object { PolarsPluginObject::NuLazyFrame(ref lazy) => { - csv::command_lazy(call, lazy, &file_path, file_span) + csv::command_lazy(call, lazy, resource) } - PolarsPluginObject::NuDataFrame(ref df) => { - csv::command_eager(call, df, &file_path, file_span) + PolarsPluginObject::NuDataFrame(ref df) if resource.cloud_options.is_some() => { + csv::command_lazy(call, &df.lazy(), resource) } - _ => Err(unknown_file_save_error(file_span)), + PolarsPluginObject::NuDataFrame(ref df) => csv::command_eager(call, df, resource), + _ => Err(unknown_file_save_error(resource.span)), }, _ => Err(PolarsFileType::build_unsupported_error( &ext, @@ -206,8 +214,8 @@ fn command( }, None => Err(ShellError::Io(IoError::new_with_additional_context( std::io::ErrorKind::NotFound, - spanned_file.span, - spanned_file.item, + resource.span, + Some(PathBuf::from(resource.path)), "File without extension", ))), }?; diff --git a/crates/nu_plugin_polars/src/dataframe/command/core/save/ndjson.rs b/crates/nu_plugin_polars/src/dataframe/command/core/save/ndjson.rs index 37ffb4b4e4..c5732843bb 100644 --- a/crates/nu_plugin_polars/src/dataframe/command/core/save/ndjson.rs +++ b/crates/nu_plugin_polars/src/dataframe/command/core/save/ndjson.rs @@ -1,31 +1,36 @@ -use std::{fs::File, io::BufWriter, path::Path}; +use std::{fs::File, io::BufWriter}; use nu_plugin::EvaluatedCall; -use nu_protocol::{ShellError, Span}; +use nu_protocol::ShellError; use polars::prelude::{JsonWriter, SerWriter}; use polars_io::json::JsonWriterOptions; -use crate::values::{NuDataFrame, NuLazyFrame}; +use crate::{ + command::core::resource::Resource, + values::{NuDataFrame, NuLazyFrame}, +}; use super::polars_file_save_error; pub(crate) fn command_lazy( _call: &EvaluatedCall, lazy: &NuLazyFrame, - file_path: &Path, - file_span: Span, + resource: Resource, ) -> Result<(), ShellError> { + let file_path = resource.path; + let file_span = resource.span; lazy.to_polars() - // todo - add cloud options - .sink_json(file_path, JsonWriterOptions::default(), None) + .sink_json( + file_path, + JsonWriterOptions::default(), + resource.cloud_options, + ) .map_err(|e| polars_file_save_error(e, file_span)) } -pub(crate) fn command_eager( - df: &NuDataFrame, - file_path: &Path, - file_span: Span, -) -> Result<(), ShellError> { +pub(crate) fn command_eager(df: &NuDataFrame, resource: Resource) -> Result<(), ShellError> { + let file_path = resource.path; + let file_span = resource.span; let file = File::create(file_path).map_err(|e| ShellError::GenericError { error: format!("Error with file name: {e}"), msg: "".into(), diff --git a/crates/nu_plugin_polars/src/dataframe/command/core/save/parquet.rs b/crates/nu_plugin_polars/src/dataframe/command/core/save/parquet.rs index 7a9766b3be..6dce612c7c 100644 --- a/crates/nu_plugin_polars/src/dataframe/command/core/save/parquet.rs +++ b/crates/nu_plugin_polars/src/dataframe/command/core/save/parquet.rs @@ -1,31 +1,38 @@ -use std::{fs::File, path::Path}; +use std::fs::File; +use log::debug; use nu_plugin::EvaluatedCall; -use nu_protocol::{ShellError, Span}; +use nu_protocol::ShellError; use polars::prelude::ParquetWriter; use polars_io::parquet::write::ParquetWriteOptions; -use crate::values::{NuDataFrame, NuLazyFrame}; +use crate::{ + command::core::resource::Resource, + values::{NuDataFrame, NuLazyFrame}, +}; use super::polars_file_save_error; pub(crate) fn command_lazy( _call: &EvaluatedCall, lazy: &NuLazyFrame, - file_path: &Path, - file_span: Span, + resource: Resource, ) -> Result<(), ShellError> { + let file_path = resource.path; + let file_span = resource.span; + debug!("Writing parquet file {file_path}"); lazy.to_polars() - // todo - add cloud options - .sink_parquet(&file_path, ParquetWriteOptions::default(), None) + .sink_parquet( + &file_path, + ParquetWriteOptions::default(), + resource.cloud_options, + ) .map_err(|e| polars_file_save_error(e, file_span)) } -pub(crate) fn command_eager( - df: &NuDataFrame, - file_path: &Path, - file_span: Span, -) -> Result<(), ShellError> { +pub(crate) fn command_eager(df: &NuDataFrame, resource: Resource) -> Result<(), ShellError> { + let file_path = resource.path; + let file_span = resource.span; let file = File::create(file_path).map_err(|e| ShellError::GenericError { error: "Error with file name".into(), msg: e.to_string(),