Fix memory consumption of into sqlite (#10232)

# Description

Currently, the `into sqlite` command collects the entire input stream
into a single Value, which soaks up the entire input into memory, before
it ever tries to write anything to the DB. This is very problematic for
large inputs; for example, I tried transforming a multi-gigabyte CSV
file into SQLite, and before I knew what was happening, my system's
memory was completely exhausted, and I had to hard reboot to recover.

This PR fixes this problem by working directly with the pipeline stream,
inserting into the DB as values are read from the stream.

In order to facilitate working with the stream directly, I introduced a
new `Table` struct to store the connection and a few configuration
parameters, as well as to make it easier to lazily create the table on
the first read value.

In addition to the purely functional fixes, a few other changes were
made to the serialization and user facing behavior.

### Serialization

Much of the preexisting code was focused on generating the exact text
needed for a SQL statement. This is unneeded and less safe than using
the `rusqlite` crate's serialization for native Rust types along with
prepared statements.

### User-Facing Changes

Currently, the command is very liberal in the input types it accepts.
The strategy is basically if it is a record, try to follow its structure
and make an analogous SQL row, which is pretty reasonable. However, when
it's not a record, it basically tries to guess what the user wanted and
just makes a single column table and serializes the value into that one
column, whatever type it may be.

This has been changed so that it only accepts records as input. If the
user wants to serialize non-record types into SQL, then they must
explicitly opt into doing this by constructing a record or table with it
first. For a utility for inserting data into SQL, I think it makes more
sense to let the user choose how to convert their data, rather than make
a choice for them that may surprise them.

However, I understand this may be a controversial change. If the
maintainers don't agree, I can change this back.

#### Long switch names

The `file_name` and `table_name` long form switches are currently
snake_case and expect to be as such at the command line. These have been
changed to kebab-case to be more conventional.

# Tests + Formatting

To test the memory consumption, I used [this publicly available index of
all Wikipedia articles](https://dumps.wikimedia.org/enwiki/20230820/),
using the first 10,000, 100,000, and 1,000,000 entries, in that order. I
ran the following script to benchmark the changes against the current
stable release:

```nu
#!/usr/bin/nu

# let shellbin = $"($env.HOME)/src/nushell/target/aarch64-linux-android/release/nu"
let shellbin = "nu"
const dbpath = 'enwiki-index.db'

[10000, 100000, 1000000]
  | each {|rows|
      rm -f $dbpath;
      do { time -f '%M %e %U %S' $shellbin -c (
        $"bzip2 -cdk ~/enwiki-20230820-pages-articles-multistream-index.txt.bz2
            | head -n ($rows)
            | lines
            | parse '{offset}:{id}:{title}'
            | update cells -c [offset, id] { into int }
            | into sqlite ($dbpath)"
        )
      }
      | complete
      | get stderr
      | str trim
      | parse '{rss_max} {real} {user} {kernel}'
      | update cells -c [rss_max] { $"($in)kb" | into filesize }
      | update cells -c [real, user, kernel] { $"($in)sec" | into duration }
      | insert rows $rows
      | roll right
    }
  | flatten
  | to nuon
```

This yields the following results

Current stable release:

|rows|rss_max|real|user|kernel|
|-|-|-|-|-|
|10000|53.6 MiB|770ms|460ms|420ms|
|100000|209.6 MiB|6sec 940ms|3sec 740ms|4sec 380ms|
|1000000|1.7 GiB|1min 8sec 810ms|38sec 690ms|42sec 550ms|

This PR:

|rows|rss_max|real|user|kernel|
|-|-|-|-|-|
|10000|38.2 MiB|780ms|440ms|410ms|
|100000|39.8 MiB|6sec 450ms|3sec 530ms|4sec 160ms|
|1000000|39.8 MiB|1min 3sec 230ms|37sec 440ms|40sec 180ms|

# Note

I started this branch kind of at the same time as my others, but I
understand the feedback that smaller PRs are preferred. Let me know if
it would be better to split this up.

I do think the scope of the changes are on the bigger side even without
the behavior changes I mentioned, so I'm not sure if that will help this
particular PR very much, but I'm happy to oblige on request.
This commit is contained in:
Skyler Hawthorne 2024-01-15 22:41:25 -05:00 committed by GitHub
parent 924986576d
commit 7ac3e97bfe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 687 additions and 285 deletions

139
Cargo.lock generated
View File

@ -114,9 +114,9 @@ dependencies = [
[[package]]
name = "anstream"
version = "0.6.5"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d664a92ecae85fd0a7392615844904654d1d5f5514837f471ddef4a057aba1b6"
checksum = "2ab91ebe16eb252986481c5b62f6098f3b698a45e34b5b98200cf20dd2484a44"
dependencies = [
"anstyle",
"anstyle-parse",
@ -143,9 +143,9 @@ dependencies = [
[[package]]
name = "anstyle-query"
version = "1.0.2"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e28923312444cdd728e4738b3f9c9cac739500909bb3d3c94b43551b16517648"
checksum = "a3a318f1f38d2418400f8209655bfd825785afd25aa30bb7ba6cc792e4596748"
dependencies = [
"windows-sys 0.52.0",
]
@ -241,7 +241,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.40",
"syn 2.0.39",
]
[[package]]
@ -252,7 +252,7 @@ checksum = "a66537f1bb974b254c98ed142ff995236e81b9d0fe4db0575f46612cb15eb0f9"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.40",
"syn 2.0.39",
]
[[package]]
@ -352,7 +352,7 @@ dependencies = [
"regex",
"rustc-hash",
"shlex",
"syn 2.0.40",
"syn 2.0.39",
]
[[package]]
@ -471,7 +471,7 @@ checksum = "965ab7eb5f8f97d2a083c799f3a1b994fc397b2fe2da5d1da1626ce15a39f2b1"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.40",
"syn 2.0.39",
]
[[package]]
@ -974,7 +974,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13b588ba4ac1a99f7f2964d24b3d896ddc6bf847ee3855dbd4366f058cfcd331"
dependencies = [
"quote",
"syn 2.0.40",
"syn 2.0.39",
]
[[package]]
@ -1196,7 +1196,7 @@ dependencies = [
"once_cell",
"proc-macro2",
"quote",
"syn 2.0.40",
"syn 2.0.39",
]
[[package]]
@ -1217,9 +1217,9 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5"
[[package]]
name = "erased-serde"
version = "0.4.0"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3286168faae03a0e583f6fde17c02c8b8bba2dcc2061d0f7817066e5b0af706"
checksum = "6c138974f9d5e7fe373eb04df7cae98833802ae4b11c24ac7039a21d5af4b26c"
dependencies = [
"serde",
]
@ -1451,7 +1451,7 @@ checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.40",
"syn 2.0.39",
]
[[package]]
@ -1720,9 +1720,9 @@ dependencies = [
[[package]]
name = "http-body"
version = "0.4.6"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2"
checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1"
dependencies = [
"bytes",
"http",
@ -1964,9 +1964,9 @@ dependencies = [
[[package]]
name = "itoa"
version = "1.0.10"
version = "1.0.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c"
checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38"
[[package]]
name = "jobserver"
@ -2090,9 +2090,9 @@ dependencies = [
[[package]]
name = "libc"
version = "0.2.151"
version = "0.2.150"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "302d7ab3130588088d277783b1e2d2e10c9e9e4a16dd9050e6ec93fb3e7048f4"
checksum = "89d92a4743f9a61002fae18374ed11e7973f530cb3a3255fb354818118b2203c"
[[package]]
name = "libflate"
@ -2406,7 +2406,7 @@ checksum = "49e7bc1560b95a3c4a25d03de42fe76ca718ab92d1a22a55b9b4cf67b3ae635c"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.40",
"syn 2.0.39",
]
[[package]]
@ -2679,7 +2679,7 @@ dependencies = [
"percent-encoding",
"reedline",
"rstest",
"sysinfo 0.30.4",
"sysinfo 0.30.5",
"unicode-segmentation",
"uuid",
"which 5.0.0",
@ -2858,7 +2858,7 @@ dependencies = [
"serde_urlencoded",
"serde_yaml",
"sha2",
"sysinfo 0.30.4",
"sysinfo 0.30.5",
"tabled",
"terminal_size 0.3.0",
"titlecase",
@ -3042,7 +3042,7 @@ dependencies = [
"ntapi",
"once_cell",
"procfs",
"sysinfo 0.30.4",
"sysinfo 0.30.5",
"windows 0.52.0",
]
@ -3340,9 +3340,9 @@ checksum = "80adb31078122c880307e9cdfd4e3361e6545c319f9b9dcafcb03acd3b51a575"
[[package]]
name = "once_cell"
version = "1.19.0"
version = "1.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92"
checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d"
[[package]]
name = "oorandom"
@ -3384,7 +3384,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.40",
"syn 2.0.39",
]
[[package]]
@ -3566,7 +3566,7 @@ dependencies = [
"pest_meta",
"proc-macro2",
"quote",
"syn 2.0.40",
"syn 2.0.39",
]
[[package]]
@ -3649,7 +3649,7 @@ dependencies = [
"phf_shared 0.11.2",
"proc-macro2",
"quote",
"syn 2.0.40",
"syn 2.0.39",
]
[[package]]
@ -4374,7 +4374,7 @@ dependencies = [
[[package]]
name = "reedline"
version = "0.28.0"
source = "git+https://github.com/nushell/reedline.git?branch=main#ef7b96c157f644f97907858ec6aea6d377e6639b"
source = "git+https://github.com/nushell/reedline.git?branch=main#dc27ed8ff4746386489dc25f70ea5aa613f540c0"
dependencies = [
"chrono",
"crossterm",
@ -4409,7 +4409,7 @@ checksum = "7f7473c2cfcf90008193dd0e3e16599455cb601a9fce322b5bb55de799664925"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.40",
"syn 2.0.39",
]
[[package]]
@ -4533,7 +4533,7 @@ dependencies = [
"regex",
"relative-path",
"rustc_version",
"syn 2.0.40",
"syn 2.0.39",
"unicode-ident",
]
@ -4544,6 +4544,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "549b9d036d571d42e6e85d1c1425e2ac83491075078ca9a15be021c56b1641f2"
dependencies = [
"bitflags 2.4.1",
"chrono",
"fallible-iterator",
"fallible-streaming-iterator",
"hashlink",
@ -4571,7 +4572,7 @@ dependencies = [
"proc-macro2",
"quote",
"rust-embed-utils",
"syn 2.0.40",
"syn 2.0.39",
"walkdir",
]
@ -4628,9 +4629,9 @@ dependencies = [
[[package]]
name = "rustix"
version = "0.38.28"
version = "0.38.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72e572a5e8ca657d7366229cdde4bd14c4eb5499a9573d4d366fe1b599daa316"
checksum = "9470c4bf8246c8daf25f9598dca807fb6510347b1e1cfa55749113850c79d88a"
dependencies = [
"bitflags 2.4.1",
"errno",
@ -4647,9 +4648,9 @@ checksum = "7ffc183a10b4478d04cbbbfc96d0873219d962dd5accaff2ffbd4ceb7df837f4"
[[package]]
name = "ryu"
version = "1.0.16"
version = "1.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f98d2aa92eebf49b69786be48e4477826b256916e84a57ff2a4f21923b48eb4c"
checksum = "1ad4cc8da4ef723ed60bced201181d83791ad433213d8c24efffda1eec85d741"
[[package]]
name = "same-file"
@ -4761,7 +4762,7 @@ checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.40",
"syn 2.0.39",
]
[[package]]
@ -4783,7 +4784,7 @@ checksum = "3081f5ffbb02284dda55132aa26daecedd7372a42417bbbab6f14ab7d6bb9145"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.40",
"syn 2.0.39",
]
[[package]]
@ -4842,7 +4843,7 @@ checksum = "91d129178576168c589c9ec973feedf7d3126c01ac2bf08795109aa35b69fb8f"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.40",
"syn 2.0.39",
]
[[package]]
@ -5154,7 +5155,7 @@ dependencies = [
"proc-macro2",
"quote",
"rustversion",
"syn 2.0.40",
"syn 2.0.39",
]
[[package]]
@ -5219,9 +5220,9 @@ dependencies = [
[[package]]
name = "syn"
version = "2.0.40"
version = "2.0.39"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13fa70a4ee923979ffb522cacce59d34421ebdea5625e1073c4326ef9d2dd42e"
checksum = "23e78b90f2fcf45d3e842032ce32e3f2d1545ba6636271dcbf24fa306d87be7a"
dependencies = [
"proc-macro2",
"quote",
@ -5253,9 +5254,9 @@ dependencies = [
[[package]]
name = "sysinfo"
version = "0.30.4"
version = "0.30.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "717570a2533606f81f8cfac02a1915a620e725ffb78f6fc5e259769a4d747407"
checksum = "1fb4f3438c8f6389c864e61221cbc97e9bca98b4daf39a5beb7bea660f528bb2"
dependencies = [
"cfg-if",
"core-foundation-sys",
@ -5371,7 +5372,7 @@ checksum = "266b2e40bc00e5a6c09c3584011e08b06f123c00362c92b975ba9843aaaa14b8"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.40",
"syn 2.0.39",
]
[[package]]
@ -5462,9 +5463,9 @@ dependencies = [
[[package]]
name = "tokio"
version = "1.35.0"
version = "1.34.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "841d45b238a16291a4e1584e61820b8ae57d696cc5015c459c229ccc6990cc1c"
checksum = "d0c014766411e834f7af5b8f4cf46257aab4036ca95e9d2c144a10f59ad6f5b9"
dependencies = [
"backtrace",
"bytes",
@ -5487,7 +5488,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.40",
"syn 2.0.39",
]
[[package]]
@ -5606,9 +5607,9 @@ dependencies = [
[[package]]
name = "try-lock"
version = "0.2.5"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"
checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed"
[[package]]
name = "typed-arena"
@ -5624,9 +5625,9 @@ checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825"
[[package]]
name = "typetag"
version = "0.2.14"
version = "0.2.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "196976efd4a62737b3a2b662cda76efb448d099b1049613d7a5d72743c611ce0"
checksum = "80960fd143d4c96275c0e60b08f14b81fbb468e79bc0ef8fbda69fb0afafae43"
dependencies = [
"erased-serde",
"inventory",
@ -5637,13 +5638,13 @@ dependencies = [
[[package]]
name = "typetag-impl"
version = "0.2.14"
version = "0.2.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2eea6765137e2414c44c7b1e07c73965a118a72c46148e1e168b3fc9d3ccf3aa"
checksum = "bfc13d450dc4a695200da3074dacf43d449b968baee95e341920e47f61a3b40f"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.40",
"syn 2.0.39",
]
[[package]]
@ -5717,9 +5718,9 @@ checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c"
[[package]]
name = "unsafe-libyaml"
version = "0.2.10"
version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab4c90930b95a82d00dc9e9ac071b4991924390d46cbd0dfe566148667605e4b"
checksum = "f28467d3e1d3c6586d8f25fa243f544f5800fec42d97032474e17222c2b75cfa"
[[package]]
name = "ureq"
@ -5974,7 +5975,7 @@ dependencies = [
"once_cell",
"proc-macro2",
"quote",
"syn 2.0.40",
"syn 2.0.39",
"wasm-bindgen-shared",
]
@ -5996,7 +5997,7 @@ checksum = "f0eb82fcb7930ae6219a7ecfd55b217f5f0893484b7a13022ebb2b2bf20b5283"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.40",
"syn 2.0.39",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
@ -6334,9 +6335,9 @@ checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04"
[[package]]
name = "winnow"
version = "0.5.28"
version = "0.5.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c830786f7720c2fd27a1a0e27a709dbd3c4d009b56d098fc742d4f4eab91fe2"
checksum = "b7e87b8dfbe3baffbe687eef2e164e32286eff31a5ee16463ce03d991643ec94"
dependencies = [
"memchr",
]
@ -6363,13 +6364,11 @@ dependencies = [
[[package]]
name = "xattr"
version = "1.1.3"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7dae5072fe1f8db8f8d29059189ac175196e410e40ba42d5d4684ae2f750995"
checksum = "f4686009f71ff3e5c4dbcf1a282d0a44db3f021ba69350cd42086b3e5f1c6985"
dependencies = [
"libc",
"linux-raw-sys",
"rustix",
]
[[package]]
@ -6392,22 +6391,22 @@ checksum = "09041cd90cf85f7f8b2df60c646f853b7f535ce68f85244eb6731cf89fa498ec"
[[package]]
name = "zerocopy"
version = "0.7.31"
version = "0.7.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1c4061bedbb353041c12f413700357bec76df2c7e2ca8e4df8bac24c6bf68e3d"
checksum = "5d075cf85bbb114e933343e087b92f2146bac0d55b534cbb8188becf0039948e"
dependencies = [
"zerocopy-derive",
]
[[package]]
name = "zerocopy-derive"
version = "0.7.31"
version = "0.7.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b3c129550b3e6de3fd0ba67ba5c81818f9805e58b8d7fee80a3a59d2c9fc601a"
checksum = "86cd5ca076997b97ef09d3ad65efe811fa68c9e874cb636ccb211223a813b0c2"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.40",
"syn 2.0.39",
]
[[package]]

View File

@ -73,7 +73,7 @@ rand = "0.8"
rayon = "1.8"
regex = "1.9.5"
roxmltree = "0.18"
rusqlite = { version = "0.29", features = ["bundled", "backup"], optional = true }
rusqlite = { version = "0.29", features = ["bundled", "backup", "chrono"], optional = true }
same-file = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"

View File

@ -1,13 +1,17 @@
use crate::database::values::sqlite::open_sqlite_db;
use itertools::Itertools;
use nu_engine::CallExt;
use nu_protocol::ast::Call;
use nu_protocol::engine::{Command, EngineState, Stack};
use nu_protocol::{
Category, Example, IntoPipelineData, PipelineData, ShellError, Signature, Span, Spanned,
SyntaxShape, Type, Value,
Category, Example, IntoPipelineData, PipelineData, Record, ShellError, Signature, Span,
Spanned, SyntaxShape, Type, Value,
};
use std::path::Path;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
pub const DEFAULT_TABLE_NAME: &str = "main";
#[derive(Clone)]
pub struct IntoSqliteDb;
@ -19,21 +23,23 @@ impl Command for IntoSqliteDb {
fn signature(&self) -> Signature {
Signature::build("into sqlite")
.input_output_types(vec![(Type::Any, Type::Nothing)])
.category(Category::Conversions)
.input_output_types(vec![
(Type::Table(vec![]), Type::Nothing),
(Type::Record(vec![]), Type::Nothing),
])
.allow_variants_without_examples(true)
// TODO: narrow disallowed types
.required(
"file_name",
"file-name",
SyntaxShape::String,
"Specify the filename to save the database to.",
)
.named(
"table_name",
"table-name",
SyntaxShape::String,
"Specify table name to store the data in",
Some('t'),
)
.category(Category::Conversions)
}
fn run(
@ -55,26 +61,96 @@ impl Command for IntoSqliteDb {
}
fn examples(&self) -> Vec<Example> {
vec![Example {
description: "Convert ls entries into a SQLite database with 'main' as the table name",
example: "ls | into sqlite my_ls.db",
result: None,
},
Example {
description: "Convert ls entries into a SQLite database with 'my_table' as the table name",
example: "ls | into sqlite my_ls.db -t my_table",
result: None,
},
Example {
description: "Convert table literal into a SQLite database with 'main' as the table name",
example: "[[name]; [-----] [someone] [=====] [somename] ['(((((']] | into sqlite filename.db",
result: None,
},
Example {
description: "Convert a variety of values in table literal form into a SQLite database",
example: "[one 2 5.2 six true 100mib 25sec] | into sqlite variety.db",
result: None,
}]
vec![
Example {
description: "Convert ls entries into a SQLite database with 'main' as the table name",
example: "ls | into sqlite my_ls.db",
result: None,
},
Example {
description: "Convert ls entries into a SQLite database with 'my_table' as the table name",
example: "ls | into sqlite my_ls.db -t my_table",
result: None,
},
Example {
description: "Convert table literal into a SQLite database with 'main' as the table name",
example: "[[name]; [-----] [someone] [=====] [somename] ['(((((']] | into sqlite filename.db",
result: None,
},
Example {
description: "Insert a single record into a SQLite database",
example: "{ foo: bar, baz: quux } | into sqlite filename.db",
result: None,
},
]
}
}
struct Table {
conn: rusqlite::Connection,
table_name: String,
}
impl Table {
pub fn new(
db_path: &Spanned<String>,
table_name: Option<Spanned<String>>,
) -> Result<Self, nu_protocol::ShellError> {
let table_name = if let Some(table_name) = table_name {
table_name.item
} else {
DEFAULT_TABLE_NAME.to_string()
};
// create the sqlite database table
let conn = open_sqlite_db(Path::new(&db_path.item), db_path.span)?;
Ok(Self { conn, table_name })
}
pub fn name(&self) -> &String {
&self.table_name
}
fn try_init(
&mut self,
record: &Record,
) -> Result<rusqlite::Transaction, nu_protocol::ShellError> {
let columns = get_columns_with_sqlite_types(record)?;
// create a string for sql table creation
let create_statement = format!(
"CREATE TABLE IF NOT EXISTS [{}] ({})",
self.table_name,
columns
.into_iter()
.map(|(col_name, sql_type)| format!("{col_name} {sql_type}"))
.collect::<Vec<_>>()
.join(", ")
);
// execute the statement
self.conn.execute(&create_statement, []).map_err(|err| {
eprintln!("{:?}", err);
ShellError::GenericError {
error: "Failed to create table".into(),
msg: err.to_string(),
span: None,
help: None,
inner: Vec::new(),
}
})?;
self.conn
.transaction()
.map_err(|err| ShellError::GenericError {
error: "Failed to open transaction".into(),
msg: err.to_string(),
span: None,
help: None,
inner: Vec::new(),
})
}
}
@ -87,180 +163,164 @@ fn operate(
let span = call.head;
let file_name: Spanned<String> = call.req(engine_state, stack, 0)?;
let table_name: Option<Spanned<String>> = call.get_flag(engine_state, stack, "table_name")?;
let table = Table::new(&file_name, table_name)?;
// collect the input into a value
let table_entries = input.into_value(span);
match action(&table_entries, table_name, file_name, span) {
match action(input, table, span) {
Ok(val) => Ok(val.into_pipeline_data()),
Err(e) => Err(e),
}
}
fn action(
input: &Value,
table: Option<Spanned<String>>,
file: Spanned<String>,
fn action(input: PipelineData, table: Table, span: Span) -> Result<Value, ShellError> {
match input {
PipelineData::ListStream(list_stream, _) => {
insert_in_transaction(list_stream.stream, list_stream.ctrlc, span, table)
}
PipelineData::Value(
Value::List {
vals,
internal_span,
},
_,
) => insert_in_transaction(vals.into_iter(), None, internal_span, table),
PipelineData::Value(val, _) => {
insert_in_transaction(std::iter::once(val), None, span, table)
}
_ => Err(ShellError::OnlySupportsThisInputType {
exp_input_type: "list".into(),
wrong_type: "".into(),
dst_span: span,
src_span: span,
}),
}
}
fn insert_in_transaction(
stream: impl Iterator<Item = Value>,
ctrlc: Option<Arc<AtomicBool>>,
span: Span,
mut table: Table,
) -> Result<Value, ShellError> {
let table_name = if let Some(table_name) = table {
table_name.item
} else {
"main".to_string()
let mut stream = stream.peekable();
let first_val = match stream.peek() {
None => return Ok(Value::nothing(span)),
Some(val) => val.as_record()?,
};
let val_span = input.span();
match input {
Value::List { vals, .. } => {
// find the column names, and sqlite data types
let columns = get_columns_with_sqlite_types(vals);
let table_name = table.name().clone();
let tx = table.try_init(first_val)?;
let insert_statement = format!(
"INSERT INTO [{}] VALUES ({})",
table_name,
["?"].repeat(first_val.values().len()).join(", ")
);
let table_columns_creation = columns
.iter()
.map(|(name, sql_type)| format!("\"{name}\" {sql_type}"))
.join(",");
// get the values
let table_values = vals
.iter()
.map(|list_value| {
format!(
"({})",
match list_value {
Value::Record { val, .. } => {
val.values()
.map(|rec_val| {
format!("'{}'", nu_value_to_string(rec_val.clone(), ""))
})
.join(",")
}
// Number formats so keep them without quotes
Value::Int { .. }
| Value::Float { .. }
| Value::Filesize { .. }
| Value::Duration { .. } => nu_value_to_string(list_value.clone(), ""),
_ =>
// String formats so add quotes around them
format!("'{}'", nu_value_to_string(list_value.clone(), "")),
}
)
})
.join(",");
// create the sqlite database table
let conn = open_sqlite_db(Path::new(&file.item), file.span)?;
// create a string for sql table creation
let create_statement =
format!("CREATE TABLE IF NOT EXISTS [{table_name}] ({table_columns_creation})");
// prepare the string as a sqlite statement
let mut stmt =
conn.prepare(&create_statement)
.map_err(|e| ShellError::GenericError {
error: "Failed to prepare SQLite statement".into(),
msg: e.to_string(),
span: Some(file.span),
help: None,
inner: vec![],
})?;
// execute the statement
stmt.execute([]).map_err(|e| ShellError::GenericError {
error: "Failed to execute SQLite statement".into(),
let mut insert_statement =
tx.prepare(&insert_statement)
.map_err(|e| ShellError::GenericError {
error: "Failed to prepare SQLite statement".into(),
msg: e.to_string(),
span: Some(file.span),
span: None,
help: None,
inner: vec![],
inner: Vec::new(),
})?;
// use normal sql to create the table
// insert into table_name
// values
// ('xx', 'yy', 'zz'),
// ('aa', 'bb', 'cc'),
// ('dd', 'ee', 'ff')
// create the string for inserting data into the table
let insert_statement = format!("INSERT INTO [{table_name}] VALUES {table_values}");
// prepare the string as a sqlite statement
let mut stmt =
conn.prepare(&insert_statement)
.map_err(|e| ShellError::GenericError {
error: "Failed to prepare SQLite statement".into(),
msg: e.to_string(),
span: Some(file.span),
help: None,
inner: vec![],
})?;
// execute the statement
stmt.execute([]).map_err(|e| ShellError::GenericError {
error: "Failed to execute SQLite statement".into(),
msg: e.to_string(),
span: Some(file.span),
help: None,
inner: vec![],
})?;
// and we're done
Ok(Value::nothing(val_span))
// insert all the records
stream.try_for_each(|stream_value| {
if let Some(ref ctrlc) = ctrlc {
if ctrlc.load(Ordering::Relaxed) {
return Err(ShellError::InterruptedByUser { span: None });
}
}
// Propagate errors by explicitly matching them before the final case.
Value::Error { error, .. } => Err(*error.clone()),
other => Err(ShellError::OnlySupportsThisInputType {
exp_input_type: "list".into(),
wrong_type: other.get_type().to_string(),
dst_span: span,
src_span: other.span(),
insert_value(stream_value, &mut insert_statement)
})?;
insert_statement
.finalize()
.map_err(|e| ShellError::GenericError {
error: "Failed to finalize SQLite prepared statement".into(),
msg: e.to_string(),
span: None,
help: None,
inner: Vec::new(),
})?;
tx.commit().map_err(|e| ShellError::GenericError {
error: "Failed to commit SQLite transaction".into(),
msg: e.to_string(),
span: None,
help: None,
inner: Vec::new(),
})?;
Ok(Value::nothing(span))
}
fn insert_value(
stream_value: Value,
insert_statement: &mut rusqlite::Statement<'_>,
) -> Result<(), ShellError> {
match stream_value {
// map each column value into its SQL representation
Value::Record { val, .. } => {
let sql_vals = values_to_sql(val.into_values())?;
insert_statement
.execute(rusqlite::params_from_iter(sql_vals))
.map_err(|e| ShellError::GenericError {
error: "Failed to execute SQLite statement".into(),
msg: e.to_string(),
span: None,
help: None,
inner: Vec::new(),
})?;
Ok(())
}
val => Err(ShellError::OnlySupportsThisInputType {
exp_input_type: "record".into(),
wrong_type: val.get_type().to_string(),
dst_span: Span::unknown(),
src_span: val.span(),
}),
}
}
// This is taken from to text local_into_string but tweaks it a bit so that certain formatting does not happen
fn nu_value_to_string(value: Value, separator: &str) -> String {
match value {
Value::Bool { val, .. } => val.to_string(),
Value::Int { val, .. } => val.to_string(),
Value::Float { val, .. } => val.to_string(),
Value::Filesize { val, .. } => val.to_string(),
Value::Duration { val, .. } => val.to_string(),
Value::Date { val, .. } => val.to_string(),
Value::Range { val, .. } => {
format!(
"{}..{}",
nu_value_to_string(val.from, ", "),
nu_value_to_string(val.to, ", ")
)
}
fn value_to_sql(value: Value) -> Result<Box<dyn rusqlite::ToSql>, ShellError> {
Ok(match value {
Value::Bool { val, .. } => Box::new(val),
Value::Int { val, .. } => Box::new(val),
Value::Float { val, .. } => Box::new(val),
Value::Filesize { val, .. } => Box::new(val),
Value::Duration { val, .. } => Box::new(val),
Value::Date { val, .. } => Box::new(val),
Value::String { val, .. } => {
// don't store ansi escape sequences in the database
// escape single quotes
nu_utils::strip_ansi_unlikely(&val).replace('\'', "''")
Box::new(nu_utils::strip_ansi_unlikely(&val).into_owned())
}
Value::List { vals: val, .. } => val
.into_iter()
.map(|x| nu_value_to_string(x, ", "))
.collect::<Vec<_>>()
.join(separator),
Value::Record { val, .. } => val
.into_iter()
.map(|(x, y)| format!("{}: {}", x, nu_value_to_string(y, ", ")))
.collect::<Vec<_>>()
.join(separator),
Value::LazyRecord { val, .. } => match val.collect() {
Ok(val) => nu_value_to_string(val, separator),
Err(error) => format!("{error:?}"),
},
Value::Block { val, .. } => format!("<Block {val}>"),
Value::Closure { val, .. } => format!("<Closure {}>", val.block_id),
Value::Nothing { .. } => String::new(),
Value::Error { error, .. } => format!("{error:?}"),
Value::Binary { val, .. } => format!("{val:?}"),
Value::CellPath { val, .. } => val.to_string(),
Value::CustomValue { val, .. } => val.value_string(),
}
Value::Binary { val, .. } => Box::new(val),
val => {
return Err(ShellError::OnlySupportsThisInputType {
exp_input_type:
"bool, int, float, filesize, duration, date, string, nothing, binary".into(),
wrong_type: val.get_type().to_string(),
dst_span: Span::unknown(),
src_span: val.span(),
})
}
})
}
fn values_to_sql(
values: impl IntoIterator<Item = Value>,
) -> Result<Vec<Box<dyn rusqlite::ToSql>>, ShellError> {
values
.into_iter()
.map(value_to_sql)
.collect::<Result<Vec<_>, _>>()
}
// Each value stored in an SQLite database (or manipulated by the database engine) has one of the following storage classes:
@ -269,50 +329,52 @@ fn nu_value_to_string(value: Value, separator: &str) -> String {
// REAL. The value is a floating point value, stored as an 8-byte IEEE floating point number.
// TEXT. The value is a text string, stored using the database encoding (UTF-8, UTF-16BE or UTF-16LE).
// BLOB. The value is a blob of data, stored exactly as it was input.
fn nu_type_to_sqlite_type(nu_type: Type) -> &'static str {
match nu_type {
Type::Int => "INTEGER",
Type::Float => "REAL",
Type::String => "TEXT",
Type::Bool => "TEXT",
Type::Nothing => "NULL",
Type::Filesize => "INTEGER",
Type::Date => "TEXT",
_ => "TEXT",
fn nu_value_to_sqlite_type(val: &Value) -> Result<&'static str, ShellError> {
match val.get_type() {
Type::String => Ok("TEXT"),
Type::Int => Ok("INTEGER"),
Type::Float => Ok("REAL"),
Type::Number => Ok("REAL"),
Type::Binary => Ok("BLOB"),
Type::Bool => Ok("BOOLEAN"),
Type::Date => Ok("DATETIME"),
Type::Duration => Ok("BIGINT"),
Type::Filesize => Ok("INTEGER"),
// intentionally enumerated so that any future types get handled
Type::Any
| Type::Block
| Type::CellPath
| Type::Closure
| Type::Custom(_)
| Type::Error
| Type::List(_)
| Type::ListStream
| Type::Nothing
| Type::Range
| Type::Record(_)
| Type::Signature
| Type::Table(_) => Err(ShellError::OnlySupportsThisInputType {
exp_input_type: "sql".into(),
wrong_type: val.get_type().to_string(),
dst_span: Span::unknown(),
src_span: val.span(),
}),
}
}
fn get_columns_with_sqlite_types(input: &[Value]) -> Vec<(String, String)> {
let mut columns: Vec<(String, String)> = vec![];
let mut added = false;
fn get_columns_with_sqlite_types(
record: &Record,
) -> Result<Vec<(String, &'static str)>, ShellError> {
let mut columns: Vec<(String, &'static str)> = vec![];
for item in input {
// let sqlite_type = nu_type_to_sqlite_type(item.get_type());
// eprintln!(
// "item_type: {:?}, sqlite_type: {:?}",
// item.get_type(),
// sqlite_type
// );
if let Value::Record { val, .. } = item {
for (c, v) in val {
if !columns.iter().any(|(name, _)| name == c) {
columns.push((
c.to_string(),
nu_type_to_sqlite_type(v.get_type()).to_string(),
));
}
}
} else {
// force every other type to a string
if !added {
columns.push(("value".to_string(), "TEXT".to_string()));
added = true;
}
for (c, v) in record {
if !columns.iter().any(|(name, _)| name == c) {
columns.push((c.clone(), nu_value_to_sqlite_type(v)?));
}
}
columns
Ok(columns)
}
#[cfg(test)]

View File

@ -0,0 +1,339 @@
use std::{io::Write, path::PathBuf};
use chrono::{DateTime, FixedOffset, NaiveDateTime, Offset};
use nu_protocol::{ast::PathMember, Record, Span, Value};
use nu_test_support::{
fs::{line_ending, Stub},
nu, pipeline,
playground::{Dirs, Playground},
};
use rand::{
distributions::{Alphanumeric, DistString, Standard},
prelude::Distribution,
rngs::StdRng,
Rng, SeedableRng,
};
#[test]
fn into_sqlite_schema() {
Playground::setup("schema", |dirs, _| {
let testdb = make_sqlite_db(
&dirs,
r#"[
[somebool, someint, somefloat, somefilesize, someduration, somedate, somestring, somebinary];
[true, 1, 2.0, 1kb, 1sec, "2023-09-10 11:30:00", "foo", ("binary" | into binary)],
[false, 2, 3.0, 2mb, 4wk, "2020-09-10 12:30:00", "bar", ("wut" | into binary)],
]"#,
);
let conn = rusqlite::Connection::open(testdb).unwrap();
let mut stmt = conn.prepare("SELECT * FROM pragma_table_info(?1)").unwrap();
let actual_rows: Vec<_> = stmt
.query_and_then(["main"], |row| -> rusqlite::Result<_, rusqlite::Error> {
let name: String = row.get("name").unwrap();
let col_type: String = row.get("type").unwrap();
Ok((name, col_type))
})
.unwrap()
.map(|row| row.unwrap())
.collect();
let expected_rows = vec![
("somebool".into(), "BOOLEAN".into()),
("someint".into(), "INTEGER".into()),
("somefloat".into(), "REAL".into()),
("somefilesize".into(), "INTEGER".into()),
("someduration".into(), "BIGINT".into()),
("somedate".into(), "TEXT".into()),
("somestring".into(), "TEXT".into()),
("somebinary".into(), "BLOB".into()),
];
assert_eq!(expected_rows, actual_rows);
});
}
#[test]
fn into_sqlite_values() {
Playground::setup("values", |dirs, _| {
insert_test_rows(
&dirs,
r#"[
[somebool, someint, somefloat, somefilesize, someduration, somedate, somestring, somebinary];
[true, 1, 2.0, 1kb, 1sec, "2023-09-10T11:30:00-00:00", "foo", ("binary" | into binary)],
[false, 2, 3.0, 2mb, 4wk, "2020-09-10T12:30:00-00:00", "bar", ("wut" | into binary)],
]"#,
None,
vec![
TestRow(
true,
1,
2.0,
1000,
1000000000,
DateTime::parse_from_rfc3339("2023-09-10T11:30:00-00:00").unwrap(),
"foo".into(),
b"binary".to_vec(),
),
TestRow(
false,
2,
3.0,
2000000,
2419200000000000,
DateTime::parse_from_rfc3339("2020-09-10T12:30:00-00:00").unwrap(),
"bar".into(),
b"wut".to_vec(),
),
],
);
});
}
/// Opening a preexisting database should append to it
#[test]
fn into_sqlite_existing_db_append() {
Playground::setup("existing_db_append", |dirs, _| {
// create a new DB with only one row
insert_test_rows(
&dirs,
r#"[
[somebool, someint, somefloat, somefilesize, someduration, somedate, somestring, somebinary];
[true, 1, 2.0, 1kb, 1sec, "2023-09-10T11:30:00-00:00", "foo", ("binary" | into binary)],
]"#,
None,
vec![TestRow(
true,
1,
2.0,
1000,
1000000000,
DateTime::parse_from_rfc3339("2023-09-10T11:30:00-00:00").unwrap(),
"foo".into(),
b"binary".to_vec(),
)],
);
// open the same DB again and write one row
insert_test_rows(
&dirs,
r#"[
[somebool, someint, somefloat, somefilesize, someduration, somedate, somestring, somebinary];
[false, 2, 3.0, 2mb, 4wk, "2020-09-10T12:30:00-00:00", "bar", ("wut" | into binary)],
]"#,
None,
// it should have both rows
vec![
TestRow(
true,
1,
2.0,
1000,
1000000000,
DateTime::parse_from_rfc3339("2023-09-10T11:30:00-00:00").unwrap(),
"foo".into(),
b"binary".to_vec(),
),
TestRow(
false,
2,
3.0,
2000000,
2419200000000000,
DateTime::parse_from_rfc3339("2020-09-10T12:30:00-00:00").unwrap(),
"bar".into(),
b"wut".to_vec(),
),
],
);
});
}
/// Test inserting a good number of randomly generated rows to test an actual
/// streaming pipeline instead of a simple value
#[test]
fn into_sqlite_big_insert() {
Playground::setup("big_insert", |dirs, playground| {
const NUM_ROWS: usize = 10_000;
const NUON_FILE_NAME: &str = "data.nuon";
let nuon_path = dirs.test().join(NUON_FILE_NAME);
playground.with_files(vec![Stub::EmptyFile(&nuon_path.to_string_lossy())]);
let mut expected_rows = Vec::new();
let mut nuon_file = std::fs::OpenOptions::new()
.write(true)
.open(&nuon_path)
.unwrap();
// write the header
for row in std::iter::repeat_with(TestRow::random).take(NUM_ROWS) {
let mut value: Value = row.clone().into();
// HACK: Convert to a string to get around this: https://github.com/nushell/nushell/issues/9186
value
.upsert_cell_path(
&[PathMember::String {
val: "somedate".into(),
span: Span::unknown(),
optional: false,
}],
Box::new(|dateval| Value::string(dateval.as_string().unwrap(), dateval.span())),
)
.unwrap();
let nuon = nu_command::value_to_string(&value, Span::unknown(), 0, None).unwrap()
+ &line_ending();
nuon_file.write_all(nuon.as_bytes()).unwrap();
expected_rows.push(row);
}
insert_test_rows(
&dirs,
&format!(
"open --raw {} | lines | each {{ from nuon }}",
nuon_path.to_string_lossy()
),
None,
expected_rows,
);
});
}
/// empty in, empty out
#[test]
fn into_sqlite_empty() {
Playground::setup("empty", |dirs, _| {
insert_test_rows(&dirs, r#"[]"#, Some("SELECT * FROM sqlite_schema;"), vec![]);
});
}
#[derive(Debug, PartialEq, Clone)]
struct TestRow(
bool,
i64,
f64,
i64,
i64,
chrono::DateTime<chrono::FixedOffset>,
std::string::String,
std::vec::Vec<u8>,
);
impl TestRow {
pub fn random() -> Self {
StdRng::from_entropy().sample(Standard)
}
}
impl From<TestRow> for Value {
fn from(row: TestRow) -> Self {
Value::record(
Record::from_iter(vec![
("somebool".into(), Value::bool(row.0, Span::unknown())),
("someint".into(), Value::int(row.1, Span::unknown())),
("somefloat".into(), Value::float(row.2, Span::unknown())),
(
"somefilesize".into(),
Value::filesize(row.3, Span::unknown()),
),
(
"someduration".into(),
Value::duration(row.4, Span::unknown()),
),
("somedate".into(), Value::date(row.5, Span::unknown())),
("somestring".into(), Value::string(row.6, Span::unknown())),
("somebinary".into(), Value::binary(row.7, Span::unknown())),
]),
Span::unknown(),
)
}
}
impl<'r> TryFrom<&rusqlite::Row<'r>> for TestRow {
type Error = rusqlite::Error;
fn try_from(row: &rusqlite::Row) -> Result<Self, Self::Error> {
let somebool: bool = row.get("somebool").unwrap();
let someint: i64 = row.get("someint").unwrap();
let somefloat: f64 = row.get("somefloat").unwrap();
let somefilesize: i64 = row.get("somefilesize").unwrap();
let someduration: i64 = row.get("someduration").unwrap();
let somedate: DateTime<FixedOffset> = row.get("somedate").unwrap();
let somestring: String = row.get("somestring").unwrap();
let somebinary: Vec<u8> = row.get("somebinary").unwrap();
Ok(TestRow(
somebool,
someint,
somefloat,
somefilesize,
someduration,
somedate,
somestring,
somebinary,
))
}
}
impl Distribution<TestRow> for Standard {
fn sample<R>(&self, rng: &mut R) -> TestRow
where
R: rand::Rng + ?Sized,
{
let naive_dt =
NaiveDateTime::from_timestamp_millis(rng.gen_range(0..2324252554000)).unwrap();
let dt = DateTime::from_naive_utc_and_offset(naive_dt, chrono::Utc.fix());
let rand_string = Alphanumeric.sample_string(rng, 10);
// limit the size of the numbers to work around
// https://github.com/nushell/nushell/issues/10612
let filesize = rng.gen_range(-1024..=1024);
let duration = rng.gen_range(-1024..=1024);
TestRow(
rng.gen(),
rng.gen(),
rng.gen(),
filesize,
duration,
dt,
rand_string,
rng.gen::<u64>().to_be_bytes().to_vec(),
)
}
}
fn make_sqlite_db(dirs: &Dirs, nu_table: &str) -> PathBuf {
let testdir = dirs.test();
let testdb_path =
testdir.join(testdir.file_name().unwrap().to_str().unwrap().to_owned() + ".db");
let testdb = testdb_path.to_str().unwrap();
let nucmd = nu!(
cwd: testdir,
pipeline(&format!("{nu_table} | into sqlite {testdb}"))
);
assert!(nucmd.status.success());
testdb_path
}
fn insert_test_rows(dirs: &Dirs, nu_table: &str, sql_query: Option<&str>, expected: Vec<TestRow>) {
let sql_query = sql_query.unwrap_or("SELECT * FROM main;");
let testdb = make_sqlite_db(dirs, nu_table);
let conn = rusqlite::Connection::open(testdb).unwrap();
let mut stmt = conn.prepare(sql_query).unwrap();
let actual_rows: Vec<_> = stmt
.query_and_then([], |row| TestRow::try_from(row))
.unwrap()
.map(|row| row.unwrap())
.collect();
assert_eq!(expected, actual_rows);
}

View File

@ -0,0 +1 @@
mod into_sqlite;

View File

@ -13,6 +13,7 @@ mod config_nu_default;
mod continue_;
mod conversions;
mod cp;
mod database;
mod date;
mod debug_info;
mod def;