Polars AWS S3 support (#14648)

# Description

Adds Amazon S3 support to the `polars open` command.

- Utilizes your existing AWS CLI configuration
- Supports AWS SSO
- Supports [gimme-aws-creds](https://github.com/Nike-Inc/gimme-aws-creds)
- Respects the `AWS_PROFILE` environment variable when selecting the profile config
- Supports the `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, and `AWS_REGION` environment variables for configuring credentials without an AWS config file (see the example below)
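
For example, a minimal sketch of configuring credentials purely through environment variables before reading from S3; the credentials, region, bucket, and object path below are placeholders, not real values:

```nushell
# Placeholder credentials and bucket; substitute your own values
$env.AWS_ACCESS_KEY_ID = "<your-access-key-id>"
$env.AWS_SECRET_ACCESS_KEY = "<your-secret-access-key>"
$env.AWS_REGION = "us-east-1"

polars open s3://my-bucket/path/to/data.parquet
```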

Usage:
```nushell
polars open s3://bucket/and/path.parquet
```

Supports:
- CSV
- Parquet
- NDJSON / JSON lines
- Arrow

Doesn't support:
- Eager dataframes
- Avro
- JSON
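
The file type is normally inferred from the path or object-key extension; a `--type` flag can be passed when the key has none. Cloud URLs are only supported for lazy dataframes, so combining an `s3://` URL with `--eager` returns an error. A brief sketch (bucket and key names are hypothetical):

```nushell
# Hypothetical object key with no extension; --type tells polars how to parse it
polars open --type csv s3://my-bucket/exports/latest

# Not supported: eager dataframes cannot be opened from cloud URLs
# polars open --eager s3://my-bucket/path/to/data.parquet
```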

Commit 23ba613b00 (parent f2dcae570c), authored by Jack Wright on 2024-12-25 04:15:50 -08:00 and committed via GitHub.
11 changed files with 843 additions and 103 deletions.


@@ -89,7 +89,7 @@ mod test {
#[test]
fn test_remove() -> Result<(), ShellError> {
let plugin = PolarsPlugin::new_test_mode().into();
let plugin = PolarsPlugin::new_test_mode()?.into();
let pipeline_data = PluginTest::new("polars", plugin)?
.add_decl(Box::new(First))?
.add_decl(Box::new(Get))?


@@ -0,0 +1,67 @@
use std::error::Error;

use aws_config::{BehaviorVersion, SdkConfig};
use aws_credential_types::{provider::ProvideCredentials, Credentials};
use nu_protocol::ShellError;
use object_store::aws::AmazonS3ConfigKey;
use polars_io::cloud::CloudOptions;

use crate::PolarsPlugin;

async fn load_aws_config() -> SdkConfig {
    aws_config::load_defaults(BehaviorVersion::latest()).await
}

async fn aws_creds(aws_config: &SdkConfig) -> Result<Option<Credentials>, ShellError> {
    if let Some(provider) = aws_config.credentials_provider() {
        Ok(Some(provider.provide_credentials().await.map_err(|e| {
            ShellError::GenericError {
                error: format!(
                    "Could not fetch AWS credentials: {} - {}",
                    e,
                    e.source()
                        .map(|e| format!("{}", e))
                        .unwrap_or("".to_string())
                ),
                msg: "".into(),
                span: None,
                help: None,
                inner: vec![],
            }
        })?))
    } else {
        Ok(None)
    }
}

async fn build_aws_cloud_configs() -> Result<Vec<(AmazonS3ConfigKey, String)>, ShellError> {
    let sdk_config = load_aws_config().await;
    let creds = aws_creds(&sdk_config)
        .await?
        .ok_or(ShellError::GenericError {
            error: "Could not determine AWS credentials".into(),
            msg: "".into(),
            span: None,
            help: None,
            inner: vec![],
        })?;

    let mut configs = vec![
        (AmazonS3ConfigKey::AccessKeyId, creds.access_key_id().into()),
        (
            AmazonS3ConfigKey::SecretAccessKey,
            creds.secret_access_key().into(),
        ),
    ];

    if let Some(token) = creds.session_token() {
        configs.push((AmazonS3ConfigKey::Token, token.into()))
    }

    Ok(configs)
}

pub(crate) fn build_cloud_options(plugin: &PolarsPlugin) -> Result<CloudOptions, ShellError> {
    let configs = plugin.runtime.block_on(build_aws_cloud_configs())?;
    Ok(CloudOptions::default().with_aws(configs))
}


@@ -0,0 +1,28 @@
use nu_protocol::ShellError;
use polars_io::cloud::CloudOptions;
use url::Url;

use crate::PolarsPlugin;

mod aws;

enum CloudType {
    Aws,
}

fn determine_cloud_type(url: &Url) -> Option<CloudType> {
    match url.scheme() {
        "s3" | "s3a" => Some(CloudType::Aws),
        _ => None,
    }
}

pub(crate) fn build_cloud_options(
    plugin: &PolarsPlugin,
    url: &Url,
) -> Result<Option<CloudOptions>, ShellError> {
    match determine_cloud_type(url) {
        Some(CloudType::Aws) => aws::build_cloud_options(plugin).map(Some),
        _ => Ok(None),
    }
}


@@ -1,10 +1,13 @@
use crate::{
cloud::build_cloud_options,
dataframe::values::NuSchema,
values::{CustomValueSupport, NuDataFrame, NuLazyFrame, PolarsFileType},
EngineWrapper, PolarsPlugin,
};
use log::debug;
use nu_path::expand_path_with;
use nu_utils::perf;
use url::Url;
use nu_plugin::PluginCommand;
use nu_protocol::{
@@ -12,13 +15,7 @@ use nu_protocol::{
SyntaxShape, Type, Value,
};
use std::{
fs::File,
io::BufReader,
num::NonZeroUsize,
path::{Path, PathBuf},
sync::Arc,
};
use std::{fmt::Debug, fs::File, io::BufReader, num::NonZeroUsize, path::PathBuf, sync::Arc};
use polars::{
lazy::frame::LazyJsonLineReader,
@@ -28,7 +25,7 @@ use polars::{
},
};
use polars_io::{avro::AvroReader, csv::read::CsvReadOptions, HiveOptions};
use polars_io::{avro::AvroReader, cloud::CloudOptions, csv::read::CsvReadOptions, HiveOptions};
const DEFAULT_INFER_SCHEMA: usize = 100;
@@ -50,8 +47,8 @@ impl PluginCommand for OpenDataFrame {
Signature::build(self.name())
.required(
"file",
SyntaxShape::Filepath,
"file path to load values from",
SyntaxShape::String,
"file path or cloud url to load values from",
)
.switch("eager", "Open dataframe as an eager dataframe", None)
.named(
@@ -119,34 +116,98 @@ impl PluginCommand for OpenDataFrame {
}
}
struct Resource {
path: String,
extension: Option<String>,
cloud_options: Option<CloudOptions>,
span: Span,
}
impl Debug for Resource {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// We can't print out the cloud options as it might have
// secrets in it.. So just print whether or not it was defined
f.debug_struct("Resource")
.field("path", &self.path)
.field("extension", &self.extension)
.field("has_cloud_options", &self.cloud_options.is_some())
.field("span", &self.span)
.finish()
}
}
impl Resource {
fn new(
plugin: &PolarsPlugin,
engine: &nu_plugin::EngineInterface,
spanned_path: &Spanned<String>,
) -> Result<Self, ShellError> {
let mut path = spanned_path.item.clone();
let (path_buf, cloud_options) = if let Ok(url) = path.parse::<Url>() {
let cloud_options =
build_cloud_options(plugin, &url)?.ok_or(ShellError::GenericError {
error: format!("Could not determine a supported cloud type from url: {url}"),
msg: "".into(),
span: None,
help: None,
inner: vec![],
})?;
let p: PathBuf = url.path().into();
(p, Some(cloud_options))
} else {
let new_path = expand_path_with(path, engine.get_current_dir()?, true);
path = new_path.to_string_lossy().to_string();
(new_path, None)
};
let extension = path_buf
.extension()
.and_then(|s| s.to_str().map(|s| s.to_string()));
Ok(Self {
path,
extension,
cloud_options,
span: spanned_path.span,
})
}
}
fn command(
plugin: &PolarsPlugin,
engine: &nu_plugin::EngineInterface,
call: &nu_plugin::EvaluatedCall,
) -> Result<PipelineData, ShellError> {
let spanned_file: Spanned<PathBuf> = call.req(0)?;
let file_path = expand_path_with(&spanned_file.item, engine.get_current_dir()?, true);
let file_span = spanned_file.span;
let spanned_file: Spanned<String> = call.req(0)?;
debug!("file: {}", spanned_file.item);
let resource = Resource::new(plugin, engine, &spanned_file)?;
let type_option: Option<(String, Span)> = call
.get_flag("type")?
.map(|t: Spanned<String>| (t.item, t.span))
.or_else(|| {
file_path
.extension()
.map(|e| (e.to_string_lossy().into_owned(), spanned_file.span))
.or_else(|| resource.extension.clone().map(|e| (e, resource.span)));
debug!("resource: {resource:?}");
let is_eager = call.has_flag("eager")?;
if is_eager && resource.cloud_options.is_some() {
return Err(ShellError::GenericError {
error: "Cloud URLs are not supported with --eager".into(),
msg: "".into(),
span: call.get_flag_span("eager"),
help: Some("Remove flag".into()),
inner: vec![],
});
}
match type_option {
Some((ext, blamed)) => match PolarsFileType::from(ext.as_str()) {
PolarsFileType::Csv | PolarsFileType::Tsv => {
from_csv(plugin, engine, call, &file_path, file_span)
from_csv(plugin, engine, call, resource, is_eager)
}
PolarsFileType::Parquet => from_parquet(plugin, engine, call, &file_path, file_span),
PolarsFileType::Arrow => from_arrow(plugin, engine, call, &file_path, file_span),
PolarsFileType::Json => from_json(plugin, engine, call, &file_path, file_span),
PolarsFileType::NdJson => from_ndjson(plugin, engine, call, &file_path, file_span),
PolarsFileType::Avro => from_avro(plugin, engine, call, &file_path, file_span),
PolarsFileType::Parquet => from_parquet(plugin, engine, call, resource, is_eager),
PolarsFileType::Arrow => from_arrow(plugin, engine, call, resource, is_eager),
PolarsFileType::Json => from_json(plugin, engine, call, resource, is_eager),
PolarsFileType::NdJson => from_ndjson(plugin, engine, call, resource, is_eager),
PolarsFileType::Avro => from_avro(plugin, engine, call, resource, is_eager),
_ => Err(PolarsFileType::build_unsupported_error(
&ext,
&[
@@ -172,13 +233,17 @@ fn from_parquet(
plugin: &PolarsPlugin,
engine: &nu_plugin::EngineInterface,
call: &nu_plugin::EvaluatedCall,
file_path: &Path,
file_span: Span,
resource: Resource,
is_eager: bool,
) -> Result<Value, ShellError> {
if !call.has_flag("eager")? {
let file: String = call.req(0)?;
let args = ScanArgsParquet::default();
let df: NuLazyFrame = LazyFrame::scan_parquet(file, args)
let file_path = resource.path;
let file_span = resource.span;
if !is_eager {
let args = ScanArgsParquet {
cloud_options: resource.cloud_options.clone(),
..Default::default()
};
let df: NuLazyFrame = LazyFrame::scan_parquet(file_path, args)
.map_err(|e| ShellError::GenericError {
error: "Parquet reader error".into(),
msg: format!("{e:?}"),
@@ -225,11 +290,16 @@ fn from_avro(
plugin: &PolarsPlugin,
engine: &nu_plugin::EngineInterface,
call: &nu_plugin::EvaluatedCall,
file_path: &Path,
file_span: Span,
resource: Resource,
_is_eager: bool, // ignore, lazy frames are not currently supported
) -> Result<Value, ShellError> {
let columns: Option<Vec<String>> = call.get_flag("columns")?;
let file_path = resource.path;
let file_span = resource.span;
if resource.cloud_options.is_some() {
return Err(cloud_not_supported(PolarsFileType::Avro, file_span));
}
let columns: Option<Vec<String>> = call.get_flag("columns")?;
let r = File::open(file_path).map_err(|e| ShellError::GenericError {
error: "Error opening file".into(),
msg: e.to_string(),
@@ -262,22 +332,23 @@ fn from_arrow(
plugin: &PolarsPlugin,
engine: &nu_plugin::EngineInterface,
call: &nu_plugin::EvaluatedCall,
file_path: &Path,
file_span: Span,
resource: Resource,
is_eager: bool,
) -> Result<Value, ShellError> {
if !call.has_flag("eager")? {
let file: String = call.req(0)?;
let file_path = resource.path;
let file_span = resource.span;
if !is_eager {
let args = ScanArgsIpc {
n_rows: None,
cache: true,
rechunk: false,
row_index: None,
cloud_options: None,
cloud_options: resource.cloud_options.clone(),
include_file_paths: None,
hive_options: HiveOptions::default(),
};
let df: NuLazyFrame = LazyFrame::scan_ipc(file, args)
let df: NuLazyFrame = LazyFrame::scan_ipc(file_path, args)
.map_err(|e| ShellError::GenericError {
error: "IPC reader error".into(),
msg: format!("{e:?}"),
@@ -324,9 +395,14 @@ fn from_json(
plugin: &PolarsPlugin,
engine: &nu_plugin::EngineInterface,
call: &nu_plugin::EvaluatedCall,
file_path: &Path,
file_span: Span,
resource: Resource,
_is_eager: bool, // ignore = lazy frames not currently supported
) -> Result<Value, ShellError> {
let file_path = resource.path;
let file_span = resource.span;
if resource.cloud_options.is_some() {
return Err(cloud_not_supported(PolarsFileType::Json, file_span));
}
let file = File::open(file_path).map_err(|e| ShellError::GenericError {
error: "Error opening file".into(),
msg: e.to_string(),
@@ -365,9 +441,11 @@ fn from_ndjson(
plugin: &PolarsPlugin,
engine: &nu_plugin::EngineInterface,
call: &nu_plugin::EvaluatedCall,
file_path: &Path,
file_span: Span,
resource: Resource,
is_eager: bool,
) -> Result<Value, ShellError> {
let file_path = resource.path;
let file_span = resource.span;
let infer_schema: NonZeroUsize = call
.get_flag("infer-schema")?
.and_then(NonZeroUsize::new)
@@ -380,12 +458,13 @@
.map(|schema| NuSchema::try_from(&schema))
.transpose()?;
if !call.has_flag("eager")? {
if !is_eager {
let start_time = std::time::Instant::now();
let df = LazyJsonLineReader::new(file_path)
.with_infer_schema_length(Some(infer_schema))
.with_schema(maybe_schema.map(|s| s.into()))
.with_cloud_options(resource.cloud_options.clone())
.finish()
.map_err(|e| ShellError::GenericError {
error: format!("NDJSON reader error: {e}"),
@@ -444,9 +523,11 @@ fn from_csv(
plugin: &PolarsPlugin,
engine: &nu_plugin::EngineInterface,
call: &nu_plugin::EvaluatedCall,
file_path: &Path,
file_span: Span,
resource: Resource,
is_eager: bool,
) -> Result<Value, ShellError> {
let file_path = resource.path;
let file_span = resource.span;
let delimiter: Option<Spanned<String>> = call.get_flag("delimiter")?;
let no_header: bool = call.has_flag("no-header")?;
let infer_schema: usize = call
@@ -460,8 +541,9 @@ fn from_csv(
.transpose()?;
let truncate_ragged_lines: bool = call.has_flag("truncate-ragged-lines")?;
if !call.has_flag("eager")? {
let csv_reader = LazyCsvReader::new(file_path);
if !is_eager {
let csv_reader =
LazyCsvReader::new(file_path).with_cloud_options(resource.cloud_options.clone());
let csv_reader = match delimiter {
None => csv_reader,
@@ -533,7 +615,7 @@ fn from_csv(
.with_encoding(CsvEncoding::LossyUtf8)
.with_truncate_ragged_lines(truncate_ragged_lines)
})
.try_into_reader_with_file_path(Some(file_path.to_path_buf()))
.try_into_reader_with_file_path(Some(file_path.into()))
.map_err(|e| ShellError::GenericError {
error: "Error creating CSV reader".into(),
msg: e.to_string(),
@@ -556,3 +638,16 @@ fn from_csv(
df.cache_and_to_value(plugin, engine, call.head)
}
}
fn cloud_not_supported(file_type: PolarsFileType, span: Span) -> ShellError {
ShellError::GenericError {
error: format!(
"Cloud operations not supported for file type {}",
file_type.to_str()
),
msg: "".into(),
span: Some(span),
help: None,
inner: vec![],
}
}


@@ -248,7 +248,7 @@ pub(crate) mod test {
let tmp_file_str = tmp_file.to_str().expect("should be able to get file path");
let cmd = format!("{cmd} {tmp_file_str}");
let mut plugin_test = PluginTest::new("polars", PolarsPlugin::default().into())?;
let mut plugin_test = PluginTest::new("polars", PolarsPlugin::new()?.into())?;
plugin_test.engine_state_mut().add_env_var(
"PWD".to_string(),
Value::string(


@@ -79,7 +79,7 @@ mod tests {
#[test]
fn test_to_lazy() -> Result<(), ShellError> {
let plugin: Arc<PolarsPlugin> = PolarsPlugin::new_test_mode().into();
let plugin: Arc<PolarsPlugin> = PolarsPlugin::new_test_mode()?.into();
let mut plugin_test = PluginTest::new("polars", Arc::clone(&plugin))?;
let pipeline_data = plugin_test.eval("[[a b]; [6 2] [1 4] [4 1]] | polars into-lazy")?;
let value = pipeline_data.into_value(Span::test_data())?;


@@ -1,5 +1,6 @@
use nu_protocol::{ShellError, Span};
#[derive(Debug, Clone, PartialEq)]
pub enum PolarsFileType {
Csv,
Tsv,
@@ -23,9 +24,12 @@ impl PolarsFileType {
.collect::<Vec<&'static str>>()
.join(", ");
ShellError::FileNotFoundCustom {
msg: format!("Unsupported type {extension} expected {type_string}"),
span,
ShellError::GenericError {
error: format!("Unsupported type {extension} expected {type_string}"),
msg: "".into(),
span: Some(span),
help: None,
inner: vec![],
}
}


@@ -14,9 +14,11 @@ use log::debug;
use nu_plugin::{EngineInterface, Plugin, PluginCommand};
mod cache;
mod cloud;
pub mod dataframe;
pub use dataframe::*;
use nu_protocol::{ast::Operator, CustomValue, LabeledError, ShellError, Span, Spanned, Value};
use tokio::runtime::Runtime;
use values::CustomValueType;
use crate::values::PolarsPluginCustomValue;
@@ -52,11 +54,27 @@ impl EngineWrapper for &EngineInterface {
}
}
#[derive(Default)]
pub struct PolarsPlugin {
pub(crate) cache: Cache,
/// For testing purposes only
pub(crate) disable_cache_drop: bool,
pub(crate) runtime: Runtime,
}
impl PolarsPlugin {
pub fn new() -> Result<Self, ShellError> {
Ok(Self {
cache: Cache::default(),
disable_cache_drop: false,
runtime: Runtime::new().map_err(|e| ShellError::GenericError {
error: format!("Could not instantiate tokio: {e}"),
msg: "".into(),
span: None,
help: None,
inner: vec![],
})?,
})
}
}
impl Plugin for PolarsPlugin {
@@ -237,11 +255,11 @@ pub mod test {
impl PolarsPlugin {
/// Creates a new polars plugin in test mode
pub fn new_test_mode() -> Self {
PolarsPlugin {
pub fn new_test_mode() -> Result<Self, ShellError> {
Ok(PolarsPlugin {
disable_cache_drop: true,
..PolarsPlugin::default()
}
..PolarsPlugin::new()?
})
}
}
@@ -269,7 +287,7 @@ pub mod test {
command: &impl PluginCommand,
decls: Vec<Box<dyn Command>>,
) -> Result<(), ShellError> {
let plugin = PolarsPlugin::new_test_mode();
let plugin = PolarsPlugin::new_test_mode()?;
let examples = command.examples();
// we need to cache values in the examples


@@ -6,5 +6,12 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
fn main() {
env_logger::init();
serve_plugin(&PolarsPlugin::default(), MsgPackSerializer {})
match PolarsPlugin::new() {
Ok(ref plugin) => serve_plugin(plugin, MsgPackSerializer {}),
Err(e) => {
eprintln!("{}", e);
std::process::exit(1);
}
}
}