# Nothing
#' Class definition for pipeline tools
#' @seealso \code{\link{pipeline}}
PipelineTools <- R6::R6Class(
  classname = "PipelineTools",
  portable = TRUE,
  cloneable = TRUE,
  private = list(
    # path returned by `pipeline_find()` for this pipeline
    .pipeline_path = character(),
    # pipeline code name, taken from the "target_name" attribute of the path
    .pipeline_name = character(),
    # settings file name, relative to `.pipeline_path`
    .settings_file = character(),
    # current settings as returned by `load_yaml()`; `get_settings()` calls
    # its `@has` method, so it is presumably a map-like object with
    # reference semantics -- NOTE(review): confirm against `load_yaml()`
    .settings = NULL,
    # per-setting options for inputs backed by external data files; each
    # element also stores the raw YAML entry under `raw_input`
    .settings_external_inputs = list()
  ),
  public = list(

    #' @description construction function
    #' @param pipeline_name name of the pipeline, usually in the pipeline
    #' \code{'DESCRIPTION'} file, or pipeline folder name
    #' @param settings_file the file name of the settings file, where the
    #' user inputs are stored
    #' @param paths the paths to find the pipeline, usually the parent folder
    #' of the pipeline; default is \code{pipeline_root()}
    #' @param temporary whether to avoid saving \code{paths} to the current
    #' pipeline root registry. Set this to \code{TRUE} when importing
    #' pipelines from subject pipeline folders
    initialize = function(pipeline_name,
                          settings_file = "settings.yaml",
                          paths = pipeline_root(), temporary = FALSE) {
      # Existing user paths first, then the working directory and the
      # per-user data folder as fall-backs
      default_paths <- c(".", file.path(R_user_dir('raveio', 'data'), "pipelines"))
      paths <- c(paths[dir.exists(paths)], default_paths)
      private$.pipeline_path <- pipeline_find(
        pipeline_name,
        root_path = pipeline_root(paths, temporary = temporary)
      )
      private$.pipeline_name <- attr(private$.pipeline_path, "target_name")
      private$.settings_file <- settings_file

      pipeline_settings_path <- file.path(
        private$.pipeline_path,
        private$.settings_file
      )
      settings <- load_yaml(pipeline_settings_path)

      # Settings that reference external data are remembered (together with
      # the raw YAML entry so it can be written back verbatim by
      # `set_settings()`) and replaced by their resolved values.
      # A `for` loop keeps the assignment to `settings` in this frame, so it
      # works whether or not `settings` has reference semantics.
      for (nm in names(settings)) {
        if (nm == "") { next }
        opts <- resolve_pipeline_settings_opt(settings[[nm]], strict = FALSE)
        if (is.null(opts) || !is.list(opts)) { next }
        opts$raw_input <- settings[[nm]]
        private$.settings_external_inputs[[nm]] <- opts
        settings[[nm]] <- resolve_pipeline_settings_value(
          settings[[nm]], pipe_dir = private$.pipeline_path
        )
      }
      private$.settings <- settings
    },

    #' @description set inputs
    #' @param ...,.list named list of inputs; all inputs should be named,
    #' otherwise errors will be raised
    #' @returns invisibly, all current settings as a list
    set_settings = function(..., .list = NULL) {
      args <- c(list(...), as.list(.list))
      argnames <- names(args)
      if (length(args)) {
        if (!length(argnames) || "" %in% argnames) {
          stop("`pipeline_set`: all input lists must have names")
        }
        external_inputs <- names(private$.settings_external_inputs)
        is_external <- argnames %in% external_inputs

        # External inputs: write the new value to the pipeline data folder,
        # then keep an in-memory copy tagged with its storage options
        for (nm in argnames[is_external]) {
          new_val <- args[[nm]]
          opts <- private$.settings_external_inputs[[nm]]
          pipeline_save_extdata(
            data = new_val,
            name = opts$name,
            format = opts$format,
            overwrite = TRUE,
            pipe_dir = private$.pipeline_path
          )
          cls <- class(new_val)
          if (!"raveio-pipeline-extdata" %in% cls) {
            cls <- c("raveio-pipeline-extdata", cls)
          }
          private$.settings[[nm]] <- structure(
            new_val, class = cls,
            `raveio-pipeline-extdata-opts` = opts
          )
        }

        # Plain inputs are stored as-is
        for (nm in argnames[!is_external]) {
          private$.settings[[nm]] <- args[[nm]]
        }

        # TODO: check whether this should be put outside, i.e. save settings
        # no matter the settings have been changed or not
        pipeline_settings_path <- file.path(
          private$.pipeline_path,
          private$.settings_file
        )
        settings_copy <- as.list(private$.settings)
        if (length(external_inputs)) {
          # persist the original references to external data, not the data
          settings_copy[external_inputs] <- lapply(
            private$.settings_external_inputs, "[[", "raw_input")
        }
        save_yaml(
          x = settings_copy,
          file = pipeline_settings_path,
          sorted = TRUE
        )
      }
      return(invisible(as.list(private$.settings)))
    },

    #' @description get current inputs
    #' @param key the input name; default is missing, i.e., to get all the
    #' settings
    #' @param default default value if not found
    #' @param constraint the constraint of the results; if input value is not
    #' from \code{constraint}, then only the first element of \code{constraint}
    #' will be returned.
    #' @returns The value of the inputs, or a list if \code{key} is missing
    get_settings = function(key, default = NULL, constraint) {
      if (missing(key)) {
        return(as.list(private$.settings))
      }
      if (!private$.settings$`@has`(key)) {
        re <- default
      } else {
        re <- private$.settings[[key]]
      }
      if (!missing(constraint)) {
        re <- re %OF% constraint
      }
      re
    },

    #' @description read intermediate variables
    #' @param var_names the target names, can be obtained via
    #' \code{x$target_table} member; default is missing, i.e., to read
    #' all the intermediate variables. A unary minus such as
    #' \code{-c("a", "b")} reads all targets except the listed ones.
    #' @param ifnotfound variable default value if not found
    #' @param ... other parameters passing to \code{\link{pipeline_read}}
    #' @returns The values of the targets
    read = function(var_names, ifnotfound = NULL, ...) {
      if (missing(var_names)) {
        var_names <- pipeline_target_names(pipe_dir = private$.pipeline_path)
      } else {
        var_names_quoted <- substitute(var_names)
        # Only a unary minus call (`-expr`, length 2) means "exclude";
        # binary subtraction must not be mistaken for an exclusion
        if (typeof(var_names_quoted) == "language" &&
            length(var_names_quoted) == 2L &&
            identical(var_names_quoted[[1]], quote(`-`))) {
          all_names <- pipeline_target_names(pipe_dir = private$.pipeline_path)
          excluded <- eval(var_names_quoted[[2]], envir = parent.frame())
          var_names <- all_names[!all_names %in% excluded]
        }
      }
      pipeline_read(var_names = var_names, pipe_dir = private$.pipeline_path,
                    ifnotfound = ifnotfound, ...)
    },

    #' @description run the pipeline
    #' @param names pipeline variable names to calculate; default is to
    #' calculate all the targets
    #' @param async whether to run asynchronously in another process
    #' @param as_promise whether to return a \code{\link{PipelineResult}}
    #' instance
    #' @param scheduler,type,envir,callr_function,return_values,... passed to
    #' \code{\link{pipeline_run}} if \code{as_promise} is true, otherwise
    #' these arguments will be passed to \code{pipeline_run_bare}
    #' @returns A \code{\link{PipelineResult}} instance if \code{as_promise}
    #' or \code{async} is true; otherwise a list of values for input \code{names}
    run = function(names = NULL, async = FALSE, as_promise = async,
                   scheduler = c("none", "future", "clustermq"),
                   type = c("smart", "callr", "vanilla"),
                   envir = new.env(parent = globalenv()),
                   callr_function = NULL, return_values = TRUE,
                   ...) {
      if (!as_promise && async) {
        stop("If you run the pipeline asynchronous, then the result must be a `promise` object")
      }
      # Pipelines with an embedded 'Python' module default to a
      # future/callr run unless the caller chose scheduler/type explicitly
      if (missing(scheduler) && missing(type)) {
        py_module_exists <- tryCatch({
          self$python_module("exist")
        }, error = function(e) { FALSE })
        if (isTRUE(py_module_exists)) {
          scheduler <- "future"
          type <- "callr"
        } else {
          scheduler <- match.arg(scheduler)
          type <- match.arg(type)
        }
      } else {
        scheduler <- match.arg(scheduler)
        type <- match.arg(type)
      }
      force(envir)
      force(callr_function)
      # `envir` and `...` stay symbolic and are resolved in this frame by
      # `eval(expr)` below; the other arguments are inlined by `bquote`
      expr <- bquote(pipeline_run_bare(
        pipe_dir = .(private$.pipeline_path), scheduler = .(scheduler),
        type = .(type), envir = envir, callr_function = .(callr_function),
        names = .(names), return_values = .(return_values), ...))
      if (as_promise) {
        expr[[1]] <- quote(pipeline_run)
        expr[["async"]] <- async
      }
      eval(expr)
    },

    #' @description run the pipeline in order; unlike \code{$run()}, this method
    #' does not use the \code{targets} infrastructure, hence the pipeline
    #' results will not be stored, and the order of \code{names} will be
    #' respected.
    #' @param names pipeline variable names to calculate; must be specified
    #' @param env environment to evaluate and store the results
    #' @param clean whether to evaluate without polluting \code{env}
    eval = function(names, env = parent.frame(), clean = TRUE) {
      # a child environment keeps results out of `env` when `clean` is true
      if (clean) {
        envir <- new.env(parent = env)
      } else {
        envir <- env
      }
      pipeline_eval(names = names, env = envir, pipe_dir = private$.pipeline_path,
                    settings_path = self$settings_path)
    },

    #' @description run the pipeline shared library in scripts starting with
    #' path \code{R/shared}
    #' @returns An environment of shared variables
    shared_env = function() {
      return(pipeline_shared(pipe_dir = private$.pipeline_path))
    },

    #' @description get 'Python' module embedded in the pipeline
    #' @param type return type, choices are \code{'info'} (get basic information
    #' such as module path, default), \code{'module'} (load module and return
    #' it), \code{'shared'} (load a shared sub-module from the module, which
    #' is shared also in report script), and \code{'exist'} (returns true
    #' or false on whether the module exists or not)
    #' @param must_work whether the module must exist. If \code{TRUE}, then
    #' raise errors when the module does not exist; default
    #' is \code{TRUE}, ignored when \code{type} is \code{'exist'}.
    #' @returns See \code{type}
    python_module = function(type = c("info", "module", "shared", "exist"),
                             must_work = TRUE) {
      type <- match.arg(type)
      # an existence check never raises
      if (type == "exist") { must_work <- FALSE }
      # note: `return()` inside the `tryCatch` expression returns from
      # `python_module` directly
      re <- tryCatch({
        if (type == "module") {
          return(pipeline_py_module(
            pipe_dir = self$pipeline_path,
            must_work = must_work,
            convert = FALSE
          ))
        }
        minfo <- pipeline_py_info(pipe_dir = self$pipeline_path, must_work = must_work)
        switch(
          type,
          "info" = { return(minfo) },
          "exist" = {
            return(isTRUE(is.list(minfo)))
          },
          {
            # type == "shared": import `<module_name>.shared` with the
            # working directory temporarily set to the pipeline `py` folder
            if (!is.list(minfo)) { return(NULL) }
            pypath <- file.path(self$pipeline_path, "py")
            cwd <- getwd()
            # restores the working directory if the import fails
            on.exit({
              if (length(cwd) == 1) { setwd(cwd) }
            }, add = TRUE, after = FALSE)
            setwd(pypath)
            shared <- rpymat::import(sprintf("%s.shared", minfo$module_name),
                                     convert = FALSE, delay_load = FALSE)
            setwd(cwd)
            # already restored; disarm the on.exit handler above
            cwd <- NULL
            return(shared)
          }
        )
      }, error = function(e) {
        if (must_work) {
          stop(e)
        }
        NULL
      })
      return(re)
    },

    #' @description get progress of the pipeline
    #' @param method either \code{'summary'} or \code{'details'}
    #' @returns A table of the progress
    progress = function(method = c("summary", "details")) {
      method <- match.arg(method)
      pipeline_progress(pipe_dir = private$.pipeline_path, method = method)
    },

    #' @description attach pipeline tool to environment (internally used)
    #' @param env an environment
    attach = function(env) {
      env$pipeline_set <- self$set_settings
      env$pipeline_get <- self$get_settings
      env$pipeline_settings_path <- self$settings_path
      env$pipeline_path <- private$.pipeline_path
    },

    #' @description visualize pipeline target dependency graph
    #' @param glimpse whether to glimpse the graph network or render the state
    #' @param aspect_ratio controls node spacing
    #' @param node_size,label_size size of nodes and node labels
    #' @param ... passed to \code{\link{pipeline_visualize}}
    #' @returns Nothing
    visualize = function(glimpse = FALSE, aspect_ratio = 2, node_size = 30, label_size = 40, ...) {
      args <- list(pipe_dir = private$.pipeline_path, glimpse = glimpse, ...)
      # Prefer the widget graph; if building or printing it fails (including
      # when 'htmlwidgets' is unavailable), fall back to `pipeline_visualize`
      tryCatch({
        widget <- pipeline_dependency_graph(
          glimpse = glimpse, pipeline_path = private$.pipeline_path,
          aspect_ratio = aspect_ratio, node_size = node_size, label_size = label_size, ...)
        asNamespace("htmlwidgets")
        print(widget)
      }, error = function(e) {
        do.call(pipeline_visualize, args)
      })
      return(invisible())
    },

    #' @description fork (copy) the current pipeline to a new directory
    #' @param path path to the new pipeline, a folder will be created there
    #' @param filter_pattern file pattern to copy
    #' @returns A new pipeline object based on the path given
    fork = function(path, filter_pattern = PIPELINE_FORK_PATTERN) {
      pipeline_fork(
        src = self$pipeline_path,
        dest = path,
        filter_pattern = filter_pattern,
        activate = FALSE
      )
      # NOTE(review): `temporary` is left at its default (FALSE), so
      # `dirname(path)` is saved to the pipeline root registry; confirm that
      # this is intended for forks (e.g. into temporary folders)
      pipeline(
        pipeline_name = basename(path),
        settings_file = basename(self$settings_path),
        paths = dirname(path)
      )
    },

    #' @description run code with pipeline activated, some environment variables
    #' and function behaviors might change under such condition (for example,
    #' \code{targets} package functions)
    #' @param expr expression to evaluate
    #' @param quoted whether \code{expr} is quoted; default is false
    #' @param env environment to run \code{expr}
    with_activated = function(expr, quoted = FALSE, env = parent.frame()) {
      if (!quoted) {
        expr <- substitute(expr)
      }
      activate_pipeline(pipe_dir = private$.pipeline_path)
      # explicitly the base evaluator, never the `$eval` method above
      base::eval(expr, envir = env)
    },

    #' @description clean all or part of the data store
    #' @param destroy,ask see \code{\link[targets]{tar_destroy}}
    clean = function(destroy = c("all", "cloud", "local", "meta", "process",
                                 "progress", "objects", "scratch", "workspaces"),
                     ask = FALSE) {
      destroy <- match.arg(destroy)
      pipeline_clean(pipe_dir = private$.pipeline_path, ask = ask, destroy = destroy)
    },

    #' @description save data to pipeline data folder
    #' @param data R object
    #' @param name the name of the data to save, must start with letters
    #' @param format serialize format, choices are \code{'json'},
    #' \code{'yaml'}, \code{'csv'}, \code{'fst'}, \code{'rds'}; default is
    #' \code{'json'}. To save arbitrary objects such as functions or
    #' environments, use \code{'rds'}
    #' @param overwrite whether to overwrite existing files; default is no
    #' @param ... passed to saver functions
    #' @returns the saved file path
    save_data = function(data, name, format = c("json", "yaml", "csv", "fst", "rds"),
                         overwrite = FALSE, ...) {
      format <- match.arg(format)
      pipeline_save_extdata(
        data = data, name = name, format = format,
        overwrite = overwrite, pipe_dir = self$pipeline_path, ...)
    },

    #' @description load data from pipeline data folder
    #' @param name the name of the data
    #' @param error_if_missing whether to raise errors if the name is missing
    #' @param default_if_missing default values to return if the name is missing
    #' @param format the format of the data, default is automatically obtained
    #' from the file extension
    #' @param ... passed to loader functions
    #' @returns the data if file is found or a default value
    load_data = function(name, error_if_missing = TRUE, default_if_missing = NULL,
                         format = c("auto", "json", "yaml", "csv", "fst", "rds"), ...) {
      format <- match.arg(format)
      pipeline_load_extdata(name = name, format = format,
                            error_if_missing = error_if_missing,
                            default_if_missing = default_if_missing,
                            pipe_dir = self$pipeline_path, ...)
    }
  ),
  active = list(

    #' @field settings_path absolute path to the settings file
    settings_path = function() {
      file.path(
        private$.pipeline_path,
        private$.settings_file
      )
    },

    #' @field extdata_path absolute path to the user-defined pipeline data folder
    extdata_path = function() {
      file.path(private$.pipeline_path, "data")
    },

    #' @field target_table table of target names and their descriptions
    target_table = function() {
      re <- pipeline_target_names(pipe_dir = private$.pipeline_path)
      # descriptions are derived from the element names, e.g.
      # "load_subject" -> "Load subject"
      labels <- names(re)
      if (is.null(labels)) { labels <- rep("", length(re)) }
      # `vapply` guarantees a character vector even when there are no
      # targets, so the Description column is never dropped
      des <- vapply(strsplit(labels, "_"), function(x) {
        x <- x[x != ""]
        if (!length(x)) { return("<No description>") }
        substr(x[[1]], start = 1, stop = 1) <- toupper(
          substr(x[[1]], start = 1, stop = 1)
        )
        paste(x, collapse = " ")
      }, FUN.VALUE = character(1), USE.NAMES = FALSE)
      data.frame(
        Names = unname(re),
        Description = des
      )
    },

    #' @field result_table summary of the results, including
    #' signatures of data and commands
    result_table = function() {
      pipeline_vartable(pipe_dir = private$.pipeline_path)
    },

    #' @field pipeline_path the absolute path of the pipeline
    pipeline_path = function() {
      private$.pipeline_path
    },

    #' @field pipeline_name the code name of the pipeline
    pipeline_name = function() {
      private$.pipeline_name
    }
  )
)
#' @title Creates 'RAVE' pipeline instance
#' @description Set pipeline inputs, execute, and read pipeline outputs
#' @param pipeline_name the name of the pipeline, usually the title field in
#' the \code{'DESCRIPTION'} file, or the pipeline folder name (if the
#' description file is missing)
#' @param settings_file the name of the settings file, which usually stores
#' user inputs
#' @param temporary see \code{\link{pipeline_root}}
#' @param paths the paths to search for the pipeline, usually the parent
#' directory of the pipeline; default is \code{\link{pipeline_root}}, which
#' only searches for pipelines that are installed or in the current working
#' directory.
#' @returns A \code{\link{PipelineTools}} instance
#' @examples
#'
#' if(!is_on_cran()) {
#'
#' library(raveio)
#'
#' # ------------ Set up a bare minimal example pipeline ---------------
#' pipeline_path <- pipeline_create_template(
#'   root_path = tempdir(), pipeline_name = "raveio_demo",
#'   overwrite = TRUE, activate = FALSE, template_type = "rmd-bare")
#'
#' save_yaml(list(
#'   n = 100, pch = 16, col = "steelblue"
#' ), file = file.path(pipeline_path, "settings.yaml"))
#'
#' pipeline_build(pipeline_path)
#'
#' rmarkdown::render(input = file.path(pipeline_path, "main.Rmd"),
#'                   output_dir = pipeline_path,
#'                   knit_root_dir = pipeline_path,
#'                   intermediates_dir = pipeline_path, quiet = TRUE)
#'
#' utils::browseURL(file.path(pipeline_path, "main.html"))
#'
#' # --------------------- Example starts ------------------------
#'
#' pipeline <- pipeline("raveio_demo", paths = tempdir())
#'
#' pipeline$run("plot_data")
#'
#' # Run again and you will see some targets are skipped
#' pipeline$set_settings(pch = 2)
#' pipeline$run("plot_data")
#'
#' head(pipeline$read("input_data"))
#'
#' # or use
#' pipeline[c("n", "pch", "col")]
#' pipeline[-c("input_data")]
#'
#' pipeline$target_table
#'
#' pipeline$result_table
#'
#' pipeline$progress("details")
#'
#' # --------------------- Clean up ------------------------
#' unlink(pipeline_path, recursive = TRUE)
#'
#' }
#' @export
pipeline <- function(pipeline_name,
                     settings_file = "settings.yaml",
                     paths = pipeline_root(),
                     temporary = FALSE) {
  # Thin functional front-end to the R6 generator; every argument is
  # forwarded by name to `PipelineTools$initialize`
  PipelineTools$new(
    pipeline_name = pipeline_name,
    settings_file = settings_file,
    paths = paths,
    temporary = temporary
  )
}
#' @export
`[.PipelineTools` <- function(x, ...) {
  # `pipeline[...]` is sugar for `pipeline$read(...)`. The matched call is
  # rewritten rather than forwarding `...` directly, so the arguments reach
  # `read()` unevaluated and `read()` can still inspect expressions such as
  # `-c("a", "b")`. They are evaluated in the caller's frame.
  read_call <- as.list(match.call(expand.dots = TRUE))
  read_call[["x"]] <- NULL          # drop the object being subset
  read_call[[1L]] <- x$read         # call the bound `read` method instead
  eval(as.call(read_call), envir = parent.frame())
}
# Any scripts or data that you put into this service are public.
# Add the following code to your website.
# For more information on customizing the embed code, read Embedding Snippets.