Skip to content

Instantly share code, notes, and snippets.

@andreivasiliu
Last active March 10, 2019 17:37
Show Gist options
  • Save andreivasiliu/48c24633dfca466d16f78e0b430a9bf7 to your computer and use it in GitHub Desktop.
Save andreivasiliu/48c24633dfca466d16f78e0b430a9bf7 to your computer and use it in GitHub Desktop.
use std::process::{Command, Stdio, ExitStatus};
use std::io::BufReader;
use tokio_process::CommandExt;
use futures::{Future, Stream, Sink};
use futures::sync::mpsc::{Sender, SendError, channel};
/// Something that happened to a spawned child process.
///
/// `spawn_process` pairs each event with the child's process id before
/// sending it on its output channel.
#[derive(Debug)]
enum ProcessEvent {
    /// The child was spawned; sent before any of its output is forwarded.
    ProcessStarted,
    /// One line read from the child's standard output.
    Stdout(String),
    /// One line read from the child's standard error.
    Stderr(String),
    /// The child exited; carries its exit status.
    ProcessEnded(ExitStatus),
    /// Reading a line from one of the child's output pipes failed.
    ReadError(std::io::Error),
    /// Waiting for the child to exit failed.
    WaitError(std::io::Error),
}
/// Runs `cmd_line` through the platform's command shell and reports what
/// happens to it on `output`.
///
/// Every event is sent as a `(pid, ProcessEvent)` pair, in this order:
///
/// 1. `ProcessStarted`;
/// 2. one `Stdout` / `Stderr` event per line of output, the two pipes being
///    drained concurrently until both have ended;
/// 3. `ProcessEnded` with the child's exit status.
///
/// I/O failures are reported as events too (`ReadError` while reading a pipe,
/// `WaitError` while waiting for the exit status) instead of failing the
/// returned future.
///
/// The returned future resolves once the final event has been sent. Its only
/// error is a `SendError`, i.e. sending an event on `output` failed.
///
/// # Panics
///
/// Panics if the shell process cannot be spawned.
fn spawn_process(
    cmd_line: String, output: Sender<(u32, ProcessEvent)>
) -> impl Future<Item=(), Error=SendError<(u32, ProcessEvent)>> {
    // `cmd /C` only exists on Windows; everywhere else the equivalent is
    // `sh -c`. The Windows program, flag and panic message are unchanged.
    let (shell, run_flag, spawn_failed) = if cfg!(windows) {
        ("cmd", "/C", "cmd doesn't exist!")
    } else {
        ("sh", "-c", "sh doesn't exist!")
    };

    // NOTE(review): stdin is not configured here, so the child presumably
    // inherits this process's stdin -- the same stream `main` reads command
    // lines from. Confirm that is intended.
    let mut child = Command::new(shell).arg(run_flag).arg(cmd_line)
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn_async()
        .expect(spawn_failed);

    let pid = child.id();

    // Move the pipes out of the child so that `child` itself can be consumed
    // as the exit-status future further down. Both were requested as piped
    // above, which is presumably why the `unwrap`s are expected to hold.
    let stdout = child.stdout().take().unwrap();
    let stderr = child.stderr().take().unwrap();

    let stdout = tokio::io::lines(BufReader::new(stdout));
    let stderr = tokio::io::lines(BufReader::new(stderr));

    // `Sink::send` consumes the sender and hands it back once the item has
    // been accepted, so the sender is threaded through `fold` as the
    // accumulator. Read errors are turned into ordinary events first.
    let stdout = stdout
        .map(ProcessEvent::Stdout)
        .or_else(|error| Ok(ProcessEvent::ReadError(error)))
        .fold(output.clone(), move |sender, event| {
            sender.send((pid, event))
        })
        .map(|_sender| ());

    let stderr = stderr
        .map(ProcessEvent::Stderr)
        .or_else(|error| Ok(ProcessEvent::ReadError(error)))
        .fold(output.clone(), move |sender, event| {
            sender.send((pid, event))
        })
        .map(|_sender| ());

    // `output` is consumed by the `ProcessStarted` send below, so keep a
    // second handle for the final event.
    let another_output = output.clone();

    let process_started = output
        .send((pid, ProcessEvent::ProcessStarted))
        .map(|_sender| ());

    // The child is a future of its exit status; a failed wait becomes a
    // `WaitError` event rather than an error of this future.
    let process_ended = child
        .map(ProcessEvent::ProcessEnded)
        .or_else(|error| Ok(ProcessEvent::WaitError(error)))
        .and_then(move |event| {
            another_output.send((pid, event))
                .map(|_sender| ())
        });

    // Drain both pipes concurrently; finishes when both streams have ended.
    let stdout_stderr = stdout.join(stderr)
        .map(|((), ())| ());

    // Started -> all output -> ended, strictly in that order.
    process_started
        .and_then(|()| stdout_stderr)
        .and_then(|()| process_ended)
}
/// Reads command lines from standard input, launches each one with
/// `spawn_process` as its own tokio task, and prints every event the running
/// processes report.
///
/// Panics if reading standard input fails, or if a process handler can no
/// longer deliver its events.
fn main() {
    // All process handlers report over clones of this sender; the printer
    // below owns the receiving end.
    let (event_tx, event_rx) = channel(0);

    let commands = tokio::io::lines(BufReader::new(tokio::io::stdin()));

    // One spawned task per input line, so the commands run side by side.
    let launcher = commands
        .map_err(|err| Err::<(), _>(err).expect("Couldn't read from stdin!"))
        .for_each(move |line| {
            let task = spawn_process(line, event_tx.clone())
                .map_err(|_err| panic!("Event printer closed before finishing!"));
            tokio::spawn(task)
        });

    let printer = event_rx.for_each(|(pid, event)| {
        println!("Pid: {}, Event: {:?}", pid, event);
        Ok(())
    });

    println!("Now starting.");
    // Drive the launcher and the printer together until both are finished.
    tokio::run(launcher.join(printer).map(|_| ()));
    println!("Done!");
}

Ran: (echo ping www.google.com & echo ping www.google.com) | cargo run

And got:

Now starting.
Pid: 113588, Event: ProcessStarted
Pid: 116580, Event: ProcessStarted
Pid: 113588, Event: Stdout("")
Pid: 116580, Event: Stdout("")
Pid: 113588, Event: Stdout("Pinging www.google.com [2a00:1450:400d:805::2004] with 32 bytes of data:")
Pid: 116580, Event: Stdout("Pinging www.google.com [2a00:1450:400d:805::2004] with 32 bytes of data:")
Pid: 116580, Event: Stdout("Reply from 2a00:1450:400d:805::2004: time=23ms ")
Pid: 113588, Event: Stdout("Reply from 2a00:1450:400d:805::2004: time=23ms ")
Pid: 116580, Event: Stdout("Reply from 2a00:1450:400d:805::2004: time=23ms ")
Pid: 113588, Event: Stdout("Reply from 2a00:1450:400d:805::2004: time=22ms ")
Pid: 116580, Event: Stdout("Reply from 2a00:1450:400d:805::2004: time=23ms ")
Pid: 113588, Event: Stdout("Reply from 2a00:1450:400d:805::2004: time=22ms ")
Pid: 116580, Event: Stdout("Reply from 2a00:1450:400d:805::2004: time=27ms ")
Pid: 113588, Event: Stdout("Reply from 2a00:1450:400d:805::2004: time=26ms ")
Pid: 116580, Event: Stdout("")
Pid: 113588, Event: Stdout("")
Pid: 116580, Event: Stdout("Ping statistics for 2a00:1450:400d:805::2004:")
Pid: 113588, Event: Stdout("Ping statistics for 2a00:1450:400d:805::2004:")
Pid: 116580, Event: Stdout("    Packets: Sent = 4, Received = 4, Lost = 0 (0% loss),")
Pid: 113588, Event: Stdout("    Packets: Sent = 4, Received = 4, Lost = 0 (0% loss),")
Pid: 116580, Event: Stdout("Approximate round trip times in milli-seconds:")
Pid: 113588, Event: Stdout("Approximate round trip times in milli-seconds:")
Pid: 116580, Event: Stdout("    Minimum = 23ms, Maximum = 27ms, Average = 24ms")
Pid: 113588, Event: Stdout("    Minimum = 22ms, Maximum = 26ms, Average = 23ms")
Pid: 116580, Event: ProcessEnded(ExitStatus(ExitStatus(0)))
Pid: 113588, Event: ProcessEnded(ExitStatus(ExitStatus(0)))
Done!
[package]
name = "rust-futuretrack"
version = "0.1.0"
edition = "2018"
[dependencies]
tokio-process = "0.2.3"
futures = "0.1.25"
tokio = "0.1.16"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment