Working on S3/File resource

This commit is contained in:
Jack Wright
2024-12-18 18:23:07 -08:00
parent b7b4767f12
commit d8908eb205
5 changed files with 70 additions and 69 deletions

1
Cargo.lock generated
View File

@ -4245,6 +4245,7 @@ dependencies = [
"tempfile", "tempfile",
"tokio", "tokio",
"typetag", "typetag",
"url",
"uuid", "uuid",
] ]

View File

@ -49,6 +49,7 @@ aws-config = { version = "1.5", features = ["sso"] }
aws-credential-types = "1.2" aws-credential-types = "1.2"
tokio = { version = "1.41", features = ["full"] } tokio = { version = "1.41", features = ["full"] }
object_store = { version = "0.10", default-features = false } object_store = { version = "0.10", default-features = false }
url.workspace = true
[dependencies.polars] [dependencies.polars]
features = [ features = [

View File

@ -1,5 +1,6 @@
use nu_protocol::ShellError; use nu_protocol::ShellError;
use polars_io::cloud::CloudOptions; use polars_io::cloud::CloudOptions;
use url::Url;
use crate::PolarsPlugin; use crate::PolarsPlugin;
@ -9,24 +10,18 @@ enum CloudType {
Aws, Aws,
} }
fn determine_cloud_type(path: &str) -> Option<CloudType> { fn determine_cloud_type(url: &Url) -> Option<CloudType> {
if path.starts_with("s3://") | path.starts_with("s3a://") { match url.scheme() {
Some(CloudType::Aws) "s3" | "s3a" => Some(CloudType::Aws),
} else { _ => None,
None
} }
} }
/// Returns true if it is a supported cloud url
pub(crate) fn is_cloud_url(path: &str) ->bool {
determine_cloud_type(path).is_some()
}
pub(crate) fn build_cloud_options( pub(crate) fn build_cloud_options(
plugin: &PolarsPlugin, plugin: &PolarsPlugin,
path: &str, url: &Url,
) -> Result<Option<CloudOptions>, ShellError> { ) -> Result<Option<CloudOptions>, ShellError> {
match determine_cloud_type(path) { match determine_cloud_type(url) {
Some(CloudType::Aws) => aws::build_cloud_options(plugin).map(|c| Some(c)), Some(CloudType::Aws) => aws::build_cloud_options(plugin).map(|c| Some(c)),
_ => Ok(None), _ => Ok(None),
} }

View File

@ -1,10 +1,8 @@
use crate::{ use crate::{
dataframe::values::NuSchema, cloud::build_cloud_options, dataframe::values::NuSchema, values::{CustomValueSupport, NuDataFrame, NuLazyFrame, PolarsFileType}, EngineWrapper, PolarsPlugin
values::{CustomValueSupport, NuDataFrame, NuLazyFrame, PolarsFileType},
EngineWrapper, PolarsPlugin,
}; };
use nu_path::expand_path_with;
use nu_utils::perf; use nu_utils::perf;
use url::Url;
use nu_plugin::PluginCommand; use nu_plugin::PluginCommand;
use nu_protocol::{ use nu_protocol::{
@ -12,13 +10,7 @@ use nu_protocol::{
SyntaxShape, Type, Value, SyntaxShape, Type, Value,
}; };
use std::{ use std::{fs::File, io::BufReader, num::NonZeroUsize, path::PathBuf, sync::Arc};
fs::File,
io::BufReader,
num::NonZeroUsize,
path::{Path, PathBuf},
sync::Arc,
};
use polars::{ use polars::{
lazy::frame::LazyJsonLineReader, lazy::frame::LazyJsonLineReader,
@ -119,19 +111,39 @@ impl PluginCommand for OpenDataFrame {
} }
} }
enum Resource { struct Resource {
File(PathBuf, Span), path: String,
CloudUrl(String, CloudOptions, Span) extension: Option<String>,
cloud_options: Option<CloudOptions>,
span: Span,
} }
impl Resource { impl Resource {
fn file_type(&self) ->Option<PolarsFileType> { fn new(plugin: &PolarsPlugin, spanned_path: &Spanned<String>) -> Result<Self, ShellError> {
let extension = match self { let (path_buf, cloud_options) = if let Ok(url) = spanned_path.item.parse::<Url>() {
Self::File(p, _) => p.extension().map(|s| s.to_string_lossy()), let cloud_options =
Self::CloudUrl(p, _, _) => p.ri build_cloud_options(plugin, &url)?.ok_or(ShellError::GenericError {
} error: format!("Could not determine a supported cloud type from url: {url}"),
msg: "".into(),
span: None,
help: None,
inner: vec![],
})?;
let p: PathBuf = url.path().into();
(p, Some(cloud_options))
} else {
(PathBuf::from(&spanned_path.item), None)
};
let extension = path_buf
.extension()
.and_then(|s| s.to_str().map(|s| s.to_string()));
Ok(Self {
path: spanned_path.item.clone(),
extension,
cloud_options,
span: spanned_path.span,
})
} }
} }
fn command( fn command(
@ -141,32 +153,20 @@ fn command(
) -> Result<PipelineData, ShellError> { ) -> Result<PipelineData, ShellError> {
let spanned_file: Spanned<String> = call.req(0)?; let spanned_file: Spanned<String> = call.req(0)?;
let resource = if let Some(cloud_options) = crate::cloud::build_cloud_options(plugin, &spanned_file.item)? { let resource = Resource::new(plugin, &spanned_file)?;
Resource::CloudUrl(spanned_file.item, cloud_options, spanned_file.span)
} else {
let path = expand_path_with(&spanned_file.item, engine.get_current_dir()?, true);
Resource::File(path, spanned_file.span)
};
let type_option: Option<(String, Span)> = call let type_option: Option<(String, Span)> = call
.get_flag("type")? .get_flag("type")?
.map(|t: Spanned<String>| (t.item, t.span)) .map(|t: Spanned<String>| (t.item, t.span))
.or_else(|| { .or_else(|| resource.extension.clone().map(|e| (e, resource.span)));
file_path
.extension()
.map(|e| (e.to_string_lossy().into_owned(), spanned_file.span))
});
match type_option { match type_option {
Some((ext, blamed)) => match PolarsFileType::from(ext.as_str()) { Some((ext, blamed)) => match PolarsFileType::from(ext.as_str()) {
PolarsFileType::Csv | PolarsFileType::Tsv => { PolarsFileType::Csv | PolarsFileType::Tsv => from_csv(plugin, engine, call, resource),
from_csv(plugin, engine, call, &file_path, file_span) PolarsFileType::Parquet => from_parquet(plugin, engine, call, resource),
} PolarsFileType::Arrow => from_arrow(plugin, engine, call, resource),
PolarsFileType::Parquet => from_parquet(plugin, engine, call, &file_path, file_span), PolarsFileType::Json => from_json(plugin, engine, call, resource),
PolarsFileType::Arrow => from_arrow(plugin, engine, call, &file_path, file_span), PolarsFileType::NdJson => from_ndjson(plugin, engine, call, resource),
PolarsFileType::Json => from_json(plugin, engine, call, &file_path, file_span), PolarsFileType::Avro => from_avro(plugin, engine, call, resource),
PolarsFileType::NdJson => from_ndjson(plugin, engine, call, &file_path, file_span),
PolarsFileType::Avro => from_avro(plugin, engine, call, &file_path, file_span),
_ => Err(PolarsFileType::build_unsupported_error( _ => Err(PolarsFileType::build_unsupported_error(
&ext, &ext,
&[ &[
@ -192,14 +192,13 @@ fn from_parquet(
plugin: &PolarsPlugin, plugin: &PolarsPlugin,
engine: &nu_plugin::EngineInterface, engine: &nu_plugin::EngineInterface,
call: &nu_plugin::EvaluatedCall, call: &nu_plugin::EvaluatedCall,
file_path: &Path, resource: Resource,
file_span: Span,
) -> Result<Value, ShellError> { ) -> Result<Value, ShellError> {
let file_path = resource.path;
let cloud_options = crate::cloud::build_cloud_options(plugin, file_path)?; let file_span = resource.span;
if !call.has_flag("eager")? { if !call.has_flag("eager")? {
let file: String = call.req(0)?; let file: String = call.req(0)?;
let mut args = ScanArgsParquet::default(); let args = ScanArgsParquet::default();
let df: NuLazyFrame = LazyFrame::scan_parquet(file, args) let df: NuLazyFrame = LazyFrame::scan_parquet(file, args)
.map_err(|e| ShellError::GenericError { .map_err(|e| ShellError::GenericError {
error: "Parquet reader error".into(), error: "Parquet reader error".into(),
@ -247,11 +246,11 @@ fn from_avro(
plugin: &PolarsPlugin, plugin: &PolarsPlugin,
engine: &nu_plugin::EngineInterface, engine: &nu_plugin::EngineInterface,
call: &nu_plugin::EvaluatedCall, call: &nu_plugin::EvaluatedCall,
file_path: &Path, resource: Resource,
file_span: Span,
) -> Result<Value, ShellError> { ) -> Result<Value, ShellError> {
let file_path = resource.path;
let file_span = resource.span;
let columns: Option<Vec<String>> = call.get_flag("columns")?; let columns: Option<Vec<String>> = call.get_flag("columns")?;
let r = File::open(file_path).map_err(|e| ShellError::GenericError { let r = File::open(file_path).map_err(|e| ShellError::GenericError {
error: "Error opening file".into(), error: "Error opening file".into(),
msg: e.to_string(), msg: e.to_string(),
@ -284,9 +283,10 @@ fn from_arrow(
plugin: &PolarsPlugin, plugin: &PolarsPlugin,
engine: &nu_plugin::EngineInterface, engine: &nu_plugin::EngineInterface,
call: &nu_plugin::EvaluatedCall, call: &nu_plugin::EvaluatedCall,
file_path: &Path, resource: Resource,
file_span: Span,
) -> Result<Value, ShellError> { ) -> Result<Value, ShellError> {
let file_path = resource.path;
let file_span = resource.span;
if !call.has_flag("eager")? { if !call.has_flag("eager")? {
let file: String = call.req(0)?; let file: String = call.req(0)?;
let args = ScanArgsIpc { let args = ScanArgsIpc {
@ -346,9 +346,10 @@ fn from_json(
plugin: &PolarsPlugin, plugin: &PolarsPlugin,
engine: &nu_plugin::EngineInterface, engine: &nu_plugin::EngineInterface,
call: &nu_plugin::EvaluatedCall, call: &nu_plugin::EvaluatedCall,
file_path: &Path, resource: Resource,
file_span: Span,
) -> Result<Value, ShellError> { ) -> Result<Value, ShellError> {
let file_path = resource.path;
let file_span = resource.span;
let file = File::open(file_path).map_err(|e| ShellError::GenericError { let file = File::open(file_path).map_err(|e| ShellError::GenericError {
error: "Error opening file".into(), error: "Error opening file".into(),
msg: e.to_string(), msg: e.to_string(),
@ -387,9 +388,10 @@ fn from_ndjson(
plugin: &PolarsPlugin, plugin: &PolarsPlugin,
engine: &nu_plugin::EngineInterface, engine: &nu_plugin::EngineInterface,
call: &nu_plugin::EvaluatedCall, call: &nu_plugin::EvaluatedCall,
file_path: &Path, resource: Resource,
file_span: Span,
) -> Result<Value, ShellError> { ) -> Result<Value, ShellError> {
let file_path = resource.path;
let file_span = resource.span;
let infer_schema: NonZeroUsize = call let infer_schema: NonZeroUsize = call
.get_flag("infer-schema")? .get_flag("infer-schema")?
.and_then(NonZeroUsize::new) .and_then(NonZeroUsize::new)
@ -466,9 +468,10 @@ fn from_csv(
plugin: &PolarsPlugin, plugin: &PolarsPlugin,
engine: &nu_plugin::EngineInterface, engine: &nu_plugin::EngineInterface,
call: &nu_plugin::EvaluatedCall, call: &nu_plugin::EvaluatedCall,
file_path: &Path, resource: Resource,
file_span: Span,
) -> Result<Value, ShellError> { ) -> Result<Value, ShellError> {
let file_path = resource.path;
let file_span = resource.span;
let delimiter: Option<Spanned<String>> = call.get_flag("delimiter")?; let delimiter: Option<Spanned<String>> = call.get_flag("delimiter")?;
let no_header: bool = call.has_flag("no-header")?; let no_header: bool = call.has_flag("no-header")?;
let infer_schema: usize = call let infer_schema: usize = call
@ -555,7 +558,7 @@ fn from_csv(
.with_encoding(CsvEncoding::LossyUtf8) .with_encoding(CsvEncoding::LossyUtf8)
.with_truncate_ragged_lines(truncate_ragged_lines) .with_truncate_ragged_lines(truncate_ragged_lines)
}) })
.try_into_reader_with_file_path(Some(file_path.to_path_buf())) .try_into_reader_with_file_path(Some(file_path.into()))
.map_err(|e| ShellError::GenericError { .map_err(|e| ShellError::GenericError {
error: "Error creating CSV reader".into(), error: "Error creating CSV reader".into(),
msg: e.to_string(), msg: e.to_string(),

View File

@ -1,5 +1,6 @@
use nu_protocol::{ShellError, Span}; use nu_protocol::{ShellError, Span};
#[derive(Debug, Clone, PartialEq)]
pub enum PolarsFileType { pub enum PolarsFileType {
Csv, Csv,
Tsv, Tsv,