-
-
Save stonerl/5003d68522f2d9a53534c3a2fea805bd to your computer and use it in GitHub Desktop.
Concurrent, forked, cancellable tasks in Shiny
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
library(shiny) | |
# Also uses parallel, shinyjs and tools (called via `::`, so they must be installed but are not attached)
# Create a long-running task, executed in a forked process. (Doesn't work on
# Windows, where forking is unavailable; an explicit error is raised there.)
#
# Arguments:
#   expr  The expression to run as the task. It is only forced inside the
#         forked child process, so the caller is never blocked by it.
#
# The return value is a promise-like object (a list) with three methods:
#   - completed(): FALSE initially, then TRUE if the task succeeds,
#     fails, or is cancelled. Reactive, so when the state changes
#     any reactive readers will invalidate.
#   - result(): Use this to get the return value. While execution is
#     in progress, performs a req(FALSE). If the task succeeded, returns
#     the return value. If it failed, re-throws the task's error. If it was
#     cancelled, raises a validation error. Reactive, so when the state
#     changes any reactive readers will invalidate.
#   - cancel(): Call this to prematurely terminate the task. Does nothing
#     unless the task is still running.
create_forked_task <- function(expr) {
  if (identical(.Platform$OS.type, "windows")) {
    stop(
      "create_forked_task() needs process forking, which is not available ",
      "on Windows",
      call. = FALSE
    )
  }

  # "state" is always one of "running", "success", "error" or "cancel".
  # The reactive binding lets plain reads/writes of the variable act like
  # reads/writes of a reactive value.
  makeReactiveBinding("state")
  state <- "running"
  result <- NULL

  # Launch the task in a forked process. This always returns
  # immediately, and we get back a handle we can use to monitor
  # or kill the job.
  task_handle <- parallel::mcparallel({
    force(expr)
  })

  # Poll every 100 milliseconds until the job completes
  poller <- observe({
    res <- parallel::mccollect(task_handle, wait = FALSE)
    if (is.null(res)) {
      invalidateLater(100)
    } else {
      poller$destroy()
      # Best-effort follow-up collect so the finished child can be cleaned
      # up instead of lingering (the same clean-up cancel() performs).
      parallel::mccollect(task_handle, wait = FALSE)

      failed <- is.list(res) && length(res) == 1 &&
        inherits(res[[1]], "try-error")
      if (failed) {
        cond <- attr(res[[1]], "condition", exact = TRUE)
        if (!inherits(cond, "condition")) {
          # No usable condition attached: rebuild one from the error text
          # so that result() never ends up calling stop(NULL).
          cond <- simpleError(trimws(as.character(res[[1]])))
        }
        result <<- cond
        state <<- "error"
      } else {
        result <<- res[[1]]
        state <<- "success"
      }
    }
  })

  list(
    completed = function() {
      state != "running"
    },
    result = function() {
      if (state == "running") {
        # If running, abort the current context silently.
        # We've taken a reactive dependency on "state" so if
        # the state changes the context will invalidate.
        req(FALSE)
      } else if (state == "success") {
        return(result)
      } else if (state == "error") {
        # "result" holds the condition object captured from the child
        stop(result)
      } else if (state == "cancel") {
        validate(need(FALSE, "The operation was cancelled"))
      }
    },
    cancel = function() {
      if (state == "running") {
        state <<- "cancel"
        poller$destroy()
        # Signal the child itself, then the process group with the same id
        # (negative pid), and finally collect without waiting.
        tools::pskill(task_handle$pid, tools::SIGTERM)
        tools::pskill(-task_handle$pid, tools::SIGTERM)
        parallel::mccollect(task_handle, wait = FALSE)
      }
    }
  )
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
library(shiny) | |
source("create_forked_task.R") | |
# Page layout: Start/Stop controls followed by a table with the task output.
ui <- fluidPage(
  shinyjs::useShinyjs(), # Initialize shinyjs library
  # Buttons to control the job; "Stop" starts out disabled
  actionButton("start", "Start"),
  shinyjs::disabled(actionButton("stop", "Stop")),
  # This will display the job output
  tableOutput("out")
)
# Server logic for the demo app.
#
# "Start" launches a forked task via create_forked_task(), shows a progress
# message and toggles the buttons; "Stop" cancels the task. The table output
# renders the task's result once it is available.
#
# Arguments:
#   input, output, session  The standard Shiny server arguments.
server <- function(input, output, session) {
  # Make "task" behave like a reactive value
  makeReactiveBinding("task")
  task <- NULL

  # If the session ends while a task is still running, cancel it so the
  # forked child does not keep running with nobody left to read its result.
  # cancel() is a no-op for tasks that have already finished.
  session$onSessionEnded(function() {
    isolate({
      if (!is.null(task)) {
        task$cancel()
      }
    })
  })

  output$out <- renderTable({
    # The task starts out NULL but is required. The req() takes
    # care of ensuring that we only proceed if it's non-NULL.
    req(task)$result()
  })

  observeEvent(input$start, {
    shinyjs::enable("stop")
    shinyjs::disable("start")

    task <<- create_forked_task({
      # Pretend this takes a long time
      Sys.sleep(5)
      cars[sample(nrow(cars), 10), ]
    })

    # Show progress message during task start
    prog <- Progress$new(session)
    prog$set(message = "Executing task, please wait...")

    o <- observe({
      # Only proceed when the task is completed (this could mean success,
      # failure, or cancellation)
      req(task$completed())
      # This observer only runs once
      o$destroy()
      # Close the progress indicator and update button state
      prog$close()
      shinyjs::disable("stop")
      shinyjs::enable("start")
    })
  })

  observeEvent(input$stop, {
    # The button is only disabled client-side, so guard against a stop
    # request arriving before any task has been created.
    req(task)$cancel()
  })
}
# Build the application object from the UI definition and server function
shinyApp(ui, server)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment