From b06f31d3c634a55cee5031f6a0daefb5f435a925 Mon Sep 17 00:00:00 2001 From: Devyn Cairns Date: Fri, 24 May 2024 16:37:50 -0700 Subject: [PATCH] Make `from json --objects` streaming (#12949) # Description Makes the `from json --objects` command produce a stream, and read lazily from an input stream to produce its output. Also added a helper, `PipelineData::get_type()`, to make it easier to construct a wrong type error message when matching on `PipelineData`. I expect checking `PipelineData` for either a string value or an `Unknown` or `String` typed `ByteStream` will be very, very common. I would have liked to have a helper that just returns a readable stream from either, but that would either be a bespoke enum or a `Box<dyn BufRead>`, which feels like it wouldn't be so great for performance. So instead, taking the approach I did here is probably better - having a function that accepts the `impl BufRead` and matching to use it. # User-Facing Changes - `from json --objects` no longer collects its input, and can be used for large datasets or streams that produce values over time. # Tests + Formatting All passing. 
# After Submitting - [ ] release notes --------- Co-authored-by: Ian Manske --- crates/nu-command/src/formats/from/json.rs | 112 ++++++++++++------ .../tests/format_conversions/json.rs | 26 ++++ .../nu-protocol/src/pipeline/pipeline_data.rs | 20 +++- 3 files changed, 124 insertions(+), 34 deletions(-) diff --git a/crates/nu-command/src/formats/from/json.rs b/crates/nu-command/src/formats/from/json.rs index ea449711c1..e8b5d58e9e 100644 --- a/crates/nu-command/src/formats/from/json.rs +++ b/crates/nu-command/src/formats/from/json.rs @@ -1,4 +1,10 @@ +use std::{ + io::{BufRead, Cursor}, + sync::{atomic::AtomicBool, Arc}, +}; + use nu_engine::command_prelude::*; +use nu_protocol::ListStream; #[derive(Clone)] pub struct FromJson; @@ -45,6 +51,15 @@ impl Command for FromJson { "b" => Value::test_int(2), })), }, + Example { + example: r#"'{ "a": 1 } +{ "b": 2 }' | from json --objects"#, + description: "Parse a stream of line-delimited JSON values", + result: Some(Value::test_list(vec![ + Value::test_record(record! {"a" => Value::test_int(1)}), + Value::test_record(record! {"b" => Value::test_int(2)}), + ])), + }, ] } @@ -56,49 +71,80 @@ impl Command for FromJson { input: PipelineData, ) -> Result<PipelineData, ShellError> { let span = call.head; - let (string_input, span, metadata) = input.collect_string_strict(span)?; - - if string_input.is_empty() { - return Ok(Value::nothing(span).into_pipeline_data()); - } let strict = call.has_flag(engine_state, stack, "strict")?; // TODO: turn this into a structured underline of the nu_json error if call.has_flag(engine_state, stack, "objects")? 
{ - let lines = string_input.lines().filter(|line| !line.trim().is_empty()); - - let converted_lines: Vec<_> = if strict { - lines - .map(|line| { - convert_string_to_value_strict(line, span) - .unwrap_or_else(|err| Value::error(err, span)) - }) - .collect() - } else { - lines - .map(|line| { - convert_string_to_value(line, span) - .unwrap_or_else(|err| Value::error(err, span)) - }) - .collect() - }; - - Ok(converted_lines.into_pipeline_data_with_metadata( - span, - engine_state.ctrlc.clone(), - metadata, - )) - } else if strict { - Ok(convert_string_to_value_strict(&string_input, span)? - .into_pipeline_data_with_metadata(metadata)) + // Return a stream of JSON values, one for each non-empty line + match input { + PipelineData::Value(Value::String { val, .. }, metadata) => { + Ok(PipelineData::ListStream( + read_json_lines(Cursor::new(val), span, strict, engine_state.ctrlc.clone()), + metadata, + )) + } + PipelineData::ByteStream(stream, metadata) + if stream.type_() != ByteStreamType::Binary => + { + if let Some(reader) = stream.reader() { + Ok(PipelineData::ListStream( + read_json_lines(reader, span, strict, None), + metadata, + )) + } else { + Ok(PipelineData::Empty) + } + } + _ => Err(ShellError::OnlySupportsThisInputType { + exp_input_type: "string".into(), + wrong_type: input.get_type().to_string(), + dst_span: call.head, + src_span: input.span().unwrap_or(call.head), + }), + } } else { - Ok(convert_string_to_value(&string_input, span)? - .into_pipeline_data_with_metadata(metadata)) + // Return a single JSON value + let (string_input, span, metadata) = input.collect_string_strict(span)?; + + if string_input.is_empty() { + return Ok(Value::nothing(span).into_pipeline_data()); + } + + if strict { + Ok(convert_string_to_value_strict(&string_input, span)? + .into_pipeline_data_with_metadata(metadata)) + } else { + Ok(convert_string_to_value(&string_input, span)? 
+ .into_pipeline_data_with_metadata(metadata)) + } } } } +/// Create a stream of values from a reader that produces line-delimited JSON +fn read_json_lines( + input: impl BufRead + Send + 'static, + span: Span, + strict: bool, + interrupt: Option<Arc<AtomicBool>>, +) -> ListStream { + let iter = input + .lines() + .filter(|line| line.as_ref().is_ok_and(|line| !line.trim().is_empty()) || line.is_err()) + .map(move |line| { + let line = line.err_span(span)?; + if strict { + convert_string_to_value_strict(&line, span) + } else { + convert_string_to_value(&line, span) + } + }) + .map(move |result| result.unwrap_or_else(|err| Value::error(err, span))); + + ListStream::new(iter, span, interrupt) +} + fn convert_nujson_to_value(value: nu_json::Value, span: Span) -> Value { match value { nu_json::Value::Array(array) => Value::list( diff --git a/crates/nu-command/tests/format_conversions/json.rs b/crates/nu-command/tests/format_conversions/json.rs index 1989000df6..643e4c3f22 100644 --- a/crates/nu-command/tests/format_conversions/json.rs +++ b/crates/nu-command/tests/format_conversions/json.rs @@ -96,6 +96,32 @@ fn from_json_text_recognizing_objects_independently_to_table() { }) } +#[test] +fn from_json_text_objects_is_stream() { + Playground::setup("filter_from_json_test_2_is_stream", |dirs, sandbox| { + sandbox.with_files(&[FileWithContentToBeTrimmed( + "katz.txt", + r#" + {"name": "Yehuda", "rusty_luck": 1} + {"name": "JT", "rusty_luck": 1} + {"name": "Andres", "rusty_luck": 1} + {"name":"GorbyPuff", "rusty_luck": 3} + "#, + )]); + + let actual = nu!( + cwd: dirs.test(), pipeline( + r#" + open katz.txt + | from json -o + | describe -n + "# + )); + + assert_eq!(actual.out, "stream"); + }) +} + #[test] fn from_json_text_recognizing_objects_independently_to_table_strict() { Playground::setup("filter_from_json_test_2_strict", |dirs, sandbox| { diff --git a/crates/nu-protocol/src/pipeline/pipeline_data.rs b/crates/nu-protocol/src/pipeline/pipeline_data.rs index 0a13ffa4b3..03fbc64d21 100644 
--- a/crates/nu-protocol/src/pipeline/pipeline_data.rs +++ b/crates/nu-protocol/src/pipeline/pipeline_data.rs @@ -3,7 +3,7 @@ use crate::{ engine::{EngineState, Stack}, process::{ChildPipe, ChildProcess, ExitStatus}, ByteStream, ByteStreamType, Config, ErrSpan, ListStream, OutDest, PipelineMetadata, Range, - ShellError, Span, Value, + ShellError, Span, Type, Value, }; use nu_utils::{stderr_write_all_and_flush, stdout_write_all_and_flush}; use std::{ @@ -99,6 +99,24 @@ impl PipelineData { } } + /// Get a type that is representative of the `PipelineData`. + /// + /// The type returned here makes no effort to collect a stream, so it may be a different type + /// than would be returned by [`Value::get_type()`] on the result of [`.into_value()`]. + /// + /// Specifically, a `ListStream` results in [`list stream`](Type::ListStream) rather than + /// the fully complete [`list`](Type::List) type (which would require knowing the contents), + /// and a `ByteStream` with [unknown](crate::ByteStreamType::Unknown) type results in + /// [`any`](Type::Any) rather than [`string`](Type::String) or [`binary`](Type::Binary). + pub fn get_type(&self) -> Type { + match self { + PipelineData::Empty => Type::Nothing, + PipelineData::Value(value, _) => value.get_type(), + PipelineData::ListStream(_, _) => Type::ListStream, + PipelineData::ByteStream(stream, _) => stream.type_().into(), + } + } + pub fn into_value(self, span: Span) -> Result<Value, ShellError> { match self { PipelineData::Empty => Ok(Value::nothing(span)),