Wiring up s3 support and debugging

This commit is contained in:
Jack Wright
2024-12-20 17:03:15 -08:00
parent 96df4b9b92
commit 8d2ad29682
3 changed files with 40 additions and 11 deletions

5
Cargo.lock generated
View File

@ -5097,6 +5097,7 @@ dependencies = [
"blake3", "blake3",
"bytes", "bytes",
"chrono", "chrono",
"chrono-tz 0.8.6",
"fast-float", "fast-float",
"flate2", "flate2",
"fs4", "fs4",
@ -5142,6 +5143,7 @@ checksum = "d5c8c057ef04feaf34b6ce52096bdea3a766fa4725f50442078c8a4ee86397bf"
dependencies = [ dependencies = [
"ahash 0.8.11", "ahash 0.8.11",
"chrono", "chrono",
"chrono-tz 0.8.6",
"fallible-streaming-iterator", "fallible-streaming-iterator",
"hashbrown 0.15.2", "hashbrown 0.15.2",
"indexmap", "indexmap",
@ -5163,6 +5165,7 @@ checksum = "4a8ca74f42e7b47cad241b36b98d991cc7fbb51b8d0695a055eb937588d1f310"
dependencies = [ dependencies = [
"ahash 0.8.11", "ahash 0.8.11",
"bitflags 2.6.0", "bitflags 2.6.0",
"futures",
"memchr", "memchr",
"once_cell", "once_cell",
"polars-arrow", "polars-arrow",
@ -5178,6 +5181,7 @@ dependencies = [
"polars-time", "polars-time",
"polars-utils", "polars-utils",
"rayon", "rayon",
"tokio",
"version_check", "version_check",
] ]
@ -5303,6 +5307,7 @@ dependencies = [
"polars-row", "polars-row",
"polars-utils", "polars-utils",
"rayon", "rayon",
"tokio",
"uuid", "uuid",
"version_check", "version_check",
] ]

View File

@ -55,6 +55,7 @@ url.workspace = true
features = [ features = [
"arg_where", "arg_where",
"checked_arithmetic", "checked_arithmetic",
"cloud",
"concat_str", "concat_str",
"cross_join", "cross_join",
"csv", "csv",
@ -84,6 +85,7 @@ features = [
"strings", "strings",
"string_to_integer", "string_to_integer",
"streaming", "streaming",
"timezones",
"temporal", "temporal",
"to_dummies", "to_dummies",
] ]

View File

@ -4,6 +4,8 @@ use crate::{
values::{CustomValueSupport, NuDataFrame, NuLazyFrame, PolarsFileType}, values::{CustomValueSupport, NuDataFrame, NuLazyFrame, PolarsFileType},
EngineWrapper, PolarsPlugin, EngineWrapper, PolarsPlugin,
}; };
use log::debug;
use nu_path::expand_path_with;
use nu_utils::perf; use nu_utils::perf;
use url::Url; use url::Url;
@ -13,7 +15,7 @@ use nu_protocol::{
SyntaxShape, Type, Value, SyntaxShape, Type, Value,
}; };
use std::{fs::File, io::BufReader, num::NonZeroUsize, path::PathBuf, sync::Arc}; use std::{fmt::Debug, fs::File, io::BufReader, num::NonZeroUsize, path::PathBuf, sync::Arc};
use polars::{ use polars::{
lazy::frame::LazyJsonLineReader, lazy::frame::LazyJsonLineReader,
@ -45,7 +47,7 @@ impl PluginCommand for OpenDataFrame {
Signature::build(self.name()) Signature::build(self.name())
.required( .required(
"file", "file",
SyntaxShape::OneOf(vec![SyntaxShape::Filepath, SyntaxShape::String]), SyntaxShape::String,
"file path or cloud url to load values from", "file path or cloud url to load values from",
) )
.switch("eager", "Open dataframe as an eager dataframe", None) .switch("eager", "Open dataframe as an eager dataframe", None)
@ -121,9 +123,27 @@ struct Resource {
span: Span, span: Span,
} }
impl Debug for Resource {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// We can't print out the cloud options as it might have
// secrets in it.. So just print whether or not it was defined
f.debug_struct("Resource")
.field("path", &self.path)
.field("extension", &self.extension)
.field("has_cloud_options", &self.cloud_options.is_some())
.field("span", &self.span)
.finish()
}
}
impl Resource { impl Resource {
fn new(plugin: &PolarsPlugin, spanned_path: &Spanned<String>) -> Result<Self, ShellError> { fn new(
let (path_buf, cloud_options) = if let Ok(url) = spanned_path.item.parse::<Url>() { plugin: &PolarsPlugin,
engine: &nu_plugin::EngineInterface,
spanned_path: &Spanned<String>,
) -> Result<Self, ShellError> {
let mut path = spanned_path.item.clone();
let (path_buf, cloud_options) = if let Ok(url) = path.parse::<Url>() {
let cloud_options = let cloud_options =
build_cloud_options(plugin, &url)?.ok_or(ShellError::GenericError { build_cloud_options(plugin, &url)?.ok_or(ShellError::GenericError {
error: format!("Could not determine a supported cloud type from url: {url}"), error: format!("Could not determine a supported cloud type from url: {url}"),
@ -135,13 +155,15 @@ impl Resource {
let p: PathBuf = url.path().into(); let p: PathBuf = url.path().into();
(p, Some(cloud_options)) (p, Some(cloud_options))
} else { } else {
(PathBuf::from(&spanned_path.item), None) let new_path = expand_path_with(path, engine.get_current_dir()?, true);
path = new_path.to_string_lossy().to_string();
(new_path, None)
}; };
let extension = path_buf let extension = path_buf
.extension() .extension()
.and_then(|s| s.to_str().map(|s| s.to_string())); .and_then(|s| s.to_str().map(|s| s.to_string()));
Ok(Self { Ok(Self {
path: spanned_path.item.clone(), path,
extension, extension,
cloud_options, cloud_options,
span: spanned_path.span, span: spanned_path.span,
@ -155,12 +177,14 @@ fn command(
call: &nu_plugin::EvaluatedCall, call: &nu_plugin::EvaluatedCall,
) -> Result<PipelineData, ShellError> { ) -> Result<PipelineData, ShellError> {
let spanned_file: Spanned<String> = call.req(0)?; let spanned_file: Spanned<String> = call.req(0)?;
debug!("file: {}", spanned_file.item);
let resource = Resource::new(plugin, &spanned_file)?; let resource = Resource::new(plugin, engine, &spanned_file)?;
let type_option: Option<(String, Span)> = call let type_option: Option<(String, Span)> = call
.get_flag("type")? .get_flag("type")?
.map(|t: Spanned<String>| (t.item, t.span)) .map(|t: Spanned<String>| (t.item, t.span))
.or_else(|| resource.extension.clone().map(|e| (e, resource.span))); .or_else(|| resource.extension.clone().map(|e| (e, resource.span)));
debug!("resource: {resource:?}");
let is_eager = call.has_flag("eager")?; let is_eager = call.has_flag("eager")?;
@ -215,12 +239,11 @@ fn from_parquet(
let file_path = resource.path; let file_path = resource.path;
let file_span = resource.span; let file_span = resource.span;
if !is_eager { if !is_eager {
let file: String = call.req(0)?;
let args = ScanArgsParquet { let args = ScanArgsParquet {
cloud_options: resource.cloud_options.clone(), cloud_options: resource.cloud_options.clone(),
..Default::default() ..Default::default()
}; };
let df: NuLazyFrame = LazyFrame::scan_parquet(file, args) let df: NuLazyFrame = LazyFrame::scan_parquet(file_path, args)
.map_err(|e| ShellError::GenericError { .map_err(|e| ShellError::GenericError {
error: "Parquet reader error".into(), error: "Parquet reader error".into(),
msg: format!("{e:?}"), msg: format!("{e:?}"),
@ -311,7 +334,6 @@ fn from_arrow(
let file_path = resource.path; let file_path = resource.path;
let file_span = resource.span; let file_span = resource.span;
if !is_eager { if !is_eager {
let file: String = call.req(0)?;
let args = ScanArgsIpc { let args = ScanArgsIpc {
n_rows: None, n_rows: None,
cache: true, cache: true,
@ -322,7 +344,7 @@ fn from_arrow(
hive_options: HiveOptions::default(), hive_options: HiveOptions::default(),
}; };
let df: NuLazyFrame = LazyFrame::scan_ipc(file, args) let df: NuLazyFrame = LazyFrame::scan_ipc(file_path, args)
.map_err(|e| ShellError::GenericError { .map_err(|e| ShellError::GenericError {
error: "IPC reader error".into(), error: "IPC reader error".into(),
msg: format!("{e:?}"), msg: format!("{e:?}"),