Stream support (#812)

* Moves off of draining between filters. Instead, the sink will pull on the stream, and will drain element-wise. This moves the whole stream to being lazy.
* Adds ctrl-c support and connects it into some of the key points where we pull on the stream. If a ctrl-c is detect, we immediately halt pulling on the stream and return to the prompt.
* Moves away from having a SourceMap where anchor locations are stored. Now AnchorLocation is kept directly in the Tag.
* To make this possible, split tag and span. Span is largely used in the parser and is copyable. Tag is now no longer copyable.
This commit is contained in:
Jonathan Turner
2019-10-13 17:12:43 +13:00
committed by GitHub
parent 8ca678440a
commit 193b00764b
110 changed files with 1988 additions and 1892 deletions

View File

@ -3,7 +3,6 @@ use crate::commands::cp::CopyArgs;
use crate::commands::mkdir::MkdirArgs;
use crate::commands::mv::MoveArgs;
use crate::commands::rm::RemoveArgs;
use crate::context::SourceMap;
use crate::data::dir_entry_dict;
use crate::prelude::*;
use crate::shell::completer::NuCompleter;
@ -12,6 +11,7 @@ use crate::utils::FileStructure;
use rustyline::completion::FilenameCompleter;
use rustyline::hint::{Hinter, HistoryHinter};
use std::path::{Path, PathBuf};
use std::sync::atomic::Ordering;
pub struct FilesystemShell {
pub(crate) path: String,
@ -73,7 +73,7 @@ impl FilesystemShell {
}
impl Shell for FilesystemShell {
fn name(&self, _source_map: &SourceMap) -> String {
fn name(&self) -> String {
"filesystem".to_string()
}
@ -84,7 +84,7 @@ impl Shell for FilesystemShell {
fn ls(
&self,
pattern: Option<Tagged<PathBuf>>,
command_tag: Tag,
context: &RunnableContext,
) -> Result<OutputStream, ShellError> {
let cwd = self.path();
let mut full_path = PathBuf::from(self.path());
@ -94,7 +94,8 @@ impl Shell for FilesystemShell {
_ => {}
}
let mut shell_entries = VecDeque::new();
let ctrl_c = context.ctrl_c.clone();
let name_tag = context.name.clone();
//If it's not a glob, try to display the contents of the entry if it's a directory
let lossy_path = full_path.to_string_lossy();
@ -114,24 +115,30 @@ impl Shell for FilesystemShell {
return Err(ShellError::labeled_error(
e.to_string(),
e.to_string(),
command_tag,
name_tag,
));
}
}
Ok(o) => o,
};
for entry in entries {
let entry = entry?;
let filepath = entry.path();
let filename = if let Ok(fname) = filepath.strip_prefix(&cwd) {
fname
} else {
Path::new(&filepath)
};
let value = dir_entry_dict(filename, &entry.metadata()?, command_tag)?;
shell_entries.push_back(ReturnSuccess::value(value))
}
return Ok(shell_entries.to_output_stream());
let stream = async_stream! {
for entry in entries {
if ctrl_c.load(Ordering::SeqCst) {
break;
}
if let Ok(entry) = entry {
let filepath = entry.path();
let filename = if let Ok(fname) = filepath.strip_prefix(&cwd) {
fname
} else {
Path::new(&filepath)
};
let value = dir_entry_dict(filename, &entry.metadata().unwrap(), &name_tag)?;
yield ReturnSuccess::value(value);
}
}
};
return Ok(stream.to_output_stream());
}
}
@ -151,20 +158,25 @@ impl Shell for FilesystemShell {
};
// Enumerate the entries from the glob and add each
for entry in entries {
if let Ok(entry) = entry {
let filename = if let Ok(fname) = entry.strip_prefix(&cwd) {
fname
} else {
Path::new(&entry)
};
let metadata = std::fs::metadata(&entry)?;
let value = dir_entry_dict(filename, &metadata, command_tag)?;
shell_entries.push_back(ReturnSuccess::value(value))
let stream = async_stream! {
for entry in entries {
if ctrl_c.load(Ordering::SeqCst) {
break;
}
if let Ok(entry) = entry {
let filename = if let Ok(fname) = entry.strip_prefix(&cwd) {
fname
} else {
Path::new(&entry)
};
let metadata = std::fs::metadata(&entry).unwrap();
if let Ok(value) = dir_entry_dict(filename, &metadata, &name_tag) {
yield ReturnSuccess::value(value);
}
}
}
}
Ok(shell_entries.to_output_stream())
};
Ok(stream.to_output_stream())
}
fn cd(&self, args: EvaluatedWholeStreamCommandArgs) -> Result<OutputStream, ShellError> {
@ -175,7 +187,7 @@ impl Shell for FilesystemShell {
return Err(ShellError::labeled_error(
"Can not change to home directory",
"can not go to home",
args.call_info.name_tag,
&args.call_info.name_tag,
))
}
},
@ -957,7 +969,7 @@ impl Shell for FilesystemShell {
return Err(ShellError::labeled_error(
"unable to show current directory",
"pwd command failed",
args.call_info.name_tag,
&args.call_info.name_tag,
));
}
};
@ -965,7 +977,7 @@ impl Shell for FilesystemShell {
let mut stream = VecDeque::new();
stream.push_back(ReturnSuccess::value(
Value::Primitive(Primitive::String(p.to_string_lossy().to_string()))
.tagged(args.call_info.name_tag),
.tagged(&args.call_info.name_tag),
));
Ok(stream.into())

View File

@ -3,7 +3,6 @@ use crate::commands::cp::CopyArgs;
use crate::commands::mkdir::MkdirArgs;
use crate::commands::mv::MoveArgs;
use crate::commands::rm::RemoveArgs;
use crate::context::SourceMap;
use crate::data::{command_dict, TaggedDictBuilder};
use crate::prelude::*;
use crate::shell::shell::Shell;
@ -98,8 +97,8 @@ impl HelpShell {
}
impl Shell for HelpShell {
fn name(&self, source_map: &SourceMap) -> String {
let anchor_name = self.value.anchor_name(source_map);
fn name(&self) -> String {
let anchor_name = self.value.anchor_name();
format!(
"{}",
match anchor_name {
@ -129,7 +128,7 @@ impl Shell for HelpShell {
fn ls(
&self,
_pattern: Option<Tagged<PathBuf>>,
_command_tag: Tag,
_context: &RunnableContext,
) -> Result<OutputStream, ShellError> {
Ok(self
.commands()

View File

@ -3,7 +3,7 @@ use crate::parser::hir::syntax_shape::{color_fallible_syntax, FlatShape, Pipelin
use crate::parser::hir::TokensIterator;
use crate::parser::nom_input;
use crate::parser::parse::token_tree::TokenNode;
use crate::{Tag, Tagged, TaggedItem, Text};
use crate::{Span, Spanned, SpannedItem, Tag, Tagged, Text};
use ansi_term::Color;
use log::trace;
use rustyline::completion::Completer;
@ -67,7 +67,7 @@ impl Highlighter for Helper {
}
fn highlight<'l>(&self, line: &'l str, _pos: usize) -> Cow<'l, str> {
let tokens = crate::parser::pipeline(nom_input(line, uuid::Uuid::nil()));
let tokens = crate::parser::pipeline(nom_input(line));
match tokens {
Err(_) => Cow::Borrowed(line),
@ -78,13 +78,13 @@ impl Highlighter for Helper {
Ok(v) => v,
};
let tokens = vec![TokenNode::Pipeline(pipeline.clone().tagged(v.tag()))];
let mut tokens = TokensIterator::all(&tokens[..], v.tag());
let tokens = vec![TokenNode::Pipeline(pipeline.clone().spanned(v.span()))];
let mut tokens = TokensIterator::all(&tokens[..], v.span());
let text = Text::from(line);
let expand_context = self
.context
.expand_context(&text, Tag::from((0, line.len() - 1, uuid::Uuid::nil())));
.expand_context(&text, Span::new(0, line.len() - 1));
let mut shapes = vec![];
// We just constructed a token list that only contains a pipeline, so it can't fail
@ -126,16 +126,16 @@ impl Highlighter for Helper {
#[allow(unused)]
fn vec_tag<T>(input: Vec<Tagged<T>>) -> Option<Tag> {
let mut iter = input.iter();
let first = iter.next()?.tag;
let first = iter.next()?.tag.clone();
let last = iter.last();
Some(match last {
None => first,
Some(last) => first.until(last.tag),
Some(last) => first.until(&last.tag),
})
}
fn paint_flat_shape(flat_shape: Tagged<FlatShape>, line: &str) -> String {
fn paint_flat_shape(flat_shape: Spanned<FlatShape>, line: &str) -> String {
let style = match &flat_shape.item {
FlatShape::OpenDelimiter(_) => Color::White.normal(),
FlatShape::CloseDelimiter(_) => Color::White.normal(),
@ -170,7 +170,7 @@ fn paint_flat_shape(flat_shape: Tagged<FlatShape>, line: &str) -> String {
}
};
let body = flat_shape.tag.slice(line);
let body = flat_shape.span.slice(line);
style.paint(body).to_string()
}

View File

@ -3,20 +3,19 @@ use crate::commands::cp::CopyArgs;
use crate::commands::mkdir::MkdirArgs;
use crate::commands::mv::MoveArgs;
use crate::commands::rm::RemoveArgs;
use crate::context::SourceMap;
use crate::errors::ShellError;
use crate::prelude::*;
use crate::stream::OutputStream;
use std::path::PathBuf;
pub trait Shell: std::fmt::Debug {
fn name(&self, source_map: &SourceMap) -> String;
fn name(&self) -> String;
fn homedir(&self) -> Option<PathBuf>;
fn ls(
&self,
pattern: Option<Tagged<PathBuf>>,
command_tag: Tag,
context: &RunnableContext,
) -> Result<OutputStream, ShellError>;
fn cd(&self, args: EvaluatedWholeStreamCommandArgs) -> Result<OutputStream, ShellError>;
fn cp(&self, args: CopyArgs, name: Tag, path: &str) -> Result<OutputStream, ShellError>;

View File

@ -10,18 +10,19 @@ use crate::shell::shell::Shell;
use crate::stream::OutputStream;
use std::error::Error;
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
#[derive(Clone, Debug)]
pub struct ShellManager {
pub(crate) current_shell: usize,
pub(crate) current_shell: Arc<AtomicUsize>,
pub(crate) shells: Arc<Mutex<Vec<Box<dyn Shell + Send>>>>,
}
impl ShellManager {
pub fn basic(commands: CommandRegistry) -> Result<ShellManager, Box<dyn Error>> {
Ok(ShellManager {
current_shell: 0,
current_shell: Arc::new(AtomicUsize::new(0)),
shells: Arc::new(Mutex::new(vec![Box::new(FilesystemShell::basic(
commands,
)?)])),
@ -30,24 +31,29 @@ impl ShellManager {
pub fn insert_at_current(&mut self, shell: Box<dyn Shell + Send>) {
self.shells.lock().unwrap().push(shell);
self.current_shell = self.shells.lock().unwrap().len() - 1;
self.current_shell
.store(self.shells.lock().unwrap().len() - 1, Ordering::SeqCst);
self.set_path(self.path());
}
pub fn current_shell(&self) -> usize {
self.current_shell.load(Ordering::SeqCst)
}
pub fn remove_at_current(&mut self) {
{
let mut shells = self.shells.lock().unwrap();
if shells.len() > 0 {
if self.current_shell == shells.len() - 1 {
if self.current_shell() == shells.len() - 1 {
shells.pop();
let new_len = shells.len();
if new_len > 0 {
self.current_shell = new_len - 1;
self.current_shell.store(new_len - 1, Ordering::SeqCst);
} else {
return;
}
} else {
shells.remove(self.current_shell);
shells.remove(self.current_shell());
}
}
}
@ -59,17 +65,17 @@ impl ShellManager {
}
pub fn path(&self) -> String {
self.shells.lock().unwrap()[self.current_shell].path()
self.shells.lock().unwrap()[self.current_shell()].path()
}
pub fn pwd(&self, args: EvaluatedWholeStreamCommandArgs) -> Result<OutputStream, ShellError> {
let env = self.shells.lock().unwrap();
env[self.current_shell].pwd(args)
env[self.current_shell()].pwd(args)
}
pub fn set_path(&mut self, path: String) {
self.shells.lock().unwrap()[self.current_shell].set_path(path)
self.shells.lock().unwrap()[self.current_shell()].set_path(path)
}
pub fn complete(
@ -78,20 +84,21 @@ impl ShellManager {
pos: usize,
ctx: &rustyline::Context<'_>,
) -> Result<(usize, Vec<rustyline::completion::Pair>), rustyline::error::ReadlineError> {
self.shells.lock().unwrap()[self.current_shell].complete(line, pos, ctx)
self.shells.lock().unwrap()[self.current_shell()].complete(line, pos, ctx)
}
pub fn hint(&self, line: &str, pos: usize, ctx: &rustyline::Context<'_>) -> Option<String> {
self.shells.lock().unwrap()[self.current_shell].hint(line, pos, ctx)
self.shells.lock().unwrap()[self.current_shell()].hint(line, pos, ctx)
}
pub fn next(&mut self) {
{
let shell_len = self.shells.lock().unwrap().len();
if self.current_shell == (shell_len - 1) {
self.current_shell = 0;
if self.current_shell() == (shell_len - 1) {
self.current_shell.store(0, Ordering::SeqCst);
} else {
self.current_shell += 1;
self.current_shell
.store(self.current_shell() + 1, Ordering::SeqCst);
}
}
self.set_path(self.path());
@ -100,10 +107,11 @@ impl ShellManager {
pub fn prev(&mut self) {
{
let shell_len = self.shells.lock().unwrap().len();
if self.current_shell == 0 {
self.current_shell = shell_len - 1;
if self.current_shell() == 0 {
self.current_shell.store(shell_len - 1, Ordering::SeqCst);
} else {
self.current_shell -= 1;
self.current_shell
.store(self.current_shell() - 1, Ordering::SeqCst);
}
}
self.set_path(self.path());
@ -112,23 +120,23 @@ impl ShellManager {
pub fn homedir(&self) -> Option<PathBuf> {
let env = self.shells.lock().unwrap();
env[self.current_shell].homedir()
env[self.current_shell()].homedir()
}
pub fn ls(
&self,
path: Option<Tagged<PathBuf>>,
command_tag: Tag,
context: &RunnableContext,
) -> Result<OutputStream, ShellError> {
let env = self.shells.lock().unwrap();
env[self.current_shell].ls(path, command_tag)
env[self.current_shell()].ls(path, context)
}
pub fn cd(&self, args: EvaluatedWholeStreamCommandArgs) -> Result<OutputStream, ShellError> {
let env = self.shells.lock().unwrap();
env[self.current_shell].cd(args)
env[self.current_shell()].cd(args)
}
pub fn cp(
@ -140,13 +148,13 @@ impl ShellManager {
match env {
Ok(x) => {
let path = x[self.current_shell].path();
x[self.current_shell].cp(args, context.name, &path)
let path = x[self.current_shell()].path();
x[self.current_shell()].cp(args, context.name.clone(), &path)
}
Err(e) => Err(ShellError::labeled_error(
format!("Internal error: could not lock {}", e),
"Internal error: could not lock",
context.name,
&context.name,
)),
}
}
@ -160,13 +168,13 @@ impl ShellManager {
match env {
Ok(x) => {
let path = x[self.current_shell].path();
x[self.current_shell].rm(args, context.name, &path)
let path = x[self.current_shell()].path();
x[self.current_shell()].rm(args, context.name.clone(), &path)
}
Err(e) => Err(ShellError::labeled_error(
format!("Internal error: could not lock {}", e),
"Internal error: could not lock",
context.name,
&context.name,
)),
}
}
@ -180,13 +188,13 @@ impl ShellManager {
match env {
Ok(x) => {
let path = x[self.current_shell].path();
x[self.current_shell].mkdir(args, context.name, &path)
let path = x[self.current_shell()].path();
x[self.current_shell()].mkdir(args, context.name.clone(), &path)
}
Err(e) => Err(ShellError::labeled_error(
format!("Internal error: could not lock {}", e),
"Internal error: could not lock",
context.name,
&context.name,
)),
}
}
@ -200,13 +208,13 @@ impl ShellManager {
match env {
Ok(x) => {
let path = x[self.current_shell].path();
x[self.current_shell].mv(args, context.name, &path)
let path = x[self.current_shell()].path();
x[self.current_shell()].mv(args, context.name.clone(), &path)
}
Err(e) => Err(ShellError::labeled_error(
format!("Internal error: could not lock {}", e),
"Internal error: could not lock",
context.name,
&context.name,
)),
}
}

View File

@ -3,7 +3,6 @@ use crate::commands::cp::CopyArgs;
use crate::commands::mkdir::MkdirArgs;
use crate::commands::mv::MoveArgs;
use crate::commands::rm::RemoveArgs;
use crate::context::SourceMap;
use crate::prelude::*;
use crate::shell::shell::Shell;
use crate::utils::ValueStructure;
@ -72,8 +71,8 @@ impl ValueShell {
}
impl Shell for ValueShell {
fn name(&self, source_map: &SourceMap) -> String {
let anchor_name = self.value.anchor_name(source_map);
fn name(&self) -> String {
let anchor_name = self.value.anchor_name();
format!(
"{}",
match anchor_name {
@ -90,9 +89,10 @@ impl Shell for ValueShell {
fn ls(
&self,
target: Option<Tagged<PathBuf>>,
command_name: Tag,
context: &RunnableContext,
) -> Result<OutputStream, ShellError> {
let mut full_path = PathBuf::from(self.path());
let name_tag = context.name.clone();
match &target {
Some(value) => full_path.push(value.as_ref()),
@ -114,7 +114,7 @@ impl Shell for ValueShell {
return Err(ShellError::labeled_error(
"Can not list entries inside",
"No such path exists",
command_name,
name_tag,
));
}
@ -166,7 +166,7 @@ impl Shell for ValueShell {
return Err(ShellError::labeled_error(
"Can not change to path inside",
"No such path exists",
args.call_info.name_tag,
&args.call_info.name_tag,
));
}
@ -213,10 +213,9 @@ impl Shell for ValueShell {
fn pwd(&self, args: EvaluatedWholeStreamCommandArgs) -> Result<OutputStream, ShellError> {
let mut stream = VecDeque::new();
stream.push_back(ReturnSuccess::value(Tagged::from_item(
Value::string(self.path()),
args.call_info.name_tag,
)));
stream.push_back(ReturnSuccess::value(
Value::string(self.path()).tagged(&args.call_info.name_tag),
));
Ok(stream.into())
}