Added S3 support for polars save (#15005)

# Description
Parquet, CSV, NDJSON, and Arrow files can now be written to AWS S3 via
`polars save`. This mirrors the S3 functionality provided by `polars
open`.

```nushell
ls | polars into-df | polars save s3://my-bucket/test.parquet
```

# User-Facing Changes
- S3 URLs are now supported by `polars save`
This commit is contained in:
Jack Wright 2025-02-06 04:59:39 -08:00 committed by GitHub
parent 1a1a960836
commit 0705fb9cd1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 207 additions and 150 deletions

View File

@ -3,6 +3,7 @@ mod columns;
mod fetch;
mod open;
mod profile;
mod resource;
mod save;
mod schema;
mod shape;
@ -12,11 +13,10 @@ mod to_lazy;
mod to_nu;
mod to_repr;
use crate::PolarsPlugin;
use nu_plugin::PluginCommand;
pub use self::open::OpenDataFrame;
use crate::PolarsPlugin;
use fetch::LazyFetch;
use nu_plugin::PluginCommand;
pub use schema::SchemaCmd;
pub use shape::ShapeDF;
pub use summary::Summary;

View File

@ -1,13 +1,11 @@
use crate::{
cloud::build_cloud_options,
command::core::resource::Resource,
dataframe::values::NuSchema,
values::{CustomValueSupport, NuDataFrame, NuLazyFrame, PolarsFileType},
EngineWrapper, PolarsPlugin,
};
use log::debug;
use nu_path::expand_path_with;
use nu_utils::perf;
use url::Url;
use nu_plugin::PluginCommand;
use nu_protocol::{
@ -15,7 +13,7 @@ use nu_protocol::{
Span, Spanned, SyntaxShape, Type, Value,
};
use std::{fmt::Debug, fs::File, io::BufReader, num::NonZeroUsize, path::PathBuf, sync::Arc};
use std::{fs::File, io::BufReader, num::NonZeroUsize, path::PathBuf, sync::Arc};
use polars::{
lazy::frame::LazyJsonLineReader,
@ -25,7 +23,7 @@ use polars::{
},
};
use polars_io::{avro::AvroReader, cloud::CloudOptions, csv::read::CsvReadOptions, HiveOptions};
use polars_io::{avro::AvroReader, csv::read::CsvReadOptions, HiveOptions};
const DEFAULT_INFER_SCHEMA: usize = 100;
@ -116,61 +114,6 @@ impl PluginCommand for OpenDataFrame {
}
}
struct Resource {
path: String,
extension: Option<String>,
cloud_options: Option<CloudOptions>,
span: Span,
}
impl Debug for Resource {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// We can't print out the cloud options as it might have
// secrets in it.. So just print whether or not it was defined
f.debug_struct("Resource")
.field("path", &self.path)
.field("extension", &self.extension)
.field("has_cloud_options", &self.cloud_options.is_some())
.field("span", &self.span)
.finish()
}
}
impl Resource {
fn new(
plugin: &PolarsPlugin,
engine: &nu_plugin::EngineInterface,
spanned_path: &Spanned<String>,
) -> Result<Self, ShellError> {
let mut path = spanned_path.item.clone();
let (path_buf, cloud_options) = if let Ok(url) = path.parse::<Url>() {
let cloud_options =
build_cloud_options(plugin, &url)?.ok_or(ShellError::GenericError {
error: format!("Could not determine a supported cloud type from url: {url}"),
msg: "".into(),
span: None,
help: None,
inner: vec![],
})?;
let p: PathBuf = url.path().into();
(p, Some(cloud_options))
} else {
let new_path = expand_path_with(path, engine.get_current_dir()?, true);
path = new_path.to_string_lossy().to_string();
(new_path, None)
};
let extension = path_buf
.extension()
.and_then(|s| s.to_str().map(|s| s.to_string()));
Ok(Self {
path,
extension,
cloud_options,
span: spanned_path.span,
})
}
}
fn command(
plugin: &PolarsPlugin,
engine: &nu_plugin::EngineInterface,
@ -242,7 +185,7 @@ fn from_parquet(
let file_span = resource.span;
if !is_eager {
let args = ScanArgsParquet {
cloud_options: resource.cloud_options.clone(),
cloud_options: resource.cloud_options,
..Default::default()
};
let df: NuLazyFrame = LazyFrame::scan_parquet(file_path, args)
@ -345,7 +288,7 @@ fn from_arrow(
cache: true,
rechunk: false,
row_index: None,
cloud_options: resource.cloud_options.clone(),
cloud_options: resource.cloud_options,
include_file_paths: None,
hive_options: HiveOptions::default(),
};
@ -544,8 +487,7 @@ fn from_csv(
let truncate_ragged_lines: bool = call.has_flag("truncate-ragged-lines")?;
if !is_eager {
let csv_reader =
LazyCsvReader::new(file_path).with_cloud_options(resource.cloud_options.clone());
let csv_reader = LazyCsvReader::new(file_path).with_cloud_options(resource.cloud_options);
let csv_reader = match delimiter {
None => csv_reader,

View File

@ -0,0 +1,85 @@
use std::path::{Component, Path, PathBuf};
use crate::{cloud::build_cloud_options, PolarsPlugin};
use nu_path::expand_path_with;
use nu_protocol::{ShellError, Span, Spanned};
use polars_io::cloud::CloudOptions;
use url::Url;
/// A file location given by the user: either a local filesystem path or a
/// cloud URL, together with the data derived from it by `Resource::new`.
pub(crate) struct Resource {
/// The location as a string. For a cloud URL this is the user's input
/// unchanged; for a local path it is the expanded path.
pub(crate) path: String,
/// File extension taken from the URL path or the expanded local path,
/// `None` when there is no extension or it is not valid UTF-8.
pub(crate) extension: Option<String>,
/// Cloud configuration; `Some` only when the input was handled as a URL.
/// May hold credentials, so it is deliberately left out of `Debug` output.
pub(crate) cloud_options: Option<CloudOptions>,
/// Span of the argument the resource was built from, used for error reporting.
pub(crate) span: Span,
}
impl std::fmt::Debug for Resource {
    /// Writes a diagnostic view of the resource that never includes the
    /// cloud options themselves.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The cloud options may carry secrets, so only report whether
        // they were defined at all.
        let has_cloud_options = self.cloud_options.is_some();

        let mut out = f.debug_struct("Resource");
        out.field("path", &self.path);
        out.field("extension", &self.extension);
        out.field("has_cloud_options", &has_cloud_options);
        out.field("span", &self.span);
        out.finish()
    }
}
impl Resource {
    /// Builds a `Resource` from a user-supplied path or URL.
    ///
    /// Input that parses as a URL and is not a Windows path is treated as a
    /// cloud location: cloud options are built for it and the extension is
    /// taken from the URL's path component. Anything else is treated as a
    /// local path and expanded against the engine's current directory.
    ///
    /// # Errors
    ///
    /// Returns an error when `build_cloud_options` fails or yields no options
    /// for the URL (unsupported cloud type), or when the engine's current
    /// directory cannot be obtained.
    pub(crate) fn new(
        plugin: &PolarsPlugin,
        engine: &nu_plugin::EngineInterface,
        spanned_path: &Spanned<String>,
    ) -> Result<Self, ShellError> {
        let mut path = spanned_path.item.clone();

        let (path_buf, cloud_options) = match path.parse::<Url>() {
            // Windows paths also parse as URLs, so they are excluded here and
            // fall through to the local-path arm.
            Ok(url) if !is_windows_path(&path) => {
                // `ok_or_else` so the error (and its formatted message) is
                // only built when no cloud options could be determined.
                let cloud_options = build_cloud_options(plugin, &url)?.ok_or_else(|| {
                    ShellError::GenericError {
                        error: format!(
                            "Could not determine a supported cloud type from url: {url}"
                        ),
                        msg: "".into(),
                        span: None,
                        help: None,
                        inner: vec![],
                    }
                })?;
                let url_path: PathBuf = url.path().into();
                (url_path, Some(cloud_options))
            }
            _ => {
                let expanded = expand_path_with(path, engine.get_current_dir()?, true);
                // Local resources report the expanded path, not the raw input.
                path = expanded.to_string_lossy().into_owned();
                (expanded, None)
            }
        };

        let extension = path_buf
            .extension()
            .and_then(|ext| ext.to_str())
            .map(str::to_owned);

        Ok(Self {
            path,
            extension,
            cloud_options,
            span: spanned_path.span,
        })
    }
}
// `Url` parses Windows paths as valid URLs, so they have to be recognised
// separately before an input is treated as a URL.
fn is_windows_path(path: &str) -> bool {
    // Backslash separators are only expected in Windows-style paths.
    let has_backslash = path.contains('\\');

    // A leading `Prefix` component only occurs when there is a drive prefix.
    has_backslash
        || matches!(
            Path::new(path).components().next(),
            Some(Component::Prefix(_))
        )
}

View File

@ -1,31 +1,36 @@
use std::{fs::File, path::Path};
use std::fs::File;
use nu_plugin::EvaluatedCall;
use nu_protocol::{ShellError, Span};
use nu_protocol::ShellError;
use polars::prelude::{IpcWriter, SerWriter};
use polars_io::ipc::IpcWriterOptions;
use crate::values::{NuDataFrame, NuLazyFrame};
use crate::{
command::core::resource::Resource,
values::{NuDataFrame, NuLazyFrame},
};
use super::polars_file_save_error;
pub(crate) fn command_lazy(
_call: &EvaluatedCall,
lazy: &NuLazyFrame,
file_path: &Path,
file_span: Span,
resource: Resource,
) -> Result<(), ShellError> {
let file_path = resource.path;
let file_span = resource.span;
lazy.to_polars()
// todo - add cloud options
.sink_ipc(file_path, IpcWriterOptions::default(), None)
.sink_ipc(
file_path,
IpcWriterOptions::default(),
resource.cloud_options,
)
.map_err(|e| polars_file_save_error(e, file_span))
}
pub(crate) fn command_eager(
df: &NuDataFrame,
file_path: &Path,
file_span: Span,
) -> Result<(), ShellError> {
pub(crate) fn command_eager(df: &NuDataFrame, resource: Resource) -> Result<(), ShellError> {
let file_path = resource.path;
let file_span = resource.span;
let mut file = File::create(file_path).map_err(|e| ShellError::GenericError {
error: format!("Error with file name: {e}"),
msg: "".into(),

View File

@ -1,11 +1,11 @@
use std::fs::File;
use std::path::Path;
use nu_plugin::EvaluatedCall;
use nu_protocol::{ShellError, Span};
use nu_protocol::ShellError;
use polars_io::avro::{AvroCompression, AvroWriter};
use polars_io::SerWriter;
use crate::command::core::resource::Resource;
use crate::values::NuDataFrame;
fn get_compression(call: &EvaluatedCall) -> Result<Option<AvroCompression>, ShellError> {
@ -31,9 +31,10 @@ fn get_compression(call: &EvaluatedCall) -> Result<Option<AvroCompression>, Shel
pub(crate) fn command_eager(
call: &EvaluatedCall,
df: &NuDataFrame,
file_path: &Path,
file_span: Span,
resource: Resource,
) -> Result<(), ShellError> {
let file_path = resource.path;
let file_span = resource.span;
let compression = get_compression(call)?;
let file = File::create(file_path).map_err(|e| ShellError::GenericError {

View File

@ -1,20 +1,24 @@
use std::{fs::File, path::Path};
use std::fs::File;
use nu_plugin::EvaluatedCall;
use nu_protocol::{ShellError, Span, Spanned};
use nu_protocol::{ShellError, Spanned};
use polars::prelude::{CsvWriter, SerWriter};
use polars_io::csv::write::{CsvWriterOptions, SerializeOptions};
use crate::values::{NuDataFrame, NuLazyFrame};
use crate::{
command::core::resource::Resource,
values::{NuDataFrame, NuLazyFrame},
};
use super::polars_file_save_error;
pub(crate) fn command_lazy(
call: &EvaluatedCall,
lazy: &NuLazyFrame,
file_path: &Path,
file_span: Span,
resource: Resource,
) -> Result<(), ShellError> {
let file_path = resource.path;
let file_span = resource.span;
let delimiter: Option<Spanned<String>> = call.get_flag("csv-delimiter")?;
let separator = delimiter
.and_then(|d| d.item.chars().next().map(|c| c as u8))
@ -32,17 +36,17 @@ pub(crate) fn command_lazy(
};
lazy.to_polars()
// todo - add cloud options
.sink_csv(file_path, options, None)
.sink_csv(file_path, options, resource.cloud_options)
.map_err(|e| polars_file_save_error(e, file_span))
}
pub(crate) fn command_eager(
call: &EvaluatedCall,
df: &NuDataFrame,
file_path: &Path,
file_span: Span,
resource: Resource,
) -> Result<(), ShellError> {
let file_path = resource.path;
let file_span = resource.span;
let delimiter: Option<Spanned<String>> = call.get_flag("csv-delimiter")?;
let no_header: bool = call.has_flag("csv-no-header")?;

View File

@ -7,11 +7,12 @@ mod parquet;
use std::path::PathBuf;
use crate::{
command::core::resource::Resource,
values::{cant_convert_err, PolarsFileType, PolarsPluginObject, PolarsPluginType},
PolarsPlugin,
};
use nu_path::expand_path_with;
use log::debug;
use nu_plugin::{EngineInterface, EvaluatedCall, PluginCommand};
use nu_protocol::{
shell_error::io::IoError, Category, Example, LabeledError, PipelineData, ShellError, Signature,
@ -35,7 +36,7 @@ impl PluginCommand for SaveDF {
fn signature(&self) -> Signature {
Signature::build(self.name())
.required("path", SyntaxShape::Filepath, "Path to write to.")
.required("path", SyntaxShape::String, "Path or cloud url to write to")
.named(
"type",
SyntaxShape::String,
@ -127,70 +128,77 @@ impl PluginCommand for SaveDF {
}
fn command(
_plugin: &PolarsPlugin,
plugin: &PolarsPlugin,
engine: &EngineInterface,
call: &EvaluatedCall,
polars_object: PolarsPluginObject,
) -> Result<PipelineData, ShellError> {
let spanned_file: Spanned<PathBuf> = call.req(0)?;
let file_path = expand_path_with(&spanned_file.item, engine.get_current_dir()?, true);
let file_span = spanned_file.span;
let spanned_file: Spanned<String> = call.req(0)?;
debug!("file: {}", spanned_file.item);
let resource = Resource::new(plugin, engine, &spanned_file)?;
let type_option: Option<(String, Span)> = call
.get_flag("type")?
.map(|t: Spanned<String>| (t.item, t.span))
.or_else(|| {
file_path
.extension()
.map(|e| (e.to_string_lossy().into_owned(), spanned_file.span))
});
.or_else(|| resource.extension.clone().map(|e| (e, resource.span)));
debug!("resource: {resource:?}");
match type_option {
Some((ext, blamed)) => match PolarsFileType::from(ext.as_str()) {
PolarsFileType::Parquet => match polars_object {
PolarsPluginObject::NuLazyFrame(ref lazy) => {
parquet::command_lazy(call, lazy, &file_path, file_span)
parquet::command_lazy(call, lazy, resource)
}
PolarsPluginObject::NuDataFrame(ref df) => {
parquet::command_eager(df, &file_path, file_span)
PolarsPluginObject::NuDataFrame(ref df) if resource.cloud_options.is_some() => {
parquet::command_lazy(call, &df.lazy(), resource)
}
_ => Err(unknown_file_save_error(file_span)),
PolarsPluginObject::NuDataFrame(ref df) => parquet::command_eager(df, resource),
_ => Err(unknown_file_save_error(resource.span)),
},
PolarsFileType::Arrow => match polars_object {
PolarsPluginObject::NuLazyFrame(ref lazy) => {
arrow::command_lazy(call, lazy, &file_path, file_span)
arrow::command_lazy(call, lazy, resource)
}
PolarsPluginObject::NuDataFrame(ref df) => {
arrow::command_eager(df, &file_path, file_span)
PolarsPluginObject::NuDataFrame(ref df) if resource.cloud_options.is_some() => {
arrow::command_lazy(call, &df.lazy(), resource)
}
_ => Err(unknown_file_save_error(file_span)),
PolarsPluginObject::NuDataFrame(ref df) => arrow::command_eager(df, resource),
_ => Err(unknown_file_save_error(resource.span)),
},
PolarsFileType::NdJson => match polars_object {
PolarsPluginObject::NuLazyFrame(ref lazy) => {
ndjson::command_lazy(call, lazy, &file_path, file_span)
ndjson::command_lazy(call, lazy, resource)
}
PolarsPluginObject::NuDataFrame(ref df) => {
ndjson::command_eager(df, &file_path, file_span)
PolarsPluginObject::NuDataFrame(ref df) if resource.cloud_options.is_some() => {
ndjson::command_lazy(call, &df.lazy(), resource)
}
_ => Err(unknown_file_save_error(file_span)),
PolarsPluginObject::NuDataFrame(ref df) => ndjson::command_eager(df, resource),
_ => Err(unknown_file_save_error(resource.span)),
},
PolarsFileType::Avro => match polars_object {
_ if resource.cloud_options.is_some() => Err(ShellError::GenericError {
error: "Cloud URLS are not supported with Avro".into(),
msg: "".into(),
span: call.get_flag_span("eager"),
help: Some("Remove flag".into()),
inner: vec![],
}),
PolarsPluginObject::NuLazyFrame(lazy) => {
let df = lazy.collect(call.head)?;
avro::command_eager(call, &df, &file_path, file_span)
avro::command_eager(call, &df, resource)
}
PolarsPluginObject::NuDataFrame(ref df) => {
avro::command_eager(call, df, &file_path, file_span)
}
_ => Err(unknown_file_save_error(file_span)),
PolarsPluginObject::NuDataFrame(ref df) => avro::command_eager(call, df, resource),
_ => Err(unknown_file_save_error(resource.span)),
},
PolarsFileType::Csv => match polars_object {
PolarsPluginObject::NuLazyFrame(ref lazy) => {
csv::command_lazy(call, lazy, &file_path, file_span)
csv::command_lazy(call, lazy, resource)
}
PolarsPluginObject::NuDataFrame(ref df) => {
csv::command_eager(call, df, &file_path, file_span)
PolarsPluginObject::NuDataFrame(ref df) if resource.cloud_options.is_some() => {
csv::command_lazy(call, &df.lazy(), resource)
}
_ => Err(unknown_file_save_error(file_span)),
PolarsPluginObject::NuDataFrame(ref df) => csv::command_eager(call, df, resource),
_ => Err(unknown_file_save_error(resource.span)),
},
_ => Err(PolarsFileType::build_unsupported_error(
&ext,
@ -206,8 +214,8 @@ fn command(
},
None => Err(ShellError::Io(IoError::new_with_additional_context(
std::io::ErrorKind::NotFound,
spanned_file.span,
spanned_file.item,
resource.span,
Some(PathBuf::from(resource.path)),
"File without extension",
))),
}?;

View File

@ -1,31 +1,36 @@
use std::{fs::File, io::BufWriter, path::Path};
use std::{fs::File, io::BufWriter};
use nu_plugin::EvaluatedCall;
use nu_protocol::{ShellError, Span};
use nu_protocol::ShellError;
use polars::prelude::{JsonWriter, SerWriter};
use polars_io::json::JsonWriterOptions;
use crate::values::{NuDataFrame, NuLazyFrame};
use crate::{
command::core::resource::Resource,
values::{NuDataFrame, NuLazyFrame},
};
use super::polars_file_save_error;
pub(crate) fn command_lazy(
_call: &EvaluatedCall,
lazy: &NuLazyFrame,
file_path: &Path,
file_span: Span,
resource: Resource,
) -> Result<(), ShellError> {
let file_path = resource.path;
let file_span = resource.span;
lazy.to_polars()
// todo - add cloud options
.sink_json(file_path, JsonWriterOptions::default(), None)
.sink_json(
file_path,
JsonWriterOptions::default(),
resource.cloud_options,
)
.map_err(|e| polars_file_save_error(e, file_span))
}
pub(crate) fn command_eager(
df: &NuDataFrame,
file_path: &Path,
file_span: Span,
) -> Result<(), ShellError> {
pub(crate) fn command_eager(df: &NuDataFrame, resource: Resource) -> Result<(), ShellError> {
let file_path = resource.path;
let file_span = resource.span;
let file = File::create(file_path).map_err(|e| ShellError::GenericError {
error: format!("Error with file name: {e}"),
msg: "".into(),

View File

@ -1,31 +1,38 @@
use std::{fs::File, path::Path};
use std::fs::File;
use log::debug;
use nu_plugin::EvaluatedCall;
use nu_protocol::{ShellError, Span};
use nu_protocol::ShellError;
use polars::prelude::ParquetWriter;
use polars_io::parquet::write::ParquetWriteOptions;
use crate::values::{NuDataFrame, NuLazyFrame};
use crate::{
command::core::resource::Resource,
values::{NuDataFrame, NuLazyFrame},
};
use super::polars_file_save_error;
pub(crate) fn command_lazy(
_call: &EvaluatedCall,
lazy: &NuLazyFrame,
file_path: &Path,
file_span: Span,
resource: Resource,
) -> Result<(), ShellError> {
let file_path = resource.path;
let file_span = resource.span;
debug!("Writing parquet file {file_path}");
lazy.to_polars()
// todo - add cloud options
.sink_parquet(&file_path, ParquetWriteOptions::default(), None)
.sink_parquet(
&file_path,
ParquetWriteOptions::default(),
resource.cloud_options,
)
.map_err(|e| polars_file_save_error(e, file_span))
}
pub(crate) fn command_eager(
df: &NuDataFrame,
file_path: &Path,
file_span: Span,
) -> Result<(), ShellError> {
pub(crate) fn command_eager(df: &NuDataFrame, resource: Resource) -> Result<(), ShellError> {
let file_path = resource.path;
let file_span = resource.span;
let file = File::create(file_path).map_err(|e| ShellError::GenericError {
error: "Error with file name".into(),
msg: e.to_string(),