Nothing
#' @title Pipeline result object
PipelineResult <- R6::R6Class(
classname = "PipelineResult",
portable = TRUE,
cloneable = TRUE,
private = list(
.path = character(0L),
.state = character(0L),
.process_type = character(0L),
.process = NULL,
.vartable = NULL,
.invalidated = FALSE,
.current_progress = NULL,
finalize = function(...){
self$invalidate()
},
close_progressor = function(){
try({
if(length(self$progressor) && !self$progressor$is_closed()){
self$progressor$close()
# self$progressor <- NULL
}
}, silent = !self$verbose)
}
),
public = list(
#' @field progressor progress bar object, usually generated from \code{\link[dipsaus]{progress2}}
progressor = NULL,
#' @field promise a \code{\link[promises]{promise}} instance that monitors
#' the pipeline progress
promise = NULL,
#' @field verbose whether to print warning messages
verbose = FALSE,
#' @field names names of the pipeline to build
names = NULL,
#' @field async_callback function callback to call in each check loop;
#' only used when the pipeline is running in \code{async=TRUE} mode
async_callback = NULL,
#' @field check_interval used when \code{async=TRUE} in
#' \code{\link{pipeline_run}}, interval in seconds to check the progress
check_interval = 0.1,
#' @description check if result is valid, raises errors when invalidated
validate = function(){
if(private$.invalidated){
stop("This result has been invalidated")
}
invisible()
},
#' @description invalidate the pipeline result
invalidate = function(){
private$.invalidated <- TRUE
private$.state <- "invalidated"
if(inherits(private$.process, 'process')){
try({
if(isTRUE(private$.process$is_alive())){
private$.process$kill()
}
private$.process <- NULL
}, silent = !self$verbose)
}
private$close_progressor()
},
#' @description get pipeline progress
get_progress = function(){
self$validate()
tbl <- pipeline_progress(pipe_dir = private$.path, method = "details")
self$variables
tbl <- merge(private$.vartable[,c('name', 'description')], tbl, by = 'name', all.x = TRUE, sort = FALSE)
tbl$progress[is.na(tbl$progress)] <- "initialize"
tbl_bk <- tbl
on.exit({
private$.vartable <- tbl_bk
}, add = TRUE, after = FALSE)
# tbl$progress[tbl$progress == "skipped"] <- "built"
previous <- private$.vartable$progress %in% 'started'
# finished <- !tbl$progress %in% 'initialize'
started <- tbl$progress %in% "started"
sel <- started & !previous
if(any(sel)){
sel <- which(sel)
sel <- sel[[length(sel)]]
private$.current_progress <- sel
} else {
sel <- max(private$.current_progress, 1)
}
return(list(
index = sel,
name = tbl$name[[sel]],
description = tbl$description[[sel]],
progress = tbl$progress[[sel]]
))
},
#' @description constructor (internal)
#' @param path pipeline path
#' @param verbose whether to print warnings
initialize = function(path = character(0L), verbose = FALSE){
private$.path <- path
private$.current_progress <- 0
private$.state <- "initialize"
self$verbose <- isTRUE(as.logical(verbose))
},
#' @description run pipeline (internal)
#' @param expr expression to evaluate
#' @param env environment of \code{expr}
#' @param quoted whether \code{expr} has been quoted
#' @param async whether the process runs in other sessions
#' @param process the process object inherits \code{\link[callr]{process}},
#' will be inferred from \code{expr} if \code{process=NULL},
#' and will raise errors if cannot be found
run = function(expr, env = parent.frame(), quoted = FALSE,
async = FALSE, process = NULL) {
if(!quoted){
expr <- substitute(expr)
}
# running, ready, errored
private$.state <- "running"
private$.vartable <- NULL
# self$names <- names
if(async){
private$.process_type <- 'remote'
self$promise <- promises::promise(
function(resolve, reject){
process <- tryCatch({
process <- eval(expr, env)
if(inherits(process, "r_process")) {
private$.process <- process
} else {
stop("`PipelineResult`: `expr` must return a callr::r_process instance")
}
process
}, error = function(e){
private$.state <- "errored"
private$close_progressor()
reject(e)
NULL
})
if(is.null(process)) { return() }
run_async_callback <- function() {
tryCatch({
if(is.function(self$async_callback)) {
self$async_callback()
}
}, error = warning)
}
callback <- function(){
continue <- tryCatch({
if(private$.invalidated){
private$.state <- "canceled"
self$invalidate()
e <- simpleCondition("Pipeline canceled")
run_async_callback()
reject(e)
return()
}
progress <- self$get_progress()
if(!private$.process$is_alive()){
private$.state <- "finished"
private$close_progressor()
private$.process$get_result()
resolve(private$.vartable)
return()
}
# show progress
if(length(self$progressor)){
old_val <- self$progressor$get_value()
increment <- progress$index - old_val
if(increment > 0){
self$progressor$inc(
detail = progress$description,
amount = increment
)
}
}
# nrow(private$.vartable)
TRUE
}, error = function(e){
private$.state <- "errored"
private$close_progressor()
e
})
run_async_callback()
if(isTRUE(continue)){
later::later(callback, delay = self$check_interval)
} else {
reject(callback)
return()
}
}
callback()
}
)
} else {
private$.process_type <- 'native'
self$promise <- promises::promise(
function(resolve, reject){
tryCatch({
eval(expr, env)
private$.state <- "finished"
# self$variables
resolve(private$.vartable)
}, error = function(e) {
private$.state <- "errored"
private$close_progressor()
reject(e)
})
}
)
}
},
#' @description wait until some targets get finished
#' @param names target names to wait, default is \code{NULL}, i.e. to
#' wait for all targets that have been scheduled
#' @param timeout maximum waiting time in seconds
#' @returns \code{TRUE} if the target is finished, or \code{FALSE} if
#' timeout is reached
await = function(names = NULL, timeout = Inf){
if(!self$valid){ return(FALSE) }
promise_impl <- attr(self$promise, "promise_impl")
now <- Sys.time()
if(length(names)){
missing_names <- names[!names %in% self$variables]
if(length(missing_names)){
stop("Unable to watch the following names: ", paste(missing_names, collapse = ", "))
}
} else {
names <- self$variables
}
sel <- which(private$.vartable$name %in% names)
while(
!promise_impl$status() %in% c("fulfilled", "rejected") &&
!later::loop_empty()
){
later::run_now(0.1)
if(private$.current_progress >= max(sel) &&
!any(private$.vartable$progress %in% c("initialize", "started"))) {
return(TRUE)
}
if(timeout <= as.numeric(Sys.time() - now, units = 'secs')){
return(FALSE)
}
}
return(TRUE)
},
#' @description print method
print = function(){
cat("<Pipeline result container> ")
if(private$.invalidated){
cat("(Invalidated)\n")
} else {
cat("\nprocess:", private$.process_type)
if(private$.state == 'running'){
cat(sprintf(
"\nstatus: %s (%d of %d)\n",
private$.state,
private$.current_progress,
length(self$variables)
))
} else {
cat(sprintf(
"\nstatus: %s\n",
private$.state
))
}
}
},
#' @description get results
#' @param names the target names to read
#' @param ... passed to code{link{pipeline_read}}
get_values = function(names = NULL, ...){
self$validate()
if(!length(names)){
names <- self$variables
}
pipeline_read(var_names = names, pipe_dir = private$.path, ...)
}
),
active = list(
#' @field variables target variables of the pipeline
variables = function(){
if(!is.data.frame(private$.vartable)){
self$validate()
variables <- pipeline_target_names(pipe_dir = private$.path)
tarnames_readable <- names(variables)
nvars <- length(variables)
nactual <- length(tarnames_readable)
if(nactual < nvars){
tarnames_readable <- c(tarnames_readable, rep('', nvars - nactual))
}
descr <- sapply(seq_len(nvars), function(ii){
nm <- tarnames_readable[[ii]]
if(nm == ""){
return(sprintf('Calculating `%s`', variables[[ii]]))
} else {
msg <- unlist(strsplit(nm, "[_-]+"))
msg <- msg[msg != ""]
msg <- paste(msg, collapse = " ")
if(nchar(msg)){
msg <- sub("^[a-z]", toupper(substr(msg, 1, 1)), msg)
}
return(msg)
}
})
tbl <- data.frame(
name = unname(variables),
description = descr,
progress = "initialize",
stringsAsFactors = FALSE
)
# tbl$included <- TRUE
if(length(self$names)){
sel <- tbl$name %in% self$names
if(any(sel)){
# tbl$included <- sel
tbl <- tbl[tbl$name %in% self$names, ]
}
}
private$.vartable <- tbl
}
private$.vartable$name
},
#' @field variable_descriptions readable descriptions of the target variables
variable_descriptions = function(){
self$variables
private$.vartable$description
},
#' @field valid logical true or false whether the result instance hasn't
#' been invalidated
valid = function(){
!private$.invalidated
},
#' @field status result status, possible status are \code{'initialize'},
#' \code{'running'}, \code{'finished'}, \code{'canceled'},
#' and \code{'errored'}. Note that \code{'finished'} only means the pipeline
#' process has been finished.
status = function(){
private$.state
},
#' @field process (read-only) process object if the pipeline is running in
#' \code{'async'} mode, or \code{NULL}; see \code{\link[callr]{r_bg}}.
process = function(){
private$.process
}
)
)
#' @export
as.promise.PipelineResult <- function(x){
x$promise
}
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.