Nothing
#' @title Search, integrate and report targeted features in a multiple spectra
#'
#' @description Integrate all target features in all files defined in the
#' initialised input object and store results. The use of updated ROI and the
#' integration of FIR are controled by the input object slots \code{useUROI} and
#' \code{useFIR}. Files are processed in parallel using
#' \link{peakPantheR_singleFileSearch}; \code{ncores} controls the number of
#' cores used for parallelisation, with \code{ncores=0} corresponding to serial
#' processing. If the processing of a file fails (file does not exist or error
#' during execution) the sample is removed from the outputed object.
#'
#' @param object (peakPantheRAnnotation) Initialised peakPantheRAnnotation
#' object defining the samples to process and compounds to target. The slots
#' \code{useUROI} and \code{useFIR} controls if uROI must be used and FIR
#' integrated if a feature is not found
#' @param ncores (int) Number of cores to use for parallelisation. Default 0 for
#' no parallelisation.
#' @param getAcquTime (bool) If TRUE will extract sample acquisition date-time
#' from the mzML metadata (the additional file access will impact run time)
#' @param resetWorkers (int) If 0, the parallel cluster is only initiated once.
#' If >0 the cluster will be reset (and the memory of each worker freed) once
#' \code{ncores * resetWorkers} files have been processed. Default value is 1,
#' the cluster is reset once \code{ncores} files have been processed. While
#' potentially impacting performance (need to wait until all \code{ncores *
#' resetWorkers} files are processed before restarting the cluster), shutting
#' down the workers processes regularly will ensure the OS can reallocate memory
#' more efficiently. For values >1, ensure sufficient system memory is available
#' @param centroided (bool) use TRUE if the data is centroided, used by
#' \code{\link[MSnbase]{readMSData}} when reading the raw data files
#' @param curveModel (str) specify the peak-shape model to fit,
#' by default \code{skewedGaussian}.
#' Accepted values are \code{skewedGaussian} and \code{emgGaussian}
#' @param verbose (bool) If TRUE message calculation progress, time taken,
#' number of features found (total and matched to targets) and failures
#' @param ... Passes arguments to \code{findTargetFeatures} to alter
#' peak-picking parameters
#'
#' @return a list: \code{list()$result} \emph{(peakPantheRAnnotation)} fully
#' annotated object, \code{list()$failures} \emph{(list)} list of failed samples
#' and error message
#'
#' @examples
#' if(requireNamespace('faahKO')){
#' ## Load data
#' library(faahKO)
#'
#' # 3 files
#' input_spectraPaths <- c(system.file('cdf/KO/ko15.CDF', package = 'faahKO'),
#' system.file('cdf/KO/ko16.CDF', package = 'faahKO'),
#' system.file('cdf/KO/ko18.CDF', package = 'faahKO'))
#'
#' # 4 features
#' input_ROI <- data.frame(matrix(vector(), 4, 8,
#' dimnames=list(c(), c('cpdID', 'cpdName', 'rtMin', 'rt',
#' 'rtMax', 'mzMin', 'mz', 'mzMax'))),
#' stringsAsFactors=FALSE)
#' input_ROI[1,] <- c('ID-1', 'Cpd 1', 3310., 3344.888, 3390., 522.194778,
#' 522.2, 522.205222)
#' input_ROI[2,] <- c('ID-2', 'Cpd 2', 3280., 3385.577, 3440., 496.195038,
#' 496.2, 496.204962)
#' input_ROI[3,] <- c('ID-3', 'Cpd 3', 3420., 3454.435, 3495., 464.195358,
#' 464.2, 464.204642)
#' input_ROI[4,] <- c('ID-4', 'Cpd 4', 3670., 3701.697, 3745., 536.194638,
#' 536.2, 536.205362)
#' input_ROI[,c(3:8)] <- vapply(input_ROI[,c(3:8)], as.numeric,
#' FUN.VALUE=numeric(4))
#'
#' # Initialise object
#' initAnnotation <- peakPantheRAnnotation(spectraPaths=input_spectraPaths,
#' targetFeatTable=input_ROI)
#' # to use updated ROI:
#' # uROIExist=TRUE, useUROI=TRUE, uROI=input_uROI
#' # to use FallBack Integration Regions:
#' # useFIR=TRUE, FIR=input_FIR
#'
#' # Run serially
#' result_parallelAnnotation <- peakPantheR_parallelAnnotation(initAnnotation,
#' ncores=0,
#' getAcquTime=FALSE,
#' verbose=TRUE)
#' # Processing 4 compounds in 3 samples:
#' # uROI:\tFALSE
#' # FIR:\tFALSE
#' # ----- ko15 -----
#' # Polarity can not be extracted from netCDF files, please set manually the
#' # polarity with the 'polarity' method.
#' # Reading data from 4 windows
#' # Data read in: 0.24 secs
#' # Warning: rtMin/rtMax outside of ROI; datapoints cannot be used for
#' # mzMin/mzMax calculation, approximate mz and returning ROI$mzMin and
#' # ROI$mzMax for ROI #1
#' # Warning: rtMin/rtMax outside of ROI; datapoints cannot be used for
#' # mzMin/mzMax calculation, approximate mz and returning ROI$mzMin and
#' # ROI$mzMax for ROI #3
#' # Found 4/4 features in 0.06 secs
#' # Peak statistics done in: 0.02 secs
#' # Feature search done in: 0.76 secs
#' # ----- ko16 -----
#' # Polarity can not be extracted from netCDF files, please set manually the
#' # polarity with the 'polarity' method.
#' # Reading data from 4 windows
#' # Data read in: 0.24 secs
#' # Warning: rtMin/rtMax outside of ROI; datapoints cannot be used for
#' # mzMin/mzMax calculation, approximate mz and returning ROI$mzMin and
#' # ROI$mzMax for ROI #1
#' # Warning: rtMin/rtMax outside of ROI; datapoints cannot be used for
#' # mzMin/mzMax calculation, approximate mz and returning ROI$mzMin and
#' # ROI$mzMax for ROI #2
#' # Warning: rtMin/rtMax outside of ROI; datapoints cannot be used for
#' # mzMin/mzMax calculation, approximate mz and returning ROI$mzMin and
#' # ROI$mzMax for ROI #3
#' # Warning: rtMin/rtMax outside of ROI; datapoints cannot be used for
#' # mzMin/mzMax calculation, approximate mz and returning ROI$mzMin and
#' # ROI$mzMax for ROI #4
#' # Found 4/4 features in 0.08 secs
#' # Peak statistics done in: 0 secs
#' # Feature search done in: 0.71 secs
#' # ----- ko18 -----
#' # Polarity can not be extracted from netCDF files, please set manually the
#' # polarity with the 'polarity' method.
#' # Reading data from 4 windows
#' # Data read in: 0.25 secs
#' # Warning: rtMin/rtMax outside of ROI; datapoints cannot be used for
#' # mzMin/mzMax calculation, approximate mz and returning ROI$mzMin and
#' # ROI$mzMax for ROI #1
#' # Warning: rtMin/rtMax outside of ROI; datapoints cannot be used for
#' # mzMin/mzMax calculation, approximate mz and returning ROI$mzMin and
#' # ROI$mzMax for ROI #2
#' # Warning: rtMin/rtMax outside of ROI; datapoints cannot be used for
#' # mzMin/mzMax calculation, approximate mz and returning ROI$mzMin and
#' # ROI$mzMax for ROI #4
#' # Found 4/4 features in 0.06 secs
#' # Peak statistics done in: 0 secs
#' # Feature search done in: 0.71 secs
#' # ----------------
#' # Parallel annotation done in: 2.18 secs
#'
#' # No failures
#' result_parallelAnnotation$failures
#'
#' result_parallelAnnotation$annotation
#' # An object of class peakPantheRAnnotation
#' # 4 compounds in 3 samples.
#' # updated ROI do not exist (uROI)
#' # does not use updated ROI (uROI)
#' # does not use fallback integration regions (FIR)
#' # is annotated
#' }
#'
#' @family peakPantheR
#' @family parallelAnnotation
#'
#' @import foreach
#' @import doParallel
#' @import mzR
#'
#' @export
peakPantheR_parallelAnnotation <- function(object, ncores = 0,
getAcquTime = TRUE, resetWorkers = 1, centroided = TRUE,
curveModel='skewedGaussian', verbose=TRUE, ...){
# Check inputs, Initialise variables and outputs
initRes <- parallelAnnotation_init(object, resetWorkers, verbose)
file_paths = initRes$file_paths; target_peak_table=initRes$target_peak_table
input_FIR = initRes$input_FIR; resetWorkersMulti = initRes$resetWorkersMulti
stime <- Sys.time()
# Run singleFileSearch
# (list, each item is the result of a file, errors are passed into the list)
allFilesRes <- parallelAnnotation_runSingleFileSearch(object, file_paths,
target_peak_table, input_FIR, ncores, getAcquTime, resetWorkersMulti,
centroided, curveModel, verbose,...)
# Collect, process and reorder results
res <- parallelAnnotation_process(allFilesRes, object, verbose)
outObject <- res$outObject; fail_table <- res$fail_table
## check validity and exit
validObject(outObject)
etime <- Sys.time()
if (verbose) {
message("----------------")
message("Parallel annotation done in: ",
round(as.double(difftime(etime, stime)), 2), " ",
units(difftime(etime, stime)))
message(" ", dim(fail_table)[1], " failure(s)")
}
return(list(annotation = outObject, failures = fail_table))
}
# ------------------------------------------------------------------------------
## Check input file exist, wrap \code{peakPantheR_singleFileSearch} in a
## try cratch, add a failure status
# @param singleSpectraDataPath (str) path to netCDF or mzML raw data
# file (centroided, \strong{only with the channel of interest}).
# @param targetFeatTable a \code{\link{data.frame}} of compounds to
# target as rows. Columns: \code{cpdID} (str), \code{cpdName} (str),
# \code{rtMin} (float in seconds), \code{rt} (float in seconds, or
# \emph{NA}), \code{rtMax} (float in seconds), \code{mzMin} (float),
# \code{mz} (float or \emph{NA}), \code{mzMax} (float).
# @param FIR (data.frame or NULL) If not NULL, integrate Fallback
# Integration Regions (FIR) when a feature is not found. Compounds as
# row are identical to \code{targetFeatTable}, columns are \code{rtMin}
# (float in seconds), \code{rtMax} (float in seconds), \code{mzMin}
# (float), \code{mzMax} (float).
# @param getAcquTime (bool) If TRUE will extract sample acquisition
# date-time from the mzML metadata (the additional file access will
# impact run time) @param verbose (bool) If TRUE message calculation
# progress, time taken and number of features found (total and matched
# to targets)
# @param ... Passes arguments to \code{findTargetFeatures} to alter
# peak-picking parameters @return a list: \code{list()$TIC} \emph{(int)}
# TIC value, \code{list()$peakTable} \emph{(data.frame)} targeted
# features results (see Details), \code{list()$curveFit} \emph{(list)}
# list of \code{peakPantheR_curveFit} or NA for each ROI,
# \code{list()$acquTime} \emph{(POSIXct or NA)} date-time of sample
# acquisition from mzML metadata, \code{list()$ROIsDataPoint}
# \emph{(list)} a list of \code{data.frame} of raw data points for each
# ROI (retention time 'rt', mass 'mz' and intensity 'int' (as column) of
# each raw data points (as row)). \code{list()$failure} \emph{(named str
# or NULL)} a string detailing the error (named with the
# singleSpectraDataPath) or NA if the processing is successful.
parallelAnnotation_parallelHelper <- function(singleSpectraDataPath,
targetFeatTable, inFIR=NULL, inGetAcquTime=FALSE,centr=TRUE,
curveModel='skewedGaussian', inVerbose=TRUE,...){
# Check input path exist or exit with error message
if (!file.exists(singleSpectraDataPath)) {
if (inVerbose) { message(paste("Error file does not exist: ",
singleSpectraDataPath, sep = "")) }
# add error status
failureMsg <- paste("Error file does not exist: ",
singleSpectraDataPath, sep = "")
names(failureMsg) <- singleSpectraDataPath
# return basic values and failure message
return(list(TIC = as.numeric(NA), peakTable = NULL,
acquTime = as.character(NA), curveFit = NULL,
ROIsDataPoint = NULL, failure = failureMsg)) }
# Run singleFileSearch in try catch
file_name <- tools::file_path_sans_ext(basename(singleSpectraDataPath))
# progress
if (inVerbose) { message(paste("-----", file_name, "-----")) }
# try catch
result <- tryCatch({
# singleFileSearch
tmpResult <- peakPantheR_singleFileSearch(singleSpectraDataPath,
targetFeatTable, peakStatistic = TRUE, plotEICsPath = NA,
getAcquTime = inGetAcquTime, FIR = inFIR, centroided = centr,
curveModel = curveModel, verbose = inVerbose, ...)
# add failure status
failureMsg <- NA
names(failureMsg) <- singleSpectraDataPath
tmpResult$failure <- failureMsg
# last evaluation of Try is returned
return(tmpResult)
}, error = function(err) {
# message error
if (inVerbose) {
message("-----")
message(paste("Error processing file:", file_name))
message(err$message)
message("\n-----") }
# add error status
failureMsg <- err$message
names(failureMsg) <- singleSpectraDataPath
# return basic values and failure message
return(list(TIC = as.numeric(NA), peakTable = NULL,
acquTime = as.character(NA), curveFit = NULL,
ROIsDataPoint = NULL, failure = failureMsg)) })
gc(verbose = FALSE) # try clearing variables
return(result) # return singleFileSearch results with failure status
}
## Check inputs, Initialise variables and outputs
parallelAnnotation_init <- function(object, resetWorkers, verbose) {
# check validity of object
validObject(object)
# check resetWorkers value
if (!is.numeric(resetWorkers)) {
stop("Check input, resetWorkers must be an integer")
}
resetWorkersMulti <- as.integer(resetWorkers)
if (resetWorkersMulti < 0) {
stop("Check input, resetWorkers must be a positive integer")
}
# Initialise parameters from object
use_uROI <- useUROI(object)
use_FIR <- useFIR(object)
file_paths <- filepath(object)
if (use_uROI) {
target_peak_table <- uROI(object)
} else {
target_peak_table <- ROI(object)
}
if (use_FIR) {
input_FIR <- FIR(object)
} else {
input_FIR <- NULL
}
# Output parameters
if (verbose & isAnnotated(object)) {
message("!! Data was already annotated, results will be overwritten !!")
}
if (verbose) {
message("Processing ", nbCompounds(object), " compounds in ",
nbSamples(object), " samples:")
message(" uROI:\t", use_uROI)
message(" FIR:\t", use_FIR)
}
return(list(file_paths=file_paths, target_peak_table=target_peak_table,
input_FIR=input_FIR, resetWorkersMulti=resetWorkersMulti))
}
## Run singleFileSearch
# (list, each item is the result of a file, errors are passed into the list)
parallelAnnotation_runSingleFileSearch <- function(object, file_paths,
target_peak_table, input_FIR, ncores, getAcquTime, resetWorkersMulti,centroided,
curveModel, verbose,...) {
if (ncores != 0) { # Parallel
# Reinitialise the cluster after ncores files
# (reset worker processes, freed memory can be reallocated by the OS)
if (resetWorkersMulti != 0) {
# Init
nFiles <- nbSamples(object)
allFilesRes <- vector("list", nFiles)
nFilesPerClust <- (ncores * resetWorkersMulti)
nClust <- ceiling(nFiles/nFilesPerClust)
if (verbose) { message("Running ", nClust, " clusters of ",
nFilesPerClust, " files over ", ncores, " cores:") }
# in each round start a new cluster and store results
for (iClust in seq(0, nClust - 1, 1)) {
if (verbose) {
message(" starting cluster ", iClust + 1, "/", nClust) }
# init
idxStart <- 1 + iClust * nFilesPerClust
idxEnd <- min((nFilesPerClust + iClust * nFilesPerClust),nFiles)
# to not overshoot the number of files
tmp_file_paths <- file_paths[idxStart:idxEnd]
cl <- parallel::makeCluster(ncores)
doParallel::registerDoParallel(cl)
# Run
allFilesRes[idxStart:idxEnd] <- foreach::foreach(
x = tmp_file_paths, .packages = c("MSnbase", "mzR"),
.inorder=TRUE) %dopar% parallelAnnotation_parallelHelper(x,
target_peak_table,inFIR=input_FIR,inGetAcquTime=getAcquTime,
centr = centroided, curveModel=curveModel,
inVerbose = verbose, ...)
parallel::stopCluster(cl) } # Close
} else { # Single cluster init (workload can be balanced across workers)
cl <- parallel::makeCluster(ncores)
doParallel::registerDoParallel(cl)
allFilesRes <- foreach::foreach( # Run
x = file_paths, .packages = c("MSnbase", "mzR"),
.inorder = TRUE) %dopar% parallelAnnotation_parallelHelper(x,
target_peak_table, inFIR=input_FIR, inGetAcquTime=getAcquTime,
centr=centroided, curveModel=curveModel,
inVerbose = verbose,...)#.errorhandling='pass'
# #.export=c('findTargetFeatures', 'getTargetFeatureStatistic')
parallel::stopCluster(cl) } # Close
} else { # Serial
allFilesRes <- lapply(file_paths,
function(x) parallelAnnotation_parallelHelper(x, target_peak_table,
inFIR=input_FIR, inGetAcquTime=getAcquTime, centr=centroided,
curveModel=curveModel, inVerbose=verbose, ...)) }
return(allFilesRes) }
## Collect, process and reorder results
parallelAnnotation_process <- function(allFilesRes, object, verbose) {
# identify annotations that failed
fail_status <- unlist(lapply(allFilesRes,
function(x) {x$failure}), use.names = TRUE)
failures <- !is.na(fail_status)
names(failures) <- NULL
fail_table <- data.frame(matrix(c(names(fail_status)[failures],
fail_status[failures]), ncol = 2, byrow = FALSE,
dimnames = list(c(), c("file", "error"))), stringsAsFactors = FALSE)
# message failures
if ((sum(failures) != 0) & verbose) {
message("----------------")
message(paste(sum(failures), "file(s) failed to process:\n",
paste0(capture.output(fail_table), collapse = "\n"))) }
# remove failures
allFilesRes <- allFilesRes[!failures]
# reshape the output object to match (remove failed samples)
outObject <- object[!failures, ]
# unlist result into final object (if there is a minimum of 1 file left)
if (sum(!failures) > 0) {
# acquisitionTime
outObject@acquisitionTime <- vapply(allFilesRes, function(x) {
as.character(x$acquTime)}, FUN.VALUE = character(1))
# TIC
outObject@TIC <- vapply(allFilesRes, function(x) {
x$TIC}, FUN.VALUE = numeric(1))
# peakTables (all columns but cpdID and cpdName)
outObject@peakTables <- lapply(allFilesRes,
function(x) {x$peakTable[, !names(x$peakTable) %in%
c("cpdID", "cpdName")] })
# dataPoints
outObject@dataPoints <- lapply(allFilesRes,function(x){x$ROIsDataPoint})
# peakFit
outObject@peakFit <- lapply(allFilesRes, function(x) { x$curveFit })
# isAnnotated
outObject@isAnnotated <- TRUE
} else { # All files failed
if (verbose) { message("No file left in the object!") }
}
# reorder results by acquisition date if available
if (sum(is.na(acquisitionTime(outObject))) == 0) {
if (verbose) {
message("Annotation object reordered by sample acquisition date") }
outObject <- outObject[order(acquisitionTime(outObject)), ]
} else {
if (verbose) {
message(paste0('Annotation object cannot be reordered by sample ',
'acquisition date')) }
}
return(list(outObject=outObject, fail_table=fail_table)) }
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.