From d8908eb20548fc4260231fcc909984b5331b745d Mon Sep 17 00:00:00 2001 From: Jack Wright Date: Wed, 18 Dec 2024 18:23:07 -0800 Subject: [PATCH] Working on S3/File resource --- Cargo.lock | 1 + crates/nu_plugin_polars/Cargo.toml | 1 + crates/nu_plugin_polars/src/cloud/mod.rs | 19 ++- .../src/dataframe/command/core/open.rs | 117 +++++++++--------- .../src/dataframe/values/file_type.rs | 1 + 5 files changed, 70 insertions(+), 69 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1b3e839a3b..9e0b550dfb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4245,6 +4245,7 @@ dependencies = [ "tempfile", "tokio", "typetag", + "url", "uuid", ] diff --git a/crates/nu_plugin_polars/Cargo.toml b/crates/nu_plugin_polars/Cargo.toml index 538ac8c10b..442b28ae09 100644 --- a/crates/nu_plugin_polars/Cargo.toml +++ b/crates/nu_plugin_polars/Cargo.toml @@ -49,6 +49,7 @@ aws-config = { version = "1.5", features = ["sso"] } aws-credential-types = "1.2" tokio = { version = "1.41", features = ["full"] } object_store = { version = "0.10", default-features = false } +url.workspace = true [dependencies.polars] features = [ diff --git a/crates/nu_plugin_polars/src/cloud/mod.rs b/crates/nu_plugin_polars/src/cloud/mod.rs index e630fe0c2d..7b0e5fa917 100644 --- a/crates/nu_plugin_polars/src/cloud/mod.rs +++ b/crates/nu_plugin_polars/src/cloud/mod.rs @@ -1,5 +1,6 @@ use nu_protocol::ShellError; use polars_io::cloud::CloudOptions; +use url::Url; use crate::PolarsPlugin; @@ -9,24 +10,18 @@ enum CloudType { Aws, } -fn determine_cloud_type(path: &str) -> Option { - if path.starts_with("s3://") | path.starts_with("s3a://") { - Some(CloudType::Aws) - } else { - None +fn determine_cloud_type(url: &Url) -> Option { + match url.scheme() { + "s3" | "s3a" => Some(CloudType::Aws), + _ => None, } } -/// Returns true if it is a supported cloud url -pub(crate) fn is_cloud_url(path: &str) ->bool { - determine_cloud_type(path).is_some() -} - pub(crate) fn build_cloud_options( plugin: &PolarsPlugin, - path: &str, + url: &Url, ) -> Result, ShellError> { - match determine_cloud_type(path) { + match determine_cloud_type(url) { Some(CloudType::Aws) => aws::build_cloud_options(plugin).map(|c| Some(c)), _ => Ok(None), } diff --git a/crates/nu_plugin_polars/src/dataframe/command/core/open.rs b/crates/nu_plugin_polars/src/dataframe/command/core/open.rs index 027a63f084..2858071924 100644 --- a/crates/nu_plugin_polars/src/dataframe/command/core/open.rs +++ b/crates/nu_plugin_polars/src/dataframe/command/core/open.rs @@ -1,10 +1,8 @@ use crate::{ - dataframe::values::NuSchema, - values::{CustomValueSupport, NuDataFrame, NuLazyFrame, PolarsFileType}, - EngineWrapper, PolarsPlugin, + cloud::build_cloud_options, dataframe::values::NuSchema, values::{CustomValueSupport, NuDataFrame, NuLazyFrame, PolarsFileType}, EngineWrapper, PolarsPlugin }; -use nu_path::expand_path_with; use nu_utils::perf; +use url::Url; use nu_plugin::PluginCommand; use nu_protocol::{ @@ -12,13 +10,7 @@ use nu_protocol::{ SyntaxShape, Type, Value, }; -use std::{ - fs::File, - io::BufReader, - num::NonZeroUsize, - path::{Path, PathBuf}, - sync::Arc, -}; +use std::{fs::File, io::BufReader, num::NonZeroUsize, path::PathBuf, sync::Arc}; use polars::{ lazy::frame::LazyJsonLineReader, @@ -119,19 +111,39 @@ impl PluginCommand for OpenDataFrame { } } -enum Resource { - File(PathBuf, Span), - CloudUrl(String, CloudOptions, Span) +struct Resource { + path: String, + extension: Option, + cloud_options: Option, + span: Span, } impl Resource { - fn file_type(&self) ->Option { - let extension = match self { - Self::File(p, _) => p.extension().map(|s| s.to_string_lossy()), - Self::CloudUrl(p, _, _) => p.ri - } + fn new(plugin: &PolarsPlugin, spanned_path: &Spanned) -> Result { + let (path_buf, cloud_options) = if let Ok(url) = spanned_path.item.parse::() { + let cloud_options = + build_cloud_options(plugin, &url)?.ok_or(ShellError::GenericError { + error: format!("Could not determine a supported cloud type from url: {url}"), + msg: "".into(), + span: None, + help: None, + inner: vec![], + })?; + let p: PathBuf = url.path().into(); + (p, Some(cloud_options)) + } else { + (PathBuf::from(&spanned_path.item), None) + }; + let extension = path_buf + .extension() + .and_then(|s| s.to_str().map(|s| s.to_string())); + Ok(Self { + path: spanned_path.item.clone(), + extension, + cloud_options, + span: spanned_path.span, + }) } - } fn command( @@ -141,32 +153,20 @@ fn command( ) -> Result { let spanned_file: Spanned = call.req(0)?; - let resource = if let Some(cloud_options) = crate::cloud::build_cloud_options(plugin, &spanned_file.item)? { - Resource::CloudUrl(spanned_file.item, cloud_options, spanned_file.span) - } else { - let path = expand_path_with(&spanned_file.item, engine.get_current_dir()?, true); - Resource::File(path, spanned_file.span) - }; - + let resource = Resource::new(plugin, &spanned_file)?; let type_option: Option<(String, Span)> = call .get_flag("type")? .map(|t: Spanned| (t.item, t.span)) - .or_else(|| { - file_path - .extension() - .map(|e| (e.to_string_lossy().into_owned(), spanned_file.span)) - }); + .or_else(|| resource.extension.clone().map(|e| (e, resource.span))); match type_option { Some((ext, blamed)) => match PolarsFileType::from(ext.as_str()) { - PolarsFileType::Csv | PolarsFileType::Tsv => { - from_csv(plugin, engine, call, &file_path, file_span) - } - PolarsFileType::Parquet => from_parquet(plugin, engine, call, &file_path, file_span), - PolarsFileType::Arrow => from_arrow(plugin, engine, call, &file_path, file_span), - PolarsFileType::Json => from_json(plugin, engine, call, &file_path, file_span), - PolarsFileType::NdJson => from_ndjson(plugin, engine, call, &file_path, file_span), - PolarsFileType::Avro => from_avro(plugin, engine, call, &file_path, file_span), + PolarsFileType::Csv | PolarsFileType::Tsv => from_csv(plugin, engine, call, resource), + PolarsFileType::Parquet => from_parquet(plugin, engine, call, resource), + PolarsFileType::Arrow => from_arrow(plugin, engine, call, resource), + PolarsFileType::Json => from_json(plugin, engine, call, resource), + PolarsFileType::NdJson => from_ndjson(plugin, engine, call, resource), + PolarsFileType::Avro => from_avro(plugin, engine, call, resource), _ => Err(PolarsFileType::build_unsupported_error( &ext, &[ @@ -192,14 +192,13 @@ fn from_parquet( plugin: &PolarsPlugin, engine: &nu_plugin::EngineInterface, call: &nu_plugin::EvaluatedCall, - file_path: &Path, - file_span: Span, + resource: Resource, ) -> Result { - - let cloud_options = crate::cloud::build_cloud_options(plugin, file_path)?; + let file_path = resource.path; + let file_span = resource.span; if !call.has_flag("eager")? { let file: String = call.req(0)?; - let mut args = ScanArgsParquet::default(); + let args = ScanArgsParquet::default(); let df: NuLazyFrame = LazyFrame::scan_parquet(file, args) .map_err(|e| ShellError::GenericError { error: "Parquet reader error".into(), @@ -247,11 +246,11 @@ fn from_avro( plugin: &PolarsPlugin, engine: &nu_plugin::EngineInterface, call: &nu_plugin::EvaluatedCall, - file_path: &Path, - file_span: Span, + resource: Resource, ) -> Result { + let file_path = resource.path; + let file_span = resource.span; let columns: Option> = call.get_flag("columns")?; - let r = File::open(file_path).map_err(|e| ShellError::GenericError { error: "Error opening file".into(), msg: e.to_string(), @@ -284,9 +283,10 @@ fn from_arrow( plugin: &PolarsPlugin, engine: &nu_plugin::EngineInterface, call: &nu_plugin::EvaluatedCall, - file_path: &Path, - file_span: Span, + resource: Resource, ) -> Result { + let file_path = resource.path; + let file_span = resource.span; if !call.has_flag("eager")? { let file: String = call.req(0)?; let args = ScanArgsIpc { @@ -346,9 +346,10 @@ fn from_json( plugin: &PolarsPlugin, engine: &nu_plugin::EngineInterface, call: &nu_plugin::EvaluatedCall, - file_path: &Path, - file_span: Span, + resource: Resource, ) -> Result { + let file_path = resource.path; + let file_span = resource.span; let file = File::open(file_path).map_err(|e| ShellError::GenericError { error: "Error opening file".into(), msg: e.to_string(), @@ -387,9 +388,10 @@ fn from_ndjson( plugin: &PolarsPlugin, engine: &nu_plugin::EngineInterface, call: &nu_plugin::EvaluatedCall, - file_path: &Path, - file_span: Span, + resource: Resource, ) -> Result { + let file_path = resource.path; + let file_span = resource.span; let infer_schema: NonZeroUsize = call .get_flag("infer-schema")? .and_then(NonZeroUsize::new) @@ -466,9 +468,10 @@ fn from_csv( plugin: &PolarsPlugin, engine: &nu_plugin::EngineInterface, call: &nu_plugin::EvaluatedCall, - file_path: &Path, - file_span: Span, + resource: Resource, ) -> Result { + let file_path = resource.path; + let file_span = resource.span; let delimiter: Option> = call.get_flag("delimiter")?; let no_header: bool = call.has_flag("no-header")?; let infer_schema: usize = call @@ -555,7 +558,7 @@ fn from_csv( .with_encoding(CsvEncoding::LossyUtf8) .with_truncate_ragged_lines(truncate_ragged_lines) }) - .try_into_reader_with_file_path(Some(file_path.to_path_buf())) + .try_into_reader_with_file_path(Some(file_path.into())) .map_err(|e| ShellError::GenericError { error: "Error creating CSV reader".into(), msg: e.to_string(), diff --git a/crates/nu_plugin_polars/src/dataframe/values/file_type.rs b/crates/nu_plugin_polars/src/dataframe/values/file_type.rs index c46fcd7113..ec032c3bc5 100644 --- a/crates/nu_plugin_polars/src/dataframe/values/file_type.rs +++ b/crates/nu_plugin_polars/src/dataframe/values/file_type.rs @@ -1,5 +1,6 @@ use nu_protocol::{ShellError, Span}; +#[derive(Debug, Clone, PartialEq)] pub enum PolarsFileType { Csv, Tsv,