#' External R Session
#'
#' @description
#' A permanent R session that runs in the background. This is an R6 class
#' that extends the [processx::process] class.
#'
#' The process is started at the creation of the object, and then it can
#' be used to evaluate R function calls, one at a time.
#'
#' @param func Function object to call in the background R process.
#' Please read the notes for the similar argument of [r()].
#' @param args Arguments to pass to the function. Must be a list.
#' @param package Whether to keep the environment of `func` when passing
#' it to the other package. Possible values are:
#' * `FALSE`: reset the environment to `.GlobalEnv`. This is the default.
#' * `TRUE`: keep the environment as is.
#' * `pkg`: set the environment to the `pkg` package namespace.
#'
#' @examplesIf FALSE
#' rs <- r_session$new()
#'
#' rs$run(function() 1 + 2)
#'
#' rs$call(function() Sys.sleep(1))
#' rs$get_state()
#'
#' rs$poll_process(-1)
#' rs$get_state()
#' rs$read()
#' @export
r_session <- R6::R6Class(
"r_session",
inherit = processx::process,
public = list(
#' @field status
#' Status codes returned by `read()`.
status = list(
DONE = 200L,
STARTED = 201L,
ATTACH_DONE = 202L,
MSG = 301L,
EXITED = 500L,
CRASHED = 501L,
CLOSED = 502L
),
#' @description
#' creates a new R background process. It can wait for the process to
#' start up (`wait = TRUE`), or return immediately, i.e. before
#' the process is actually ready to run. In the latter case you may call
#' the `poll_process()` method to make sure it is ready.
#'
#' @param options A list of options created via [r_session_options()].
#' @param wait Whether to wait for the R process to start and be ready
#' for running commands.
#' @param wait_timeout Timeout for waiting for the R process to start,
#' in milliseconds.
#' @return An `r_session` object.
initialize = function(options = r_session_options(), wait = TRUE,
wait_timeout = 3000)
rs_init(self, private, super, options, wait, wait_timeout),
#' @description
#' Similar to [r()], but runs the function in a permanent background
#' R session. It throws an error if the function call generated an
#' error in the child process.
#' @return The return value of the R expression.
run = function(func, args = list(), package = FALSE)
rs_run(self, private, func, args, package),
#' @description
#' Similar to `$run()`, but returns the standard output and error of
#' the child process as well. It does not throw on errors, but
#' returns a non-`NULL` `error` member in the result list.
#'
#' @return A list with the following entries.
#' * `result`: The value returned by `func`. On error this is `NULL`.
#' * `stdout`: The standard output of the process while evaluating
# the `func` call,
#' * `stderr`: The standard error of the process while evaluating
#' the `func` call.
#' * `error`: On error it contains an error object, that contains the
#' error thrown in the subprocess. Otherwise it is `NULL`.
#' * `code`, `message`: These fields are used by call internally and
#' you can ignore them.
run_with_output = function(func, args = list(), package = FALSE)
rs_run_with_output(self, private, func, args, package),
#' @description
#' Starts running a function in the background R session, and
#' returns immediately. To check if the function is done, call the
#' `poll_process()` method.
call = function(func, args = list(), package = FALSE)
rs_call(self, private, func, args, package),
#' @description
#' Poll the R session with a timeout. If the session has finished the
#' computation, it returns with `"ready"`. If the timeout
#' is reached, it returns with `"timeout"`.
#'
#' @param timeout Timeout period in milliseconds.
#' @return Character string `"ready"` or `"timeout"`.
poll_process = function(timeout)
rs_poll_process(self, private, timeout),
#' @description
#' Return the state of the R session.
#'
#' @return Possible values:
#' * `"starting"`: starting up,
#' * `"idle"`: ready to compute,
#' * `"busy"`: computing right now,
#' * `"finished"`: the R process has finished.
get_state = function()
rs_get_state(self, private),
#' @description
#' Returns the elapsed time since the R process has started, and the
#' elapsed time since the current computation has started. The latter
#' is `NA` if there is no active computation.
#' @return Named vector of `POSIXct` objects. The names are `"total"`
#' and `"current"`.
get_running_time = function()
rs_get_running_time(self, private),
#' @description
#' Reads an event from the child process, if there is one available.
#' Events might signal that the function call has finished, or they
#' can be progress report events.
#'
#' This is a low level function that you only need to use if you
#' want to process events (messages) from the R session manually.
#'
#' @return `NULL` if no events are available. Otherwise a named list,
#' which is also a `callr_session_result` object. The list always has
#' a `code` entry which is the type of the event. See also
#' `r_session$public_fields$status` for symbolic names of the
#' event types.
#' * `200`: (`DONE`) The computation is done, and the event includes
#' the result, in the same form as for the `run()` method.
#' * `201`: (`STARTED`) An R session that was in 'starting' state is
#' ready to go.
#' * `202`: (`ATTACH_DONE`) Used by the `attach()` method.
#' * `301`: (`MSG`) A message from the subprocess. The message is a
#' condition object with class `callr_message`. (It typically has
#' other classes, e.g. `cli_message` for output from the cli
#' package.)
#' * `500`: (`EXITED`) The R session finished cleanly. This means
#' that the evaluated expression quit R.
#' * `501`: (`CRASHED`) The R session crashed or was killed.
#' * `502`: (`CLOSED`) The R session closed its end of the connection
#' that callr uses for communication.
read = function()
rs_read(self, private),
#' @description
#' Terminate the current computation and the R process.
#' The session object will be in `"finished"` state after this.
#' @param grace Grace period in milliseconds, to wait for the
#' subprocess to exit cleanly, after its standard input is closed.
#' If the process is still running after this period, it will be
#' killed.
close = function(grace = 1000)
rs_close(self, private, grace),
#' @description
#' The `traceback()` method can be used after an error in the R
#' subprocess. It is equivalent to the [base::traceback()] call, in
#' the subprocess.
#'
#' On callr version 3.8.0 and above, you need to set the
#' `callr.traceback` option to `TRUE` (in the main process) to make
#' the subprocess save the trace on error. This is because saving
#' the trace can be costly for large objects passed as arguments.
#' @return The same output as from [base::traceback()]
traceback = function()
rs_traceback(self, private),
#' @description
#' Interactive debugger to inspect the dumped frames in the subprocess,
#' after an error. See more at [r_session_debug].
#'
#' On callr version 3.8.0 and above, you need to set the
#' `callr.traceback` option to `TRUE` (in the main process) to make
#' the subprocess dump frames on error. This is because saving
#' the frames can be costly for large objects passed as arguments.
debug = function()
rs_debug(self, private),
#' @description Experimental function that provides a REPL
#' (Read-Eval-Print-Loop) to the subprocess.
attach = function()
rs_attach(self, private),
#' @description
#' Finalizer that is called when garbage collecting an `r_session`
#' object, to clean up temporary files.
finalize = function() {
unlink(private$tmp_output_file)
unlink(private$tmp_error_file)
unlink(private$options$tmp_files, recursive = TRUE)
if ("finalize" %in% ls(super)) super$finalize()
},
#' @description
#' Print method for an `r_session`.
#' @param ... Arguments are not used currently.
print = function(...) {
cat(
sep = "",
"R SESSION, ",
if (self$is_alive()) {
paste0("alive, ", self$get_state(), ", ")
} else {
"finished, "
},
"pid ", self$get_pid(), ".\n")
invisible(self)
}
),
private = list(
options = NULL,
state = NULL,
started_at = NULL,
fun_started_at = as.POSIXct(NA),
pipe = NULL,
tmp_output_file = NULL,
tmp_error_file = NULL,
func_file = NULL,
res_file = NULL,
buffer = NULL,
read_buffer = function()
rs__read_buffer(self, private),
read_message = function()
rs__read_message(self, private),
get_result_and_output = function(std = FALSE)
rs__get_result_and_output(self, private, std),
report_back = function(code, text = "")
rs__report_back(self, private, code, text),
write_for_sure = function(text)
rs__write_for_sure(self, private, text),
parse_msg = function(msg)
rs__parse_msg(self, private, msg),
attach_wait = function()
rs__attach_wait(self, private)
)
)
rs_init <- function(self, private, super, options, wait, wait_timeout) {
options$func <- options$func %||% function() { }
options$args <- list()
options$load_hook <- session_load_hook(options$load_hook)
options <- convert_and_check_my_args(options)
options <- setup_context(options)
options <- setup_r_binary_and_args(options, script_file = FALSE)
private$options <- options
prepare_client_files()
with_envvar(
options$env,
do.call(super$initialize, c(list(options$bin, options$real_cmdargs,
stdin = "|", stdout = "|", stderr = "|", poll_connection = TRUE),
options$extra))
)
## Make child report back when ready
private$report_back(201, "ready to go")
private$pipe <- self$get_poll_connection()
private$started_at <- Sys.time()
private$state <- "starting"
if (wait) {
timeout <- wait_timeout
have_until <- Sys.time() + as.difftime(timeout / 1000, units = "secs")
pr <- self$poll_io(timeout)
out <- ""
err <- ""
while (any(pr == "ready")) {
if (pr["output"] == "ready") out <- paste0(out, self$read_output())
if (pr["error"] == "ready") err <- paste0(err, self$read_error())
if (pr["process"] == "ready") break
timeout <- as.double(have_until - Sys.time(), units = "secs") * 1000
pr <- self$poll_io(as.integer(timeout))
}
if (pr["process"] == "ready") {
msg <- self$read()
out <- paste0(out, msg$stdout)
err <- paste0(err, msg$stderr)
if (msg$code != 201) {
data <- list(
status = self$get_exit_status(),
stdout = out,
stderr = err,
timeout = FALSE
)
throw(new_callr_crash_error(data, "Failed to start R session"))
}
} else if (pr["process"] != "ready") {
cat("stdout:]\n", out, "\n")
cat("stderr:]\n", err, "\n")
throw(new_error("Could not start R session, timed out"))
}
}
invisible(self)
}
rs_read <- function(self, private) {
if (!is.null(private$buffer)) {
# There is a partial message in the buffer, try to finish it.
out <- private$read_buffer()
} else {
# A new message.
out <- private$read_message()
}
if (!length(out)) {
if (processx::processx_conn_is_incomplete(private$pipe)) return()
if (self$is_alive()) {
# We do this in on.exit(), because parse_msg still reads the streams
on.exit(self$kill(), add = TRUE)
out <- list(header = list(
code = 502, length = 0,
rest = "R session closed the process connection, killed"
))
} else if (identical(es <- self$get_exit_status(), 0L)) {
out <- list(header = list(
code = 500, length = 0,
rest = "R session finished cleanly"
))
} else {
out <- list(header = list(
code = 501, length = 0,
rest = paste0("R session crashed with exit code ", es)
))
}
}
if (length(out)) private$parse_msg(out)
}
rs__read_buffer <- function(self, private) {
# There is a partial message in the buffer already, we need to
# read some more
need <- private$buffer$header$length - private$buffer$got
chunk <- processx::processx_conn_read_chars(private$pipe, need)
got <- nchar(chunk)
if (got == 0) {
# make this special case fast
NULL
} else if (got == need) {
msg <- list(
header = private$buffer$header,
body = paste(c(private$buffer$chunks, list(chunk)), collapse = "")
)
private$buffer <- NULL
msg
} else {
private$buffer$got <- private$buffer$got + got
private$buffer$chunks <- c(private$buffer$chunks, list(chunk))
NULL
}
}
rs__read_message <- function(self, private) {
# A new message, we can surely read the first line
out <- processx::processx_conn_read_lines(private$pipe, 1)
if (length(out) == 0) return(NULL)
header <- rs__parse_header(out)
body <- ""
if (header$length > 0) {
body <- processx::processx_conn_read_chars(
private$pipe,
header$length
)
}
got <- nchar(body)
if (got < header$length) {
# Partial message
private$buffer <- list(
header = header,
got = got,
chunks = list(body)
)
NULL
} else {
list(header = header, body = body)
}
}
rs__parse_header <- function(line) {
parts <- strsplit(line, " ", fixed = TRUE)[[1]]
parts2 <- suppressWarnings(as.integer(parts[1:2]))
rest <- paste(parts[-(1:2)], collapse = " ")
header <- list(code = parts2[1], length = parts2[2], rest = rest)
if (is.na(header$code) || is.na(header$length)) {
stop("Internal callr error, invalid message header")
}
header
}
rs_close <- function(self, private, grace) {
processx::processx_conn_close(self$get_input_connection())
self$poll_process(grace)
self$kill()
self$wait(1000)
if (self$is_alive()) throw(new_error("Could not kill background R session"))
private$state <- "finished"
private$fun_started_at <- as.POSIXct(NA)
processx::processx_conn_close(private$pipe)
processx::processx_conn_close(self$get_output_connection())
processx::processx_conn_close(self$get_error_connection())
invisible()
}
rs_call <- function(self, private, func, args, package) {
## We only allow a new command if the R session is idle.
## This allows keeping a clean state
## TODO: do we need a state at all?
if (private$state == "starting") throw(new_error("R session not ready yet"))
if (private$state == "finished") throw(new_error("R session finished"))
if (private$state == "busy") throw(new_error("R session busy"))
## Save the function in a file
private$options$func <- func
private$options$args <- args
private$options$package <- package
private$options$func_file <- save_function_to_temp(private$options)
private$options$result_file <- tempfile("callr-rs-result-")
private$options$tmp_files <-
c(private$options$tmp_files, private$options$func_file,
private$options$result_file)
## Maybe we need to redirect stdout / stderr
re_stdout <- if (is.null(private$options$stdout)) {
private$tmp_output_file <- tempfile("callr-rs-stdout-")
}
re_stderr <- if (is.null(private$options$stderr)) {
private$tmp_error_file <- tempfile("callr-rs-stderr-")
}
pre <- rs__prehook(re_stdout, re_stderr)
post <- rs__posthook(re_stdout, re_stderr)
## Run an expr that loads it, in the child process, with error handlers
expr <- make_vanilla_script_expr(private$options$func_file,
private$options$result_file,
private$options$error,
pre_hook = pre, post_hook = post,
messages = TRUE,
print_error = FALSE)
cmd <- paste0(deparse(expr), "\n")
## Write this to stdin
private$write_for_sure(cmd)
private$fun_started_at <- Sys.time()
## Report back when done
report_str <- paste0("done ", basename(private$options$result_file))
private$report_back(200, report_str)
private$state <- "busy"
}
rs_run_with_output <- function(self, private, func, args, package) {
self$call(func, args, package)
go <- TRUE
res <- NULL
while (go) {
## TODO: why is this in a tryCatch?
res <- tryCatch(
{ processx::poll(list(private$pipe), -1)
msg <- self$read()
if (is.null(msg)) next
if (msg$code == 200 || (msg$code >= 500 && msg$code < 600)) {
return(msg)
}
if (msg$code == 301) {
rs__handle_condition(msg$message)
}
},
interrupt = function(e) {
self$interrupt()
## The R process will catch the interrupt, and then save the
## error object to a file, but this might still take some time,
## so we need to poll here. If the bg process ignores
## interrupts, then we kill it.
ps <- processx::poll(list(private$pipe), 1000)[[1]]
if (ps == "timeout") {
self$kill()
} else {
res <<- self$read()
go <<- FALSE
}
iconn <- structure(
list(message = "Interrupted"),
class = c("interrupt", "condition"))
signalCondition(iconn)
cat("\n")
invokeRestart("abort")
})
}
res
}
rs_run <- function(self, private, func, args, package) {
res <- rs_run_with_output(self, private, func, args, package)
if (is.null(res$error)) {
res$result
} else{
res$stdout <- paste0(res$stdout, self$read_output())
res$stderr <- paste0(res$stderr, self$read_error())
throw(res$error)
}
}
rs_get_state <- function(self, private) {
private$state
}
rs_get_running_time <- function(self, private) {
now <- Sys.time()
finished <- private$state == "finished"
idle <- private$state == "idle"
missing <- as.difftime(NA_real_, units = "secs")
c(total = now - private$started_at,
current = if (finished | idle) missing else now - private$fun_started_at)
}
rs_poll_process <- function(self, private, timeout) {
processx::poll(list(self$get_poll_connection()), timeout)[[1]]
}
rs_traceback <- function(self, private) {
tb <- self$run(function() {
traceback(as.environment("tools:callr")$`__callr_data__`$.Traceback, 10)
})
if (is.null(tb)) {
throw(new_error("No traceback was recorded in the subprocess (yet?)"))
} else {
traceback(utils::head(tb, -12))
}
}
rs_debug <- function(self, private) {
hasdump <- self$run(function() {
! is.null(as.environment("tools:callr")$`__callr_data__`$.Last.dump)
})
if (!hasdump) stop("Can't find dumped frames, nothing to debug")
help <- function() {
cat("Debugging in process ", self$get_pid(),
", press CTRL+C (ESC) or type .q to quit. Commands:\n", sep = "")
cat(" .where -- print stack trace\n",
" .inspect <n> -- inspect a frame, 0 resets to .GlobalEnv\n",
" .help -- print this message\n",
" .q -- quit debugger\n",
" <cmd> -- run <cmd> in frame or .GlobalEnv\n\n", sep = "")
}
should_quit <- FALSE
translate_cmd <- function(cmd) {
if (cmd == ".where") {
traceback(tb)
if (frame) cat("Inspecting frame", frame, "\n")
NULL
} else if (cmd == ".help") {
help()
NULL
} else if (grepl("^.inspect ", cmd)) {
newframe <- as.integer(strsplit(cmd, " ")[[1]][[2]])
if (is.na(newframe)) {
message("Cannot parse frame number")
} else {
frame <<- newframe
}
NULL
} else if (cmd == ".q") {
should_quit <<- TRUE
NULL
} else {
cmd
}
}
help()
tb <- self$traceback()
frame <- 0L
while (TRUE) {
cat("\n")
prompt <- paste0(
"RS ", self$get_pid(),
if (frame) paste0(" (frame ", frame, ")"), " > ")
cmd <- rs__attach_get_input(prompt)
cmd2 <- translate_cmd(cmd)
if (should_quit) break
if (is.null(cmd2)) next
try(update_history(cmd), silent = TRUE)
ret <- self$run_with_output(function(cmd, frame) {
dump <- as.environment("tools:callr")$`__callr_data__`$.Last.dump
envir <- if (!frame) .GlobalEnv else dump[[frame + 12L]]
eval(parse(text = cmd), envir = envir)
}, list(cmd = cmd, frame = frame))
cat(ret$stdout)
cat(ret$stderr)
if (!is.null(ret$error)) {
print(ret$error)
} else {
print(ret$result)
}
}
invisible()
}
rs_attach <- function(self, private) {
out <- self$get_output_connection()
err <- self$get_error_connection()
while (nchar(x <- processx::processx_conn_read_chars(out))) cat(x)
while (nchar(x <- processx::processx_conn_read_chars(err))) cat(bold(x))
tryCatch({
while (TRUE) {
cmd <- rs__attach_get_input(paste0("RS ", self$get_pid(), " > "))
if (cmd == ".q") break
try(update_history(cmd), silent = TRUE)
private$write_for_sure(paste0(cmd, "\n"))
private$report_back(202, "done")
private$attach_wait()
} },
interrupt = function(e) { self$interrupt(); invisible() }
)
}
## Internal functions ----------------------------------------------------
rs__attach_get_input <- function(prompt) {
cmd <- readline(prompt = prompt)
while (! is_complete_expression(cmd)) {
cmd <- paste0(cmd, sep = "\n", readline(prompt = "+ "))
}
cmd
}
rs__attach_wait <- function(self, private) {
out <- self$get_output_connection()
err <- self$get_error_connection()
pro <- private$pipe
while (TRUE) {
pr <- processx::poll(list(out, err, pro), -1)
if (pr[[1]] == "ready") {
if (nchar(x <- processx::processx_conn_read_chars(out))) cat(x)
}
if (pr[[2]] == "ready") {
if (nchar(x <- processx::processx_conn_read_chars(err))) cat(bold(x))
}
if (pr[[3]] == "ready") {
msg <- self$read()
if (msg$code == 202) break;
}
}
}
rs__report_back <- function(self, private, code, text) {
cmd <- paste0(
deparse(rs__status_expr(code, text, fd = 3)),
"\n"
)
private$write_for_sure(cmd)
}
rs__write_for_sure <- function(self, private, text) {
while (1) {
text <- self$write_input(text)
if (!length(text)) break;
Sys.sleep(.1)
}
}
rs__parse_msg <- function(self, private, msg) {
code <- as.character(msg$header$code)
message <- msg$body
if (length(message) && substr(message, 1, 8) == "base64::") {
message <- substr(message, 9, nchar(message))
message <- unserialize(processx::base64_decode(message))
} else {
message <- msg$header$rest
}
if (! code %in% names(rs__parse_msg_funcs)) {
throw(new_error("Unknown message code: `", code, "`"))
}
structure(
rs__parse_msg_funcs[[code]](self, private, msg$header$code, message),
class = "callr_session_result")
}
rs__parse_msg_funcs <- list()
rs__parse_msg_funcs[["200"]] <- function(self, private, code, message) {
if (private$state != "busy") {
throw(new_error("Got `done` message when session is not busy"))
}
private$state <- "idle"
res <- private$get_result_and_output()
c(list(code = code, message = message), res)
}
rs__parse_msg_funcs[["201"]] <- function(self, private, code, message) {
if (private$state != "starting") {
throw(new_error("Session already started, invalid `starting` message"))
}
private$state <- "idle"
list(code = code, message = message)
}
rs__parse_msg_funcs[["202"]] <- function(self, private, code, message) {
private$state <- "idle"
list(code = code, message = message)
}
rs__parse_msg_funcs[["301"]] <- function(self, private, code, message) {
## TODO: progress bar update, what to do here?
list(code = code, message = message)
}
rs__parse_msg_funcs[["500"]] <- function(self, private, code, message) {
private$state <- "finished"
res <- private$get_result_and_output(std = TRUE)
c(list(code = code, message = message), res)
}
rs__parse_msg_funcs[["501"]] <- function(self, private, code, message) {
private$state <- "finished"
err <- structure(
list(message = message),
class = c("error", "condition"))
res <- private$get_result_and_output(std = TRUE)
res$error <- err
c(list(code = code, message = message), res)
}
rs__parse_msg_funcs[["502"]] <- rs__parse_msg_funcs[["501"]]
rs__status_expr <- function(code, text = "", fd = 3L) {
substitute(
local({
pxlib <- as.environment("tools:callr")$`__callr_data__`$pxlib
code_ <- code; fd_ <- fd; text_ <- text
data <- paste0(code_, " 0 ", text_, "\n")
pxlib$write_fd(as.integer(fd), data)
}),
list(code = code, fd = fd, text = text)
)
}
rs__prehook <- function(stdout, stderr) {
oexpr <- if (!is.null(stdout)) substitute({
assign(
".__stdout__",
as.environment("tools:callr")$`__callr_data__`$pxlib$
set_stdout_file(`__fn__`),
envir = as.environment("tools:callr")$`__callr_data__`)
}, list(`__fn__` = stdout))
eexpr <- if (!is.null(stderr)) substitute({
assign(
".__stderr__",
as.environment("tools:callr")$`__callr_data__`$pxlib$
set_stderr_file(`__fn__`),
envir = as.environment("tools:callr")$`__callr_data__`)
}, list(`__fn__` = stderr))
substitute({ o; e }, list(o = oexpr, e = eexpr))
}
rs__posthook <- function(stdout, stderr) {
oexpr <- if (!is.null(stdout)) substitute({
as.environment("tools:callr")$`__callr_data__`$
pxlib$set_stdout(as.environment("tools:callr")$`__callr_data__`$
.__stdout__)
})
eexpr <- if (!is.null(stderr)) substitute({
as.environment("tools:callr")$`__callr_data__`$
pxlib$set_stderr(as.environment("tools:callr")$`__callr_data__`$
.__stderr__)
})
substitute({ o; e }, list(o = oexpr, e = eexpr))
}
rs__get_result_and_output <- function(self, private, std) {
## Get stdout and stderr
stdout <- if (!is.null(private$tmp_output_file) &&
file.exists(private$tmp_output_file)) {
tryCatch(suppressWarnings(read_all(private$tmp_output_file)),
error = function(e) "")
} else if (std && self$has_output_connection()) {
tryCatch(self$read_all_output(), error = function(err) NULL)
}
stderr <- if (!is.null(private$tmp_error_file) &&
file.exists(private$tmp_error_file)) {
tryCatch(suppressWarnings(read_all(private$tmp_error_file)),
error = function(e) "")
} else if (std && self$has_error_connection()) {
tryCatch(self$read_all_error(), error = function(err) NULL)
}
unlink(c(private$tmp_output_file, private$tmp_error_file))
private$tmp_output_file <- private$tmp_error_file <- NULL
## Get result or error from RDS
outp <- list(
status = 0,
stdout = stdout %||% "",
stderr = stderr %||% "",
timeout = FALSE
)
res <- err <- NULL
tryCatch(
res <- get_result(outp, private$options),
error = function(e) err <<- e,
interrupt = function(e) err <<- e
)
unlink(private$options$tmp_files, recursive = TRUE)
private$options$tmp_files <- NULL
## Assemble result
list(result = res, stdout = stdout, stderr = stderr, error = err)
}
rs__handle_condition <- function(cond) {
default_handler <- function(x) {
classes <- class(x)
for (cl in classes) {
opt <- paste0("callr.condition_handler_", cl)
if (!is.null(val <- getOption(opt)) && is.function(val)) {
val(x)
break
}
}
}
if (is.list(cond) && is.null(cond$muffle)) {
cond$muffle <- "callr_r_session_muffle"
}
withRestarts({
signalCondition(cond)
default_handler(cond) },
callr_r_session_muffle = function() NULL,
muffleMessage = function() NULL
)
invisible()
}
## Helper functions ------------------------------------------------------
#' Create options for an [r_session] object
#'
#' @param ... Options to override, named arguments.
#' @return Named list of options.
#'
#' The current options are:
#' * `libpath`: Library path for the subprocess. By default the same as the
#' _current_ library path. I.e. _not_ necessarily the library path of
#' a fresh R session.)
#' * `repos`: `repos` option for the subprocess. By default the current
#' value of the main process.
#' * `stdout`: Standard output of the sub-process. This can be `NULL` or
#' a pipe: `"|"`. If it is a pipe then the output of the subprocess is
#' not included in the responses, but you need to poll and read it
#' manually. This is for experts. Note that this option is not used
#' for the startup phase that currently always runs with `stdout = "|"`.
#' * `stderr`: Similar to `stdout`, but for the standard error. Like
#' `stdout`, it is not used for the startup phase, which runs with
#' `stderr = "|"`.
#' * `error`: See 'Error handling' in [r()].
#' * `cmdargs`: See the same argument of [r()]. (Its default might be
#' different, though.)
#' * `system_profile`: See the same argument of [r()].
#' * `user_profile`: See the same argument of [r()].
#' * `env`: See the same argument of [r()].
#' * `load_hook`: `NULL`, or code (quoted) to run in the sub-process
#' at start up. (I.e. not for every single `run()` call.)
#' * `extra`: List of extra arguments to pass to [processx::process].
#'
#' Call `r_session_options()` to see the default values.
#' `r_session_options()` might contain undocumented entries, you cannot
#' change these.
#'
#' @export
#' @examples
#' r_session_options()
r_session_options <- function(...) {
update_options(r_session_options_default(), ...)
}
r_session_options_default <- function() {
list(
func = NULL,
args = NULL,
libpath = .libPaths(),
repos = default_repos(),
stdout = NULL,
stderr = NULL,
error = getOption("callr.error", "error"),
cmdargs = c(
if (os_platform() != "windows") "--no-readline",
"--slave",
"--no-save",
"--no-restore"
),
system_profile = FALSE,
user_profile = "project",
env = c(TERM = "dumb"),
supervise = FALSE,
load_hook = NULL,
extra = list(),
arch = "same"
)
}
#' Interactive debugging of persistent R sessions
#'
#' The `r_session$debug()` method is an interactive debugger to inspect
#' the stack of the background process after an error.
#'
#' Note that on callr version 3.8.0 and above, you need to set the
#' `callr.traceback` option to `TRUE` (in the main process) to make
#' the subprocess dump the frames on error. This is because saving
#' the frames can be costly for large objects passed as arguments.
#'
#' `$debug()` starts a REPL (Read-Eval-Print-Loop), that evaluates R
#' expressions in the subprocess. It is similar to [browser()] and
#' [debugger()] and also has some extra commands:
#'
#' * `.help` prints a short help message.
#' * `.where` prints the complete stack trace of the error. (The same as
#' the `$traceback()` method.
#' * `.inspect <n>` switches the "focus" to frame `<n>`. Frame 0 is the
#' global environment, so `.inspect 0` will switch back to that.
#'
#' To exit the debugger, press the usual interrupt key, i.e. `CTRL+c` or
#' `ESC` in some GUIs.
#'
#' Here is an example session that uses `$debug()` (some output is omitted
#' for brevity):
#'
#' ```
#' # ----------------------------------------------------------------------
#' > rs <- r_session$new()
#' > rs$run(function() knitr::knit("no-such-file"))
#' Error in rs_run(self, private, func, args) :
#' callr subprocess failed: cannot open the connection
#'
#' > rs$debug()
#' Debugging in process 87361, press CTRL+C (ESC) to quit. Commands:
#' .where -- print stack trace
#' .inspect <n> -- inspect a frame, 0 resets to .GlobalEnv
#' .help -- print this message
#' <cmd> -- run <cmd> in frame or .GlobalEnv
#'
#' 3: file(con, "r")
#' 2: readLines(input2, encoding = "UTF-8", warn = FALSE)
#' 1: knitr::knit("no-such-file") at #1
#'
#' RS 87361 > .inspect 1
#'
#' RS 87361 (frame 1) > ls()
#' [1] "encoding" "envir" "ext" "in.file" "input" "input.dir"
#' [7] "input2" "ocode" "oconc" "oenvir" "oopts" "optc"
#' [13] "optk" "otangle" "out.purl" "output" "quiet" "tangle"
#' [19] "text"
#'
#' RS 87361 (frame 1) > input
#' [1] "no-such-file"
#'
#' RS 87361 (frame 1) > file.exists(input)
#' [1] FALSE
#'
#' RS 87361 (frame 1) > # <CTRL + C>
#' # ----------------------------------------------------------------------
#' ```
#'
#' @name r_session_debug
NULL
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.