Skip to content

Instantly share code, notes, and snippets.

@ArtemGr
Last active December 22, 2023 07:08
Show Gist options
  • Save ArtemGr/db40ae04b431a95f2b78 to your computer and use it in GitHub Desktop.
Save ArtemGr/db40ae04b431a95f2b78 to your computer and use it in GitHub Desktop.
Read lines from a pipe as soon as they come out (useful for filtering).
#![feature(mpsc_select, box_syntax)]
use std::io;
use std::process::Command;
use std::sync::mpsc::{channel, Receiver, Select};
use std::string::FromUtf8Error;
use std::thread::spawn;
/// Failure reported by the background pipe reader.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`] so it can be
/// propagated with `?` or boxed as a `Box<dyn Error>` like any other error.
#[derive(Debug)]
enum PipeError {
    /// Reading from the underlying stream failed.
    IO(io::Error),
    /// A line was read, but its bytes are not valid UTF-8.
    NotUtf8(FromUtf8Error),
}

impl std::fmt::Display for PipeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            PipeError::IO(err) => write!(f, "failed to read from the pipe: {}", err),
            PipeError::NotUtf8(err) => write!(f, "pipe line is not valid UTF-8: {}", err),
        }
    }
}

impl std::error::Error for PipeError {
    /// Exposes the wrapped error so callers can walk the cause chain.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            PipeError::IO(err) => Some(err),
            PipeError::NotUtf8(err) => Some(err),
        }
    }
}
/// One message produced by the background pipe reader.
#[derive(Debug)]
enum PipedLine {
    /// A complete line of text, without its terminating `\n`.
    Line(String),
    /// The pipe reached end-of-file; the reader stops after sending this.
    EOF,
}
/// Line-oriented view of a pipe that is drained on a background thread.
///
/// Lines are handed over as soon as they are complete, so a child's output can
/// be processed while the child is still running.
struct PipeStreamReader {
    /// Receiving end of the channel fed by the background reader: each message
    /// is either a [`PipedLine`] or the [`PipeError`] that interrupted reading.
    lines: Receiver<Result<PipedLine, PipeError>>,
}
impl PipeStreamReader {
// Starts a background task reading bytes from the pipe.
fn new(mut stream: Box<io::Read + Send>) -> PipeStreamReader {
PipeStreamReader {
lines: {
let (tx, rx) = channel();
spawn(move || {
let mut buf = Vec::new();
let mut byte = [0u8];
loop {
match stream.read(&mut byte) {
Ok(0) => {
let _ = tx.send(Ok(PipedLine::EOF));
break;
}
Ok(_) => {
if byte[0] == 0x0A {
tx.send(match String::from_utf8(buf.clone()) {
Ok(line) => Ok(PipedLine::Line(line)),
Err(err) => Err(PipeError::NotUtf8(err)),
})
.unwrap();
buf.clear()
} else {
buf.push(byte[0])
}
}
Err(error) => {
tx.send(Err(PipeError::IO(error))).unwrap();
}
}
}
});
rx
},
}
}
}
fn main() {
let mut command = Command::new("sh");
command.arg("-c").arg("echo 1; sleep 1; echo 2; sleep 1; echo 3");
command.stdout(std::process::Stdio::piped());
command.stderr(std::process::Stdio::piped());
let mut process = command.spawn().expect("!spawn");
let out = PipeStreamReader::new(box process.stdout.take().expect("!stdout"));
let err = PipeStreamReader::new(box process.stderr.take().expect("!stderr"));
let select = Select::new();
let mut out_rx = select.handle(&out.lines);
let mut err_rx = select.handle(&err.lines);
unsafe {
out_rx.add();
err_rx.add();
}
let mut out_eof = false;
let mut err_eof = false;
while !out_eof || !err_eof {
let evid = select.wait();
let recv_result = if out_rx.id() == evid {
out_rx.recv()
} else {
err_rx.recv()
};
match recv_result {
Ok(remote_result) => {
match remote_result {
Ok(piped_line) => {
match piped_line {
PipedLine::Line(line) => println!("{}", line),
PipedLine::EOF => {
if out_rx.id() == evid {
out_eof = true;
unsafe { out_rx.remove() }
} else {
err_eof = true;
unsafe { err_rx.remove() }
}
}
}
}
Err(error) => println!("system] error: {:?}", error),
}
}
Err(_) => {
if out_rx.id() == evid {
out_eof = true;
unsafe { out_rx.remove() }
} else {
err_eof = true;
unsafe { err_rx.remove() }
}
}
}
}
let status = process.wait().expect("!wait");
if !status.success() {
panic!("!status: {}", status.code().expect("!code"))
}
}
@DayBr3ak
Copy link

I couldn't get this to run because `std::sync::mpsc::Select` has been removed from the standard library, but you can look into crossbeam's channel selection instead: https://docs.rs/crossbeam/latest/crossbeam/channel/index.html#selection

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment