Last active
December 22, 2023 07:08
-
-
Save ArtemGr/db40ae04b431a95f2b78 to your computer and use it in GitHub Desktop.
Read lines from a pipe as soon as they come out (useful for filtering).
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#![feature(mpsc_select, box_syntax)] | |
use std::io; | |
use std::process::Command; | |
use std::sync::mpsc::{channel, Receiver, Select}; | |
use std::string::FromUtf8Error; | |
use std::thread::spawn; | |
/// Failures the background reader can report through the channel.
#[derive(Debug)]
enum PipeError {
    /// The underlying `read` call failed; the reader stops after reporting this.
    IO(io::Error),
    /// A line was received in full but its bytes are not valid UTF-8.
    NotUtf8(FromUtf8Error),
}

/// A message produced by the background reader.
#[derive(Debug)]
enum PipedLine {
    /// One line of output, without its terminating `\n`.
    Line(String),
    /// The stream reported end-of-file; this is the last message sent.
    EOF,
}

// Reads data from the pipe byte-by-byte and returns the lines.
// Useful for processing the pipe's output as soon as it becomes available.
struct PipeStreamReader {
    /// Receiving end of the channel fed by the background thread.
    lines: Receiver<Result<PipedLine, PipeError>>,
}

impl PipeStreamReader {
    /// Starts a background thread that reads `stream` one byte at a time and
    /// sends every `\n`-terminated line through `lines` as soon as it is complete.
    ///
    /// The thread terminates (closing the channel) when:
    /// * the stream reaches end-of-file: any bytes after the last `\n` are
    ///   sent as a final `Line`, followed by `PipedLine::EOF`;
    /// * a read fails with anything other than `ErrorKind::Interrupted`: the
    ///   error is sent once as `PipeError::IO`;
    /// * the receiving side has been dropped.
    ///
    /// A line that is not valid UTF-8 is reported as `PipeError::NotUtf8` and
    /// reading continues with the next line.
    fn new(mut stream: Box<dyn io::Read + Send>) -> PipeStreamReader {
        // Converts the raw bytes of one line into the message sent to the receiver.
        fn decode(bytes: Vec<u8>) -> Result<PipedLine, PipeError> {
            String::from_utf8(bytes)
                .map(PipedLine::Line)
                .map_err(PipeError::NotUtf8)
        }

        let (tx, rx) = channel();
        spawn(move || {
            let mut buf = Vec::new();
            let mut byte = [0u8];
            loop {
                match stream.read(&mut byte) {
                    Ok(0) => {
                        // Do not lose output that lacks a trailing newline.
                        if !buf.is_empty() {
                            let tail = std::mem::replace(&mut buf, Vec::new());
                            let _ = tx.send(decode(tail));
                        }
                        // A send error only means nobody is listening any more.
                        let _ = tx.send(Ok(PipedLine::EOF));
                        break;
                    }
                    Ok(_) => {
                        if byte[0] == b'\n' {
                            // Hand the buffer over instead of cloning it.
                            let line = std::mem::replace(&mut buf, Vec::new());
                            if tx.send(decode(line)).is_err() {
                                // Receiver dropped: stop quietly rather than panic.
                                break;
                            }
                        } else {
                            buf.push(byte[0]);
                        }
                    }
                    // An interrupted read is transient and should simply be retried.
                    Err(ref error) if error.kind() == io::ErrorKind::Interrupted => {}
                    Err(error) => {
                        // Report once and stop; retrying a failed stream would
                        // flood the channel with the same error forever.
                        let _ = tx.send(Err(PipeError::IO(error)));
                        break;
                    }
                }
            }
        });
        PipeStreamReader { lines: rx }
    }
}
fn main() { | |
let mut command = Command::new("sh"); | |
command.arg("-c").arg("echo 1; sleep 1; echo 2; sleep 1; echo 3"); | |
command.stdout(std::process::Stdio::piped()); | |
command.stderr(std::process::Stdio::piped()); | |
let mut process = command.spawn().expect("!spawn"); | |
let out = PipeStreamReader::new(box process.stdout.take().expect("!stdout")); | |
let err = PipeStreamReader::new(box process.stderr.take().expect("!stderr")); | |
let select = Select::new(); | |
let mut out_rx = select.handle(&out.lines); | |
let mut err_rx = select.handle(&err.lines); | |
unsafe { | |
out_rx.add(); | |
err_rx.add(); | |
} | |
let mut out_eof = false; | |
let mut err_eof = false; | |
while !out_eof || !err_eof { | |
let evid = select.wait(); | |
let recv_result = if out_rx.id() == evid { | |
out_rx.recv() | |
} else { | |
err_rx.recv() | |
}; | |
match recv_result { | |
Ok(remote_result) => { | |
match remote_result { | |
Ok(piped_line) => { | |
match piped_line { | |
PipedLine::Line(line) => println!("{}", line), | |
PipedLine::EOF => { | |
if out_rx.id() == evid { | |
out_eof = true; | |
unsafe { out_rx.remove() } | |
} else { | |
err_eof = true; | |
unsafe { err_rx.remove() } | |
} | |
} | |
} | |
} | |
Err(error) => println!("system] error: {:?}", error), | |
} | |
} | |
Err(_) => { | |
if out_rx.id() == evid { | |
out_eof = true; | |
unsafe { out_rx.remove() } | |
} else { | |
err_eof = true; | |
unsafe { err_rx.remove() } | |
} | |
} | |
} | |
} | |
let status = process.wait().expect("!wait"); | |
if !status.success() { | |
panic!("!status: {}", status.code().expect("!code")) | |
} | |
} |
@oli-obk, I don't see where you simplified it, looks like just a restyle. Some parts are more compressed in your version while other parts are more verbose.
Regarding your question "// why not simply break out of the loop instead of setting boolean flags?" - we're not breaking because we must wait for both streams to signal EOF. One stream finishing doesn't mean there's no information left in the other one.
Anyway, I've updated the code with a version that passes EOF explicitly, feels safer that way.
I couldn't quite get this to run, since the unstable `Select` API has been removed from `std::sync::mpsc`, but you can look into crossbeam's channel selection as a replacement: https://docs.rs/crossbeam/latest/crossbeam/channel/index.html#selection
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I took the liberty of simplifying some parts of your code. I tried to stay with your style as much as possible. Hope it's useful to you.