Make from json --objects streaming (#12949)

# Description

Makes the `from json --objects` command produce a stream, and read
lazily from an input stream to produce its output.

Also added a helper, `PipelineData::get_type()`, to make it easier to
construct a wrong-type error message when matching on `PipelineData`. I
expect that checking `PipelineData` for either a string value or a
`ByteStream` typed `Unknown` or `String` will be very common. I would
have liked a helper that just returns a readable stream from either, but
that would require either a bespoke enum or a `Box<dyn BufRead>`, neither
of which seems great for performance. So the approach taken here is
probably better: a function that accepts an `impl BufRead`, plus a match
on the input that calls it with the appropriate reader.

# User-Facing Changes

- `from json --objects` no longer collects its input, and can be used
for large datasets or streams that produce values over time.

# Tests + Formatting

All passing.

# After Submitting
- [ ] release notes

---------

Co-authored-by: Ian Manske <ian.manske@pm.me>
Devyn Cairns 2024-05-24 16:37:50 -07:00 committed by GitHub
parent 84b7a99adf
commit b06f31d3c6
3 changed files with 124 additions and 34 deletions


@@ -1,4 +1,10 @@
use std::{
io::{BufRead, Cursor},
sync::{atomic::AtomicBool, Arc},
};
use nu_engine::command_prelude::*;
use nu_protocol::ListStream;
#[derive(Clone)]
pub struct FromJson;
@@ -45,6 +51,15 @@ impl Command for FromJson {
"b" => Value::test_int(2),
})),
},
Example {
example: r#"'{ "a": 1 }
{ "b": 2 }' | from json --objects"#,
description: "Parse a stream of line-delimited JSON values",
result: Some(Value::test_list(vec![
Value::test_record(record! {"a" => Value::test_int(1)}),
Value::test_record(record! {"b" => Value::test_int(2)}),
])),
},
]
}
@@ -56,40 +71,47 @@ impl Command for FromJson {
input: PipelineData,
) -> Result<PipelineData, ShellError> {
let span = call.head;
let strict = call.has_flag(engine_state, stack, "strict")?;
// TODO: turn this into a structured underline of the nu_json error
if call.has_flag(engine_state, stack, "objects")? {
// Return a stream of JSON values, one for each non-empty line
match input {
PipelineData::Value(Value::String { val, .. }, metadata) => {
Ok(PipelineData::ListStream(
read_json_lines(Cursor::new(val), span, strict, engine_state.ctrlc.clone()),
metadata,
))
}
PipelineData::ByteStream(stream, metadata)
if stream.type_() != ByteStreamType::Binary =>
{
if let Some(reader) = stream.reader() {
Ok(PipelineData::ListStream(
read_json_lines(reader, span, strict, None),
metadata,
))
} else {
Ok(PipelineData::Empty)
}
}
_ => Err(ShellError::OnlySupportsThisInputType {
exp_input_type: "string".into(),
wrong_type: input.get_type().to_string(),
dst_span: call.head,
src_span: input.span().unwrap_or(call.head),
}),
}
} else {
// Return a single JSON value
let (string_input, span, metadata) = input.collect_string_strict(span)?;
if string_input.is_empty() {
return Ok(Value::nothing(span).into_pipeline_data());
}
let strict = call.has_flag(engine_state, stack, "strict")?;
// TODO: turn this into a structured underline of the nu_json error
if call.has_flag(engine_state, stack, "objects")? {
let lines = string_input.lines().filter(|line| !line.trim().is_empty());
let converted_lines: Vec<_> = if strict {
lines
.map(|line| {
convert_string_to_value_strict(line, span)
.unwrap_or_else(|err| Value::error(err, span))
})
.collect()
} else {
lines
.map(|line| {
convert_string_to_value(line, span)
.unwrap_or_else(|err| Value::error(err, span))
})
.collect()
};
Ok(converted_lines.into_pipeline_data_with_metadata(
span,
engine_state.ctrlc.clone(),
metadata,
))
} else if strict {
if strict {
Ok(convert_string_to_value_strict(&string_input, span)?
.into_pipeline_data_with_metadata(metadata))
} else {
@@ -97,6 +119,30 @@ impl Command for FromJson {
.into_pipeline_data_with_metadata(metadata))
}
}
}
}
/// Create a stream of values from a reader that produces line-delimited JSON
fn read_json_lines(
input: impl BufRead + Send + 'static,
span: Span,
strict: bool,
interrupt: Option<Arc<AtomicBool>>,
) -> ListStream {
let iter = input
.lines()
.filter(|line| line.as_ref().is_ok_and(|line| !line.trim().is_empty()) || line.is_err())
.map(move |line| {
let line = line.err_span(span)?;
if strict {
convert_string_to_value_strict(&line, span)
} else {
convert_string_to_value(&line, span)
}
})
.map(move |result| result.unwrap_or_else(|err| Value::error(err, span)));
ListStream::new(iter, span, interrupt)
}
fn convert_nujson_to_value(value: nu_json::Value, span: Span) -> Value {


@@ -96,6 +96,32 @@ fn from_json_text_recognizing_objects_independently_to_table() {
})
}
#[test]
fn from_json_text_objects_is_stream() {
Playground::setup("filter_from_json_test_2_is_stream", |dirs, sandbox| {
sandbox.with_files(&[FileWithContentToBeTrimmed(
"katz.txt",
r#"
{"name": "Yehuda", "rusty_luck": 1}
{"name": "JT", "rusty_luck": 1}
{"name": "Andres", "rusty_luck": 1}
{"name":"GorbyPuff", "rusty_luck": 3}
"#,
)]);
let actual = nu!(
cwd: dirs.test(), pipeline(
r#"
open katz.txt
| from json -o
| describe -n
"#
));
assert_eq!(actual.out, "stream");
})
}
#[test]
fn from_json_text_recognizing_objects_independently_to_table_strict() {
Playground::setup("filter_from_json_test_2_strict", |dirs, sandbox| {


@@ -3,7 +3,7 @@ use crate::{
engine::{EngineState, Stack},
process::{ChildPipe, ChildProcess, ExitStatus},
ByteStream, ByteStreamType, Config, ErrSpan, ListStream, OutDest, PipelineMetadata, Range,
ShellError, Span, Value,
ShellError, Span, Type, Value,
};
use nu_utils::{stderr_write_all_and_flush, stdout_write_all_and_flush};
use std::{
@@ -99,6 +99,24 @@ impl PipelineData {
}
}
/// Get a type that is representative of the `PipelineData`.
///
/// The type returned here makes no effort to collect a stream, so it may be a different type
/// than would be returned by [`Value::get_type()`] on the result of [`.into_value()`].
///
/// Specifically, a `ListStream` results in [`list stream`](Type::ListStream) rather than
/// the fully complete [`list`](Type::List) type (which would require knowing the contents),
/// and a `ByteStream` with [unknown](crate::ByteStreamType::Unknown) type results in
/// [`any`](Type::Any) rather than [`string`](Type::String) or [`binary`](Type::Binary).
pub fn get_type(&self) -> Type {
match self {
PipelineData::Empty => Type::Nothing,
PipelineData::Value(value, _) => value.get_type(),
PipelineData::ListStream(_, _) => Type::ListStream,
PipelineData::ByteStream(stream, _) => stream.type_().into(),
}
}
pub fn into_value(self, span: Span) -> Result<Value, ShellError> {
match self {
PipelineData::Empty => Ok(Value::nothing(span)),