#' pipeComp - a framework for pipeline benchmarking
#'
#' \pkg{pipeComp} is a simple framework to facilitate the comparison of
#' pipelines involving various steps and parameters. It was initially developed
#' to benchmark single-cell RNA sequencing pipelines, and contains pre-defined
#' \code{\link{PipelineDefinition}}s and functions to that effect, but could be
#' applied to any context. See `vignette("pipeComp")` for an introduction.
#'
#' @author Pierre-Luc Germain \email{pierre-luc.germain@@hest.ethz.ch}
#' @author Anthony Sonrel \email{anthony.sonrel@@uzh.ch}
#' @author Mark D. Robinson \email{mark.robinson@@imls.uzh.ch}
#' @name pipeComp-package
#' @aliases pipeComp
#' @docType package
NULL
#' runPipeline
#'
#' This function runs a pipeline with combinations of parameter variations on
#' nested steps. The pipeline has to be defined as a list of functions, each
#' applied consecutively to the output of the previous one. See 'examples' for
#' more details.
#'
#' @param datasets A named list (or vector) of initial objects, or paths to
#' RDS files containing them.
#' @param alternatives The (named) list of alternative values for each
#' parameter.
#' @param pipelineDef An object of class \code{\link{PipelineDefinition}}.
#' @param comb An optional matrix of indexes indicating the combinations to run.
#' Each column should correspond to an element of `alternatives`, and contain
#' indexes relative to this element. If omitted, all combinations will be
#' performed.
#' @param output.prefix An optional prefix for the output files.
#' @param nthreads Number of threads (default 1), or a BiocParallel backend
#' object (`SnowParam`, `MulticoreParam` or `SerialParam`). If the memory
#' requirements are very high or the first steps take very long to compute,
#' consider setting this to the number of datasets or below.
#' @param saveEndResults Logical; whether to save the output of the last step.
#' @param debug Logical (default FALSE). When enabled, disables multithreading
#' and prints extra information.
#' @param skipErrors Logical. When enabled, `runPipeline` will
#' continue even when an error has been encountered, and report the list of
#' steps/datasets in which errors were encountered.
#' @param ... passed to MulticoreParam. Can for instance be used to set
#' `makeCluster` arguments, or set `threshold="TRACE"` when debugging in a
#' multithreaded context.
#'
#' @examples
#' pip <- mockPipeline()
#' datasets <- list( ds1=1:3, ds2=c(5,10,15) )
#' tmpdir1 <- paste0(tempdir(),"/")
#' res <- runPipeline(datasets, pipelineDef=pip, output.prefix=tmpdir1,
#' alternatives=list() )
#' # See the `pipeComp_scRNA` vignette for a more complex example
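#' #
#' # As a sketch, alternative parameter values are given through the
#' # `alternatives` argument; the argument name `meth1` below is hypothetical
#' # and must match an argument of your PipelineDefinition:
#' # res <- runPipeline(datasets, pipelineDef=pip, output.prefix=tmpdir1,
#' #                    alternatives=list(meth1=c("log2","sqrt")) )
#' #
#' # `nthreads` also accepts a BiocParallel backend object, e.g.:
#' # res <- runPipeline(datasets, pipelineDef=pip, output.prefix=tmpdir1,
#' #                    alternatives=list(), nthreads=BiocParallel::SerialParam())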
#'
#' @return A SimpleList with elapsed time and the results of the evaluation
#' functions defined by the given `pipelineDef`.
#'
#' The results are also stored in the output folder with:
#' \itemize{
#' \item The output of the last step for each dataset (`endOutputs.rds`
#' files),
#' \item A SimpleList of elapsed time and evaluations for each dataset
#' (`evaluation.rds` files),
#' \item A list of the `pipelineDef`, `alternatives`, `sessionInfo()` and
#' function call used to produce the results (`runPipelineInfo.rds` file),
#' \item A copy of the SimpleList returned by the function
#' (`aggregated.rds` file).
#' }
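#'
#' As a sketch (assuming the same `output.prefix` used for the run), the saved
#' aggregated results can later be re-read with
#' \code{readRDS(paste0(output.prefix,"aggregated.rds"))}.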
#'
#' @importFrom utils sessionInfo
#' @import methods BiocParallel S4Vectors
#' @export
runPipeline <- function( datasets, alternatives, pipelineDef, comb=NULL,
output.prefix="", nthreads=1, saveEndResults=TRUE,
debug=FALSE, skipErrors=TRUE, ...){
mcall <- match.call()
if(!is(pipelineDef,"PipelineDefinition"))
pipelineDef <- PipelineDefinition(pipelineDef)
alternatives <- .checkPipArgs(alternatives, pipelineDef)
pipDef <- stepFn(pipelineDef, type="functions")
if(is.null(names(datasets)))
names(datasets) <- paste0("dataset",seq_along(datasets))
if(any(grepl(" ",names(datasets))))
stop("Dataset names should not have spaces.")
if(any(grepl("\\.",names(datasets))))
warning("It is recommended not to use dots ('.') in dataset names to
facilitate browsing aggregated results.")
# check that output folder exists, otherwise create it
if(output.prefix!=""){
x <- gsub("[^/]+$","",output.prefix)
if(x!="" && !dir.exists(x)) dir.create(x, recursive=TRUE)
}
# prepare the combinations of parameters to use
alt <- alternatives[unlist(arguments(pipelineDef))]
if(is.null(comb)){
eg <- buildCombMatrix(alt, TRUE)
}else{
eg <- .checkCombMatrix(comb, alt)
}
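  # self-named vector of dataset names, used for the per-dataset apply calls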
names(dsnames) <- dsnames <- names(datasets)
if(!debug){
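    # a SnowParam/MulticoreParam/SerialParam object may be supplied directly
    # instead of a thread count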
if(any( class(nthreads) %in%
c("SnowParam","MulticoreParam","SerialParam") )){
bpp <- nthreads
nthreads <- bpnworkers(bpp)
}else{
bpp <- MulticoreParam(nthreads, ...)
}
message(paste("Running", nrow(eg), "pipeline settings on", length(datasets),
"datasets using", nthreads,"threads"))
}else{
nthreads <- 1
bpp <- SerialParam()
}
if(!debug && nthreads>length(datasets)){
    # more threads than datasets: also split the parameter combinations
egs <- .splitEG(names(datasets), eg, nthreads)
dsnames <- rep(names(datasets),each=length(egs))
resfiles <- bpmapply( dsi=dsnames, eg=egs, BPPARAM=bpp,
FUN=function(dsi, eg) tryCatch(
.runPipelineF( dsi, datasets[[dsi]], pipelineDef,
alt, eg, output.prefix, noWrite=TRUE,
saveEndResults=saveEndResults,
skipErrors=skipErrors ),
error=function(e){
if(!skipErrors) stop(e)
                          print(e)
NULL
})
)
resfiles <- split(resfiles, dsnames)
# reassembling and saving the results per dataset
resfiles <- lapply(names(datasets), FUN=function(dsi){
if(saveEndResults){
res <- do.call(c, lapply(resfiles[[dsi]], FUN=function(x) x$res))
saveRDS(res, file=paste0(output.prefix,"res.",dsi,".endOutputs.rds"))
}
if(length(resfiles[[dsi]])==1){
res <- resfiles[[dsi]][[1]][c("evaluation", "elapsed")]
}else{
res <- resfiles[[dsi]][[1]]
for(i in seq.int(from=2, to=length(resfiles[[dsi]]))){
res <- .dsMergeResults(res, resfiles[[dsi]][[i]])
}
}
metadata(res)$PipelineDefinition <- pipelineDef
ifile <- paste0(output.prefix,"res.",dsi,".evaluation.rds")
saveRDS(res, file=ifile)
return(ifile)
})
}else if(!debug && nthreads>1 && length(datasets)>1){
resfiles <- bptry(bplapply( dsnames, BPPARAM=bpp,
FUN=function(dsi){
tryCatch(
.runPipelineF(dsi, datasets[[dsi]], pipelineDef, alt, eg,
output.prefix, saveEndResults=saveEndResults,
skipErrors=skipErrors),
error=function(e){
print(e)
if(debug) print(traceback())
if(!skipErrors) stop(e)
NULL
})
}))
}else{
nthreads <- 1
if(debug) message("Running in debug mode (single thread)")
resfiles <- lapply( dsnames, FUN=function(dsi){
.runPipelineF(dsi, datasets[[dsi]], pipelineDef, alt, eg, output.prefix,
debug, saveEndResults=saveEndResults, skipErrors=skipErrors)
})
}
message("
Finished running on all datasets")
# save pipeline and resolved functions
pipinfo <- list( pipDef=pipelineDef,
alts=lapply(alt, FUN=function(x){
if(is.numeric(x)) return(x)
lapply(x,FUN=function(x){
tryCatch({
if(is.function(x)) return(x)
if(exists(x) && is.function(get(x))){
return(get(x))
}else{
return(x)
}
},
error=function(e) return(x)
)
})
}),
sessionInfo=sessionInfo(),
call=mcall
)
saveRDS(pipinfo, file=paste0(output.prefix,"runPipelineInfo.rds"))
names(resfiles) <- names(datasets)
res <- lapply(resfiles, readRDS)
errs <- lapply(res, FUN=function(x) x$errors)
if(length(unlist(errs))>0){
errs <- errs[!vapply(errs, FUN.VALUE=logical(1L), FUN=is.null)]
errs <- lapply(errs, .removeRedundantErrors)
errs <- data.frame(dataset=rep(names(errs), lengths(errs)),
step=unlist(errs))
row.names(errs) <- NULL
message("
Some errors were encountered during the run:")
print(errs)
patt <- paste(paste0("^",gsub("([.()\\^{}+$*?]|\\[|\\])", "\\\\\\1",
unique(errs$step))), collapse="|")
message("All results were saved, but only those of analyses successful ",
"across all datasets will be retained for aggregation.")
removeErrorSteps <- function(x){
w <- grep(patt,names(x),invert=TRUE)
if(is.data.frame(x)) return(x[w,,drop=FALSE])
x[w]
}
res <- lapply(res, FUN=function(x){
x$elapsed$stepwise <- lapply(x$elapsed$stepwise, removeErrorSteps)
x$elapsed$total <- removeErrorSteps(x$elapsed$total)
x$evaluation <- lapply(x$evaluation, removeErrorSteps)
x
})
}
message("
Aggregating results...")
res <- aggregatePipelineResults(res, pipelineDef)
saveRDS(res, file=paste0(output.prefix,"aggregated.rds"))
res
}
.runPipelineF <- function( dsi, ds, pipelineDef, alt, eg, output.prefix,
debug=FALSE, saveEndResults=FALSE, noWrite=FALSE,
skipErrors=FALSE ){
if(debug) message(dsi)
pipDef <- stepFn(pipelineDef, type="functions")
args <- arguments(pipelineDef)
ds <- tryCatch( stepFn(pipelineDef, type="initiation")(ds),
error=function(e){
stop("Error trying to initiate dataset ", dsi,"
",e)
})
elapsed <- lapply(pipDef, FUN=function(x) list())
elapsed.total <- list()
objects <- c(list(BASE=ds),lapply(args[-length(args)],FUN=function(x)NULL))
intermediate_return_objects <- lapply(args, FUN=function(x) list() )
run.errors <- c()
rm(ds)
res <- vector(mode="list", length=nrow(eg))
for(n in seq_len(nrow(eg))){
newPar <- as.numeric(eg[n,])
aa <- paste( mapply(an=names(alt), a=alt, i=newPar,
FUN=function(an,a,i) paste0(an,"=",a[i]) )
,collapse=", ")
if(debug){
message("
####################
# Iteration ",n,"
# ", aa)
}
if(n==1){
oldPar <- rep(0,ncol(eg))
}else{
oldPar <- as.numeric(eg[n-1,])
}
    # identify the first parameter that changes and the corresponding step
chParam <- names(alt)[which(newPar!=oldPar)[1]]
wStep <- which(vapply(args,FUN=function(x){ chParam %in% x },logical(1)))
    # fetch the object from the previous step
while( (w <- which(names(objects)==names(args)[wStep])) &&
(all(is.na(objects[[w-1]])) || is.null(objects[[w-1]])) && wStep > 2)
wStep <- wStep-1 # to handle steps without parameter
x <- objects[[w-1]]
if(is.null(x)){
# going back to original dataset
x <- objects[[1]]
wStep <- 1
}
# proceed with the remaining steps of the pipeline
for(step in names(args)[seq.int(from=wStep, to=length(args))]){
if(debug) message(step)
# prepare the arguments
a <- lapply(args[[step]], FUN=function(a){
alt[[a]][newPar[which(colnames(eg)==a)]]
})
names(a) <- args[[step]]
#a$x <- x
#x <- do.call(pipDef[[step]], a) ## unknown issue with do.call...
fcall <- .mycall(pipDef[[step]], a)
if(debug) message(fcall)
st <- Sys.time()
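      # skip evaluation if an earlier step already failed (x set to NA)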
if(length(x)>1 || !is.na(x))
x <- tryCatch( eval(fcall),
error=function(e){
.pipError(x, e, step, pipDef, fcall, newPar,
output.prefix, dsi, aa, debug,
continue=skipErrors)
})
# name the current results on the basis of the previous steps:
ws <- seq_len(sum( vapply(args[seq_len(which(names(args)==step))],
length, integer(1)) ))
ename <- .args2name(newPar[ws], alt[ws])
# save elapsed time for this step
elapsed[[step]][[ename]] <- as.numeric(Sys.time()-st)
if(is.null(x) || (length(x)==1 && is.na(x))){
run.errors <- c(run.errors, ename)
x <- NA
}else{
        # save any intermediate objects (e.g. step evaluation)
if(is.list(x) && all(c("x","intermediate_return") %in% names(x))){
intermediate_return_objects[[step]][[ename]] <- x$intermediate_return
x <- x$x
}else{
if(!is.null(stepFn(pipelineDef, step, "evaluation"))){
intermediate_return_objects[[step]][[ename]] <- tryCatch(
stepFn(pipelineDef, step, "evaluation")(x),
error=function(e){
fcall <- 'stepFn(pipelineDef, step, "evaluation")(x)'
.pipError(x, e, step, pipDef, fcall, newPar, output.prefix, dsi,
aa, debug, continue=skipErrors)
})
}
}
}
objects[[step]] <- x
}
# compute total time for this iteration
elapsed.total[[n]] <- sum(vapply(names(args),FUN=function(step){
ws <- seq_len(sum( vapply(args[seq_len(which(names(args)==step))],
length, integer(1)) ))
ename <- .args2name(newPar[ws], alt[ws])
elapsed[[step]][[ename]]
}, numeric(1) ))
    # store the final output of this iteration
res[[n]] <- x
}
if(debug) message("
Completed running all variations.")
# set names as the combination of arguments
names(res) <- apply(eg,1,alt=alt,FUN=.args2name)
names(elapsed.total) <- names(res)
if(noWrite){
out <- SimpleList( evaluation=intermediate_return_objects,
elapsed=list( stepwise=elapsed, total=elapsed.total ) )
if(saveEndResults) out$res <- res
return(out)
}
if(saveEndResults)
saveRDS(res, file=paste0(output.prefix,"res.",dsi,".endOutputs.rds"))
res <- SimpleList( evaluation=intermediate_return_objects,
elapsed=list( stepwise=elapsed, total=elapsed.total ) )
if(length(run.errors)>0) res$errors <- run.errors
metadata(res)$PipelineDefinition <- pipelineDef
ifile <- paste0(output.prefix,"res.",dsi,".evaluation.rds")
saveRDS(res, file=ifile)
return(ifile)
}
# build function call (for a step of runPipeline) from list of arguments
.mycall <- function(fn, args){
if(is.function(fn)) fn <- deparse(match.call()$fn)
args <- paste(paste(names(args), vapply(args, FUN=function(x){
if(is.numeric(x)) return(as.character(x))
paste0("\"",x,"\"")
}, character(1)), sep="="), collapse=", ")
if(args!="") args <- paste(",",args)
parse(text=paste0(fn, "(x=x", args, ")"))
}
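# e.g. called as `.mycall(pipDef[[step]], list(meth1="log2"))` (the argument
# name `meth1` is illustrative), this returns the unevaluated expression
# `pipDef[[step]](x=x, meth1="log2")`, which is then eval()ed in a frame
# where `x` and `pipDef` are defined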
.checkPipArgs <- function(alternatives, pipDef=NULL){
if(any(grepl(";|=",names(alternatives))))
stop("Some of the pipeline arguments contain unaccepted characters ",
"(e.g. ';' or '=').")
if(any(vapply(alternatives, function(x) any(grepl(";|=",x)), logical(1))))
stop("Some of the alternative argument values contain unaccepted ",
"characters (e.g. ';' or '=').")
if(!is.null(pipDef)){
def <- defaultArguments(pipDef)
for(f in names(alternatives)) def[[f]] <- alternatives[[f]]
args <- arguments(pipDef)
if(!all(unlist(args) %in% names(def))){
missingParams <- setdiff(as.character(unlist(args)), names(def))
stop("`alternatives` should have entries for the following slots defined",
" in the pipeline: ", paste(missingParams ,collapse=", "))
}
if(!all( vapply(def, length, integer(1))>0)){
stop("All steps of `alternatives` should contain at least one option.")
}
alternatives <- def
}
alternatives
}
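# e.g. `.checkPipArgs(list(meth1=c("log2","sqrt")), pip)` returns the
# pipeline's default arguments with `meth1` overridden (argument name
# illustrative); names or values containing ';' or '=' trigger an error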
.args2name <- function(x, alt){
x2 <- mapply(a=alt,i=as.numeric(x),FUN=function(a,i) a[i])
paste( paste0( names(alt), "=", x2), collapse=";" )
}
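# e.g. `.args2name(c(1,2), list(meth1=c("a","b"), meth2=c("x","y")))`
# returns "meth1=a;meth2=y"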
.pipError <- function(x, e, step, pipDef, fcall, newPar, output.prefix, dsi, aa,
debug=TRUE, continue=FALSE){
if(debug) save(x, step, pipDef, fcall, newPar,
file=paste0(output.prefix,"runPipeline_error_TMPdump.RData"))
msg <- paste0("Error in dataset `", dsi, "` with parameters:\n", aa,
"\nin step `", step, "`, evaluating command:\n`",
gsub("[[step]]",paste0('[["',step,'"]]'), fcall, fixed=TRUE), "`
Error:\n", e, "\n",
ifelse(debug, paste0("Current variables dumped in ",
output.prefix,
"runPipeline_error_TMPdump.RData"), ""))
if(continue){
cat( msg )
}else{
stop( msg )
}
NULL
}
.splitEG <- function(datasets, eg, nthreads, tol=1){
# we find at which step to split to maximize cluster use
eg <- as.data.frame(eg)
for(i in seq_len(ncol(eg))){
if(length(unique(eg[,i]))*length(datasets) >= (nthreads-tol)){
return(split(eg, eg[,seq_len(i)]))
}
}
split(eg, seq_len(nrow(eg)))
}
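# e.g. with 2 datasets, nthreads=8 and a first column of `eg` having 4 unique
# values, the matrix is split on that column (4*2 >= 8-1), yielding enough
# chunks to keep all workers busy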
.removeRedundantErrors <- function(e){
i <- 1
while(i < length(e)){
w <- i+grep(paste0("^",e[i]), e[-seq_len(i)])
if(length(w)>0) e <- e[-w]
i <- i + 1
}
e
}
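# e.g. `.removeRedundantErrors(c("meth1=a", "meth1=a;meth2=x"))` returns
# "meth1=a", since the second entry is the same upstream failure propagated
# to a downstream step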