diff --git a/crates/nu-command/src/database/commands/into_sqlite.rs b/crates/nu-command/src/database/commands/into_sqlite.rs index 49da38ec88..6a4ae6cf47 100644 --- a/crates/nu-command/src/database/commands/into_sqlite.rs +++ b/crates/nu-command/src/database/commands/into_sqlite.rs @@ -164,17 +164,23 @@ fn operate( let file_name: Spanned = call.req(engine_state, stack, 0)?; let table_name: Option> = call.get_flag(engine_state, stack, "table-name")?; let table = Table::new(&file_name, table_name)?; + let ctrl_c = engine_state.ctrlc.clone(); - match action(input, table, span) { + match action(input, table, span, ctrl_c) { Ok(val) => Ok(val.into_pipeline_data()), Err(e) => Err(e), } } -fn action(input: PipelineData, table: Table, span: Span) -> Result { +fn action( + input: PipelineData, + table: Table, + span: Span, + ctrl_c: Option>, +) -> Result { match input { PipelineData::ListStream(list_stream, _) => { - insert_in_transaction(list_stream.stream, list_stream.ctrlc, span, table) + insert_in_transaction(list_stream.stream, span, table, ctrl_c) } PipelineData::Value( Value::List { @@ -182,9 +188,9 @@ fn action(input: PipelineData, table: Table, span: Span) -> Result insert_in_transaction(vals.into_iter(), None, internal_span, table), + ) => insert_in_transaction(vals.into_iter(), internal_span, table, ctrl_c), PipelineData::Value(val, _) => { - insert_in_transaction(std::iter::once(val), None, span, table) + insert_in_transaction(std::iter::once(val), span, table, ctrl_c) } _ => Err(ShellError::OnlySupportsThisInputType { exp_input_type: "list".into(), @@ -197,9 +203,9 @@ fn action(input: PipelineData, table: Table, span: Span) -> Result, - ctrlc: Option>, span: Span, mut table: Table, + ctrl_c: Option>, ) -> Result { let mut stream = stream.peekable(); let first_val = match stream.peek() { @@ -210,10 +216,16 @@ fn insert_in_transaction( let table_name = table.name().clone(); let tx = table.try_init(first_val)?; - // insert all the records - stream.try_for_each(|stream_value| { - if let Some(ref ctrlc) = ctrlc { + for stream_value in stream { + if let Some(ref ctrlc) = ctrl_c { if ctrlc.load(Ordering::Relaxed) { + tx.rollback().map_err(|e| ShellError::GenericError { + error: "Failed to rollback SQLite transaction".into(), + msg: e.to_string(), + span: None, + help: None, + inner: Vec::new(), + })?; return Err(ShellError::InterruptedByUser { span: None }); } } @@ -249,8 +261,8 @@ fn insert_in_transaction( inner: Vec::new(), })?; - result - })?; + result? + } tx.commit().map_err(|e| ShellError::GenericError { error: "Failed to commit SQLite transaction".into(),