Last active March 18, 2019 15:29
---
title: "Overview of Tools for External (R) Processes"
output:
  html_document:
    df_print: paged
---

## processx - run external processes

```{r}
library(processx)
```

### `processx::run()`

```{r}
# px is a small helper program shipped with processx, used here as a test subprocess
px <- processx:::get_tool("px")
px
```

`run()` looks up the command on the `PATH`. Standard output and standard error are collected by default.

```{r}
run("/bin/ls")
run("ls")
```
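
A minimal sketch of what `run()` returns, assuming a Unix-like system with `echo` on the `PATH`. `Sys.which()` shows which executable the `PATH` lookup finds. The result is a list whose `status`, `stdout` and `stderr` elements hold the exit status and the collected output.

```{r}
library(processx)

# The executable that run("echo", ...) will find on the PATH
Sys.which("echo")

# Output is collected, not printed, by default
res <- run("echo", c("hello", "world"))
res$status
res$stdout
```
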
```{r}
pxhelp <- run(px, "--help")
cat(pxhelp$stderr)
```

No shell is involved, so arguments are passed to the program as-is and need no quoting:

```{r}
run(px, c("outln", "arg - with spaces", "outln", "'arg with quote'"))
```
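
The same holds for characters that a shell would interpret. Here is a small sketch, again assuming a Unix-like system with `echo` on the `PATH`. The `$`, `*` and `;` below reach `echo` unchanged, because no shell is there to expand them.

```{r}
library(processx)

# No shell: no variable expansion, no globbing, no command separators
res <- run("echo", c("$HOME", "*", "a;b"))
res$stdout
```
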
`run()` is interruptible, e.g. with Ctrl+C:

```{r, eval = FALSE}
run("sleep", "5")
```

Show a spinner while the process is running:

```{r, eval = FALSE}
run(px, c("sleep", "5"), spinner = TRUE)
```

Set a timeout, in seconds. Reaching it is an error:

```{r, error = TRUE}
run(px, c("sleep", "5"), timeout = 1)
```
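
If a timeout should not be an error, one option is to combine `timeout` with `error_on_status = FALSE` and then check the `timeout` element of the result. This is a sketch, assuming `sleep` is on the `PATH`.

```{r}
library(processx)

res <- run("sleep", "5", timeout = 0.5, error_on_status = FALSE)
# TRUE if the process was killed because the timeout was reached
res$timeout
```
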
By default, a non-zero exit status is an error:

```{r, error = TRUE}
run(px, c("return", "10"))
```
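
To handle the exit status yourself, set `error_on_status = FALSE` and check the `status` element. This is a sketch that uses `sh -c` (assumed to be available) to exit with a given code.

```{r}
library(processx)

# A non-zero status is returned instead of raising an error
res <- run("sh", c("-c", "exit 3"), error_on_status = FALSE)
res$status
```
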
Echo standard output and error to the console while the process is running:

```{r}
outp <- run("ls", "..", echo = TRUE)
```

Set environment variables for the subprocess. Again, no quoting is needed:

```{r}
run(px, c("getenv", "FOO"), env = c(Sys.getenv(), FOO = "bar"))
```
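
For instance, a value containing spaces arrives in the subprocess as one value. This is a sketch that uses `sh` (assumed to be available) to print the variable; `FOO` is just an example name.

```{r}
library(processx)

# The value is set directly in the child's environment, no shell quoting involved
res <- run("sh", c("-c", 'echo "$FOO"'),
           env = c(Sys.getenv(), FOO = "a value with spaces"))
res$stdout
```
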
Redirect standard error to standard output:

```{r}
run(px, c("out", "one\n", "err", "two\n", "out", "three\n\n"),
    stderr_to_stdout = TRUE, echo = TRUE)
```

Line callbacks for standard output and error. Here the callback kills the process as soon as it sees the line `done`, so `dead by now` is never printed:

```{r}
cb <- function(line, proc) {
  cat("Got:", line, "\n")
  # Kill the process once it reports "done", but leave its connections open for run()
  if (line == "done") proc$kill(close_connections = FALSE)
}
result <- run(px,
  c("outln", "this", "outln", "that", "outln", "done",
    "outln", "still here", "sleep", "10", "outln", "dead by now"),
  stdout_line_callback = cb,
  # The killed process does not exit with status 0; do not treat that as an error
  error_on_status = FALSE)
```
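
A line callback can also just collect the lines. This is a sketch, assuming `seq` is on the `PATH`. The `lines` vector is filled from inside the callback via `<<-`.

```{r}
library(processx)

lines <- character()
collect <- function(line, proc) {
  # Append each complete line of standard output as it arrives
  lines <<- c(lines, line)
}
res <- run("seq", "3", stdout_line_callback = collect)
lines
```
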
### `processx::process`

Start a process in the background and manipulate it later. The methods of the `process` R6 class:

```{r}
names(processx::process$public_methods)
```
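
Here is a minimal sketch of the background workflow, assuming `sleep` is on the `PATH`. `process$new()` returns immediately. `wait()` blocks until the process finishes, and `get_exit_status()` then returns its exit status.

```{r}
library(processx)

p <- process$new("sleep", "1")
p$is_alive()        # still running: process$new() does not wait
p$wait()            # block until the process exits
p$is_alive()
p$get_exit_status()
```
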
```{r}
proc <- process$new(px, c("sleep", "1000"))
proc
```

```{r}
proc$is_alive()
```

```{r}
proc$get_name()
```

```{r}
proc$get_exe()
```

```{r}
proc$get_cmdline()
```

```{r}
proc$get_status()
```

```{r}
proc$get_username()
```

```{r}
proc$get_wd()
```

```{r}
proc$get_cpu_times()
```

```{r}
proc$get_memory_info()
```

Manipulate a process, e.g. suspend and resume it:

```{r}
proc$suspend()
proc$get_status()
proc$resume()
proc$get_status()
```

```{r}
proc$interrupt()
proc
```

```{r}
proc <- process$new(px, c("sleep", "1000"))
proc
```

```{r}
proc$kill()
```

Automatic cleanup: by default (`cleanup = TRUE`), the process is killed when the `process` object is garbage collected.

```{r}
proc <- process$new(px, c("sleep", "1000"))
proc
```

```{r}
library(dplyr)
ps::ps() %>%
  filter(name == "px")
```

```{r}
rm(proc)
gc()
```

```{r}
ps::ps() %>%
  filter(name == "px")
```

Reading standard output and error through pipes (`stdout = "|"`):

```{r}
proc <- process$new("curl", "https://httpbin.org/delay/1", stdout = "|")
proc$read_output()
proc$poll_io(3000)   # wait at most 3000 ms for output
proc$read_output_lines()
```

```{r}
proc$poll_io(-1)     # -1 means no timeout
```

```{r}
proc$is_alive()
proc$get_exit_status()
```
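
Standard error can be piped separately with `stderr = "|"`. This is a sketch, assuming `sh` is available. `read_all_output_lines()` and `read_all_error_lines()` read until the end of each stream.

```{r}
library(processx)

p <- process$new("sh", c("-c", "echo out; echo err >&2"),
                 stdout = "|", stderr = "|")
# Fine for small outputs; with large ones, poll both pipes so that neither fills up
out <- p$read_all_output_lines()
err <- p$read_all_error_lines()
p$wait()
```
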
`poll()` waits for output from several processes at once:

```{r}
p1 <- process$new("curl", "https://httpbin.org/delay/2", stdout = "|")
p2 <- process$new("curl", "https://httpbin.org/delay/1", stdout = "|")
poll(list(p1, p2), -1)
p2$read_output_lines()
poll(list(p1, p2), -1)
```

```{r}
poll(list(p1, p2), -1)
p1$read_all_output_lines()
```
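
Here is a sketch of the same pattern without network access, assuming `sh` is available. `poll()` returns one named character vector per process. `"ready"` marks the streams that have data, and other values (e.g. `"nopipe"` for streams that are not piped) describe the rest.

```{r}
library(processx)

slow <- process$new("sh", c("-c", "sleep 1; echo slow"), stdout = "|")
fast <- process$new("sh", c("-c", "echo fast"), stdout = "|")

# Wait at most 5000 ms until at least one of them has something to report
ready <- poll(list(slow, fast), 5000)
ready

fast_out <- fast$read_all_output_lines()
slow_out <- slow$read_all_output_lines()
```
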
### Back pressure

If the parent process does not read the standard output/error pipes, the pipe buffer fills up and the background process blocks on its next write. _This is a good thing._ It ensures that the background process only produces data at a rate the parent process can handle. Once the parent reads out the pipe buffer, the background process resumes.

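
Here is a sketch of a parent that keeps the pipe drained, assuming `seq` is on the `PATH`. The child writes far more than a pipe buffer holds, so it can only finish because the loop keeps reading. `is_incomplete_output()` becomes `FALSE` once the end of the output has been read.

```{r}
library(processx)

p <- process$new("seq", "100000", stdout = "|")
n_lines <- 0L
while (p$is_incomplete_output()) {
  # Block until output is available (or the pipe is closed) ...
  p$poll_io(-1)
  # ... then read it, which frees space in the pipe buffer
  n_lines <- n_lines + length(p$read_output_lines())
}
p$wait()
n_lines
```
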
## callr - run external R processes

### `callr::r()`

```{r}
library(callr)
Sys.getpid()
r(function() Sys.getpid())
```

The function runs in a separate R process, so its lexical scope is lost:

```{r, error = TRUE}
a <- 100
r(function() a + 100)
```
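
One way to get values into the subprocess is the `args` argument of `r()`, which passes a list of arguments to the function. A minimal sketch:

```{r}
library(callr)

a <- 100
# `a` is copied to the subprocess as an argument, not found by scoping
res <- r(function(a) a + 100, args = list(a))
res
```
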
Always pass an anonymous function if the function you want to call has a non-trivial enclosing environment, because that environment is not carried over to the subprocess:

```{r, error = TRUE}
r(.libPaths)
```

```{r}
r(function() .libPaths())
```

Errors are copied over from the subprocess:

```{r, error = TRUE}
r(function() 1 + "A")
```

With `error = "stack"`, they also include the call stack of the subprocess:

```{r}
tryCatch(
  r(function() { f <- function() g(); g <- function() 1 + "A"; f() },
    error = "stack"),
  error = function(e) print(e$stack)
)
```

### `callr::rcmd()`

For running `R CMD <command>`, with streaming standard output and error.

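
This is a sketch, assuming a working R installation. `R CMD config CC` prints the C compiler that R was configured with. The result is a list with the exit `status` and the collected `stdout` and `stderr`.

```{r}
library(callr)

res <- rcmd("config", "CC")
res$status
cat(res$stdout)
```
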
### `callr::rscript()`

For running `Rscript`, with streaming standard output and error.

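
This sketch writes a small script to a temporary file and runs it; the message text is just an example. It assumes `rscript()` returns a result list with the exit `status`, like `rcmd()`.

```{r}
library(callr)

script <- tempfile(fileext = ".R")
writeLines('cat("hello from Rscript\\n")', script)
res <- rscript(script)
res$status
```
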
### `callr::r_bg()`

Like `callr::r()`, but runs the R subprocess in the background. It returns a `callr::r_process` object, which inherits from `processx::process`.

```{r}
rp <- r_bg(function() { Sys.sleep(1); 1 + 1 })
rp$wait()
rp$get_result()
```
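
Since `r_bg()` returns immediately, several subprocesses can run at the same time. Here is a minimal sketch that starts three of them and then collects the results in order.

```{r}
library(callr)

procs <- lapply(1:3, function(i) {
  r_bg(function(x) { Sys.sleep(0.5); x^2 }, args = list(i))
})
# All three run concurrently; wait for each one, then collect the results
for (p in procs) p$wait()
squares <- vapply(procs, function(p) p$get_result(), numeric(1))
squares
```
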
### `callr::r_process`

You can also use the `r_process` R6 class directly, e.g. to inherit from it.

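
Here is a sketch of using the class directly. `r_process_options()` collects the options (here only the function to call), and `r_process$new()` starts the subprocess. A subclass would inherit from `r_process` like from any other R6 class.

```{r}
library(callr)

opts <- r_process_options(func = function() 1 + 1)
rp <- r_process$new(opts)
rp$wait()
res <- rp$get_result()
res
```
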
### `callr::r_session`

This is a persistent R process, running in the background. You can send computations to it. This is what pak uses to perform all of its operations.

```{r}
rs <- r_session$new()
rs
```

```{r}
class(rs)
```

Its methods:

```{r}
names(r_session$public_methods)
```

`run()` is synchronous: it waits for the result.

```{r}
rs$run(function() Sys.getpid())
```
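
Because the session is persistent, state survives between calls. This is a sketch; `counter` is a hypothetical variable in the session's global environment, and a separate session is used so that `rs` is left untouched.

```{r}
library(callr)

rs2 <- r_session$new()
rs2$run(function() .GlobalEnv$counter <- 1)
rs2$run(function() .GlobalEnv$counter <- .GlobalEnv$counter + 1)
res <- rs2$run(function() .GlobalEnv$counter)
rs2$close()
res
```
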
`call()` is asynchronous: it returns immediately, and the result has to be polled for and read later.

```{r}
rs$call(function() Sys.getpid())
rs
```

```{r}
rs$poll_io(-1)
```

```{r}
rs$read()
```
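
Here is a sketch of a non-blocking pattern, assuming the session methods `get_state()` and `poll_process()`. The caller can do other work while the session is busy, then collect the result with `read()`.

```{r}
library(callr)

rs3 <- r_session$new()
rs3$call(function() { Sys.sleep(0.5); 6 * 7 })
rs3$get_state()            # "busy" while the call is running
rs3$poll_process(-1)       # wait until the result is ready
res <- rs3$read()
rs3$close()
res$result
```
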
```{r}
rs$call(function() {
  print(1:200)
  message("done")
  "exited"
})
rs$poll_io(-1)
result <- rs$read()
```

```{r}
result
```

Attaching to the terminal of the background process. This will be used for "live" debugging in the future.

```{r}
x <- rs$run(function() library(purrr))
x <- rs$run(function() .GlobalEnv$x <- 100)
```

```{r, eval = FALSE}
rs$attach()
# Interrupt to detach
```

```{r}
rs$close()
rs
```
## ps - query and manipulate processes

Query and manipulate system processes:

```{r}
library(ps)
ls("package:ps")
```

```{r}
ps_is_supported()
```

```{r}
me <- ps_handle()   # handle of the current R process
me
```

```{r}
ps_exe(me)
```
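
Here is a small sketch of other accessors on the same handle. `ps_pid()` should agree with `Sys.getpid()`, and `ps_name()` gives the process name, which depends on how R was started.

```{r}
library(ps)

me <- ps_handle()
ps_pid(me)
ps_name(me)
ps_pid(me) == Sys.getpid()
```
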
List all processes:

```{r}
ps::ps()
```

Query process information: pid, executable, command line, parent process, child processes, creation time, etc.

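
For example, a child started with processx shows up among the children of the R process, and its parent is the R process. This is a sketch, assuming `sleep` is on the `PATH`.

```{r}
library(ps)
library(processx)

p <- process$new("sleep", "10")
child <- ps_handle(p$get_pid())

is_parent <- ps_ppid(child) == Sys.getpid()   # parent is this R process
kids <- vapply(ps_children(ps_handle()), function(h) as.integer(ps_pid(h)), integer(1))
is_child <- p$get_pid() %in% kids
ps_create_time(child)                         # when the child was started
ps_cmdline(child)

p$kill()
```
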
Open files:

```{r}
ps_open_files(me)
```

Network connections:

```{r}
ps_connections(me)
```

Environment variables:

```{r}
ps_environ(me)[c("TERM", "USER", "SHELL", "R_HOME")]
```

Process manipulation: suspend, resume, send a signal (Unix only), interrupt, kill.

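
Here is a sketch of these operations on a child process, assuming a Unix-like system with `sleep` on the `PATH`. `ps_status()` reports `"stopped"` for a suspended process. The short `Sys.sleep()` calls give each signal time to take effect.

```{r}
library(ps)
library(processx)

p <- process$new("sleep", "30")
h <- ps_handle(p$get_pid())

ps_suspend(h)
Sys.sleep(0.2)
status_suspended <- ps_status(h)   # "stopped"
ps_resume(h)
Sys.sleep(0.2)
status_resumed <- ps_status(h)     # running or sleeping again
ps_kill(h)
Sys.sleep(0.2)
p$is_alive()
```
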
## Process (tree) Cleanup

Mark a process tree by setting a unique environment variable. When starting a process:

1. Set a unique (random) environment variable in `processx::process`.
2. This variable is inherited by all of its child, grandchild, etc. processes.

To clean up a process tree:

1. Enumerate all system processes.
2. Check their environment for the unique variable.
3. If it is set, kill the process.

```{r, eval = FALSE}
processx::run(..., cleanup_tree = TRUE)
processx::process$new(..., cleanup_tree = TRUE)
callr::r(..., cleanup_tree = TRUE)
# etc.
```

Currently `cleanup_tree` defaults to `FALSE`; this is to be changed later.

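
The marking scheme can be sketched by hand with processx and ps. This illustrates the idea only and is not processx's own implementation. Assumptions: the `MY_TREE_MARKER_` prefix is a made-up name, `sh` and `sleep` are available, and processes whose environment cannot be read (e.g. those of other users) are skipped.

```{r}
library(processx)
library(ps)

# 1. A unique (random) marker variable; the name prefix is hypothetical
marker <- paste0("MY_TREE_MARKER_", paste(sample(letters, 12, TRUE), collapse = ""))

# 2. Start a process with the marker set; its children inherit it
p <- process$new("sh", c("-c", "sleep 30 & sleep 30"),
                 env = c(Sys.getenv(), setNames("1", marker)))
Sys.sleep(0.5)

# Enumerate all processes and keep those whose environment has the marker
find_marked <- function(marker) {
  Filter(function(pid) {
    env <- tryCatch(ps_environ(ps_handle(pid)), error = function(e) NULL)
    !is.null(env) && marker %in% names(env)
  }, ps_pids())
}

marked <- find_marked(marker)
marked

# 3. Kill every marked process; some may already be gone
for (pid in marked) {
  tryCatch(ps_kill(ps_handle(pid)), error = function(e) NULL)
}
```
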
## Cleanup reporter for testthat

For every `test_that()` block, check that newly started processes were cleaned up, and that all open files and connections were closed.

In `tests/testthat.R`:

```{r, eval = FALSE}
library(testthat)
library(mypackage)

if (ps::ps_is_supported()) {
  reporter <- ps::CleanupReporter(testthat::ProgressReporter)$new()
} else {
  ## ps does not support this platform
  reporter <- "progress"
}

test_check("mypackage", reporter = reporter)
```

By default, it generates a test failure and closes the resource. It can also close it silently, if that's preferred.

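
The idea behind the reporter can be sketched in a few lines. `leak_check()` is a simplified, hypothetical helper and not how `ps::CleanupReporter` is implemented. It compares the open R connections and the child processes of the R process before and after evaluating an expression. It assumes `sleep` is on the `PATH`.

```{r}
library(ps)
library(processx)

# Hypothetical helper: what did `expr` leave behind?
leak_check <- function(expr) {
  me <- ps_handle()
  child_pids <- function() {
    vapply(ps_children(me), function(h) as.integer(ps_pid(h)), integer(1))
  }
  cons_before <- rownames(showConnections())
  kids_before <- child_pids()
  force(expr)   # evaluate the expression in the caller's environment
  list(
    connections = setdiff(rownames(showConnections()), cons_before),
    processes = setdiff(child_pids(), kids_before)
  )
}

leaks <- leak_check({
  con <- file(tempfile(), "w")
  p <- process$new("sleep", "10")
})
leaks

# Clean up what the check found
close(con)
p$kill()
```
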
## Messages, Progress Bars, etc.

We want to be able to send messages back from the subprocess, to create status bars, progress bars, etc. Right now this is implemented for `callr::r_process`. Message conditions of class `callr_message` are caught in the subprocess and transmitted through another connection to the parent process.

```{r, error = TRUE}
do <- function() {
  msg <- structure(list(message = "hi"),
                   class = c("callr_message", "condition"))
  signalCondition(msg)
  signalCondition(msg)
  stop("nah-ah")
}
rs <- r_session$new()
withCallingHandlers(
  rs$run(do),
  callr_message = function(m) print(m))
```
The cliapp package uses this mechanism to seamlessly transmit cli messages to the parent:

```{r}
rs <- r_session$new()
cliapp::start_app(theme = cliapp::simple_theme())
cliapp::cli_div(theme = list(par = list("margin-bottom" = 1)))
res <- rs$run(function() {
  cliapp::cli_h1("Title")
  cliapp::cli_h2("Subtitle")
  cliapp::cli_par()
  cliapp::cli_text(cliapp:::lorem_ipsum())
  cliapp::cli_end()
  cliapp::cli_alert_danger("Watch out!")
  cliapp::cli_alert_success("Done.")
  invisible()
})
```

Note that the theming happens in the parent process!

Global message handlers:

```{r}
do <- function() {
  cliapp::cli_div()
  cliapp::cli_h1("title")
  cliapp::cli_text("text")
}
rs <- callr::r_session$new()
msgs <- list()
withr::with_options(list(
  cli.default_handler = function(msg) {
    msgs <<- c(msgs, list(msg))
    if (!is.null(findRestart("muffleMessage"))) {
      invokeRestart("muffleMessage")
    }
  }),
  res <- rs$run(do)
)
msgs
```
## async - integrated framework for async computation and IO

```{r}
library(async)
afun <- async(function() {
  when_all(
    delay = delay(1/1000)$
      then(function() 1),
    http = http_get("https://eu.httpbin.org/status/418")$
      then(function(x) x$status_code),
    process = run_process("pwd")$
      then(function(x) trimws(x$stdout)),
    r_process = run_r_process(function() 2)$
      then(function(x) x$result),
    call = call_function(function() 3)$
      then(function(x) x$result)
  )
})
synchronise(afun())
```

## Summary

- External subprocesses: processx.
- External R subprocesses: callr.
- System processes (i.e. not only subprocesses): ps.