diff --git a/Cargo.lock b/Cargo.lock index d9da44decb..f0618da4c5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -972,6 +972,7 @@ version = "0.1.1" dependencies = [ "ansi_term 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", "byte-unit 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", "chrono 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "chrono-humanize 0.0.11 (registry+https://github.com/rust-lang/crates.io-index)", "chrono-tz 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/Cargo.toml b/Cargo.toml index 6011395ced..60e1f37644 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,7 @@ futures-sink-preview = "0.3.0-alpha.16" tokio-fs = "0.1.6" futures_codec = "0.2.2" term = "0.5.2" +bytes = "0.4.12" [dependencies.pancurses] version = "0.16" diff --git a/src/commands/classified.rs b/src/commands/classified.rs index 5d0b48fc2e..be47cb5620 100644 --- a/src/commands/classified.rs +++ b/src/commands/classified.rs @@ -1,7 +1,46 @@ use crate::prelude::*; -use futures_codec::{Framed, LinesCodec}; +use futures::TryStreamExt; +use futures_codec::{Encoder, Decoder, Framed}; use std::sync::Arc; use subprocess::Exec; +use std::io::{Error, ErrorKind}; +use bytes::{BufMut, BytesMut}; + +/// A simple `Codec` implementation that splits up data into lines. +pub struct LinesCodec {} + +impl Encoder for LinesCodec { + type Item = String; + type Error = Error; + + fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { + dst.put(item); + Ok(()) + } +} + +impl Decoder for LinesCodec { + type Item = String; + type Error = Error; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + match src.iter().position(|b| b == &b'\n') { + Some(pos) if !src.is_empty() => { + let buf = src.split_to(pos + 1); + String::from_utf8(buf.to_vec()) + .map(Some) + .map_err(|e| Error::new(ErrorKind::InvalidData, e)) + } + _ if !src.is_empty() => { + let drained = src.take(); + String::from_utf8(drained.to_vec()) + .map(Some) + .map_err(|e| Error::new(ErrorKind::InvalidData, e)) + } + _ => Ok(None) + } + } +} crate struct ClassifiedInputStream { crate objects: InputStream,