R/ProcessBAM.R

Defines functions .processBAM_validate_args .processBAM_getStrand .processBAM_fcfile_validate .processBAM_run_featureCounts .BAM2COV_run_single .processBAM_run_single .run_BAM2COV .run_processBAM processBAM BAM2COV

Documented in BAM2COV processBAM

#' @describeIn processBAM Converts BAM files to COV files without running
#'   `processBAM()`
#' @export
BAM2COV <- function(
        bamfiles = "./Unsorted.bam",
        sample_names = "sample1",
        output_path = "./cov_folder",
        n_threads = 1, useOpenMP = TRUE,
        overwrite = FALSE,
        verbose = FALSE,
        multiRead = FALSE
) {
    # Check args
    if (length(bamfiles) != length(sample_names)) 
        .log(paste("In BAM2COV(),",
            "Number of BAM files and sample names must be the same"))
    if (length(sample_names) != length(unique(sample_names)))
        .log(paste("In BAM2COV(), some sample names are not unique"))

    if (length(bamfiles) == 0) .log("bamfiles argument must not be empty")
    if (!all(file.exists(bamfiles)))
        .log(paste("In BAM2COV(),", "some BAMs in bamfiles do not exist"))

    if (!dir.exists(dirname(output_path))) .log(paste("In BAM2COV(),",
        dirname(output_path), " - path does not exist"))

    if (!dir.exists(output_path)) dir.create(output_path)

    # Check which output already exists; prevent overwrite
    s_output <- file.path(normalizePath(output_path), sample_names)
    if (!overwrite) {
        already_exist <- (
            file.exists(paste0(s_output, ".cov"))
        )
    } else {
        already_exist <- rep(FALSE, length(bamfiles))
    }

    # Call wrapper
    if (!all(already_exist)) {
        .run_BAM2COV(
            bamfiles = bamfiles[!already_exist],
            output_file_prefixes = s_output[!already_exist],
            max_threads = n_threads, useOpenMP = useOpenMP,
            overwrite = overwrite,
            verbose = verbose, multiRead = multiRead
        )
    } else {
        .log("BAM2COV has already been run on given BAM files", "message")
    }

    s_output <- file.path(normalizePath(output_path), sample_names)
    if (!all(file.exists(paste0(s_output, ".cov"))))
        .log(paste("Some BAM2COV outputs could not be found.",
            "BAM2COV must have crashed"))
}

#' @describeIn processBAM Processes BAM files. Requires a
#' SpliceWiz reference generated by buildRef()
#' @export
processBAM <- function(
        bamfiles = "./Unsorted.bam",
        sample_names = "sample1",
        reference_path = "./Reference",
        output_path = "./SpliceWiz_Output",
        n_threads = 1, useOpenMP = TRUE,
        overwrite = FALSE,
        run_featureCounts = FALSE,
        verbose = FALSE,
        skipCOVfiles = FALSE,
        multiRead = FALSE
) {
    # Check args
    if (length(bamfiles) != length(sample_names)) .log(paste("In processBAM(),",
        "Number of BAM files and sample names must be the same"))
    if (length(sample_names) != length(unique(sample_names)))
        .log(paste("In processBAM(), some sample names are not unique"))

    if (length(bamfiles) == 0) .log("bamfiles argument must not be empty")
    if (!all(file.exists(bamfiles))) .log(paste("In processBAM(),",
        "some BAMs in bamfiles do not exist"))

    if (!dir.exists(dirname(output_path))) .log(paste("In processBAM(),",
        dirname(output_path), " - path does not exist"))

    if (!dir.exists(output_path)) dir.create(output_path)

    # Check which output already exists; prevent overwrite
    s_output <- file.path(normalizePath(output_path), sample_names)
    if (!overwrite) {
        already_exist <- (
            file.exists(paste0(s_output, ".txt.gz")) &
            file.exists(paste0(s_output, ".cov"))
        )
    } else {
        already_exist <- rep(FALSE, length(bamfiles))
    }

    # Call wrapper
    if (!all(already_exist)) {
        .run_processBAM(
            reference_path = reference_path,
            bamfiles = bamfiles[!already_exist],
            output_files = s_output[!already_exist],
            max_threads = n_threads, useOpenMP = useOpenMP,
            overwrite_SpliceWiz_Output = overwrite,
            verbose = verbose, skipCOVfiles = skipCOVfiles,
            multiRead = multiRead
        )
    } else {
        .log("processBAM has already been run on given BAM files", "message")
    }

    s_output <- file.path(normalizePath(output_path), sample_names)
    if (!all(file.exists(paste0(s_output, ".txt.gz"))))
        .log(paste("Some processBAM outputs could not be found.",
            "processBAM must have crashed"))

    # Run featureCounts
    if (run_featureCounts) {
        .processBAM_run_featureCounts(
            reference_path, output_path,
            bamfiles, sample_names, n_threads, overwrite
        )
    }
}

# Wrapper to R/C++. Handles whether OpenMP or BiocParallel is used
.run_processBAM <- function(
        reference_path = "./Reference",
        bamfiles = "Unsorted.bam",
        output_files = "./Sample",
        max_threads = max(parallel::detectCores(), 1),
        useOpenMP = TRUE,
        overwrite_SpliceWiz_Output = FALSE,
        verbose = TRUE, skipCOVfiles = FALSE, multiRead = FALSE
    ) {
    .validate_reference(reference_path) # Check valid SpliceWiz reference
    s_bam <- normalizePath(bamfiles) # Clean path name for C++
    s_ref <- normalizePath(reference_path) # Clean path name for C++

    # Check args
    .processBAM_validate_args(s_bam, max_threads, output_files)
    ref_file <- file.path(s_ref, "SpliceWiz.ref.gz")

    .log("Running SpliceWiz processBAM", "message")
    n_threads <- floor(max_threads)
    if (Has_OpenMP() > 0 & useOpenMP) {
        max_omp_threads <- Has_OpenMP()
        if(max_omp_threads < n_threads) n_threads <- max_omp_threads
        SpliceWizMain_multi(
            ref_file, s_bam, output_files, n_threads, verbose, 
            skipCOVfiles, multiRead
        )
    } else {
        # Use BiocParallel
        n_rounds <- ceiling(length(s_bam) / floor(max_threads))
        n_threads <- ceiling(length(s_bam) / n_rounds)

        BPPARAM_mod <- .validate_threads(n_threads, as_BPPARAM = TRUE)

        row_starts <- seq(1, by = n_threads, length.out = n_rounds)
        for (i in seq_len(n_rounds)) {
            selected_rows_subset <- seq(row_starts[i],
                min(length(s_bam), row_starts[i] + n_threads - 1)
            )
            BiocParallel::bplapply(selected_rows_subset,
                function(i, s_bam, reference_file,
                        output_files, verbose, overwrite, skipCOVfiles) {
                    .processBAM_run_single(s_bam[i], reference_file,
                        output_files[i], verbose, overwrite, skipCOVfiles)
                },
                s_bam = s_bam,
                reference_file = ref_file,
                output_files = output_files,
                verbose = verbose, skipCOVfiles = skipCOVfiles,
                overwrite = overwrite_SpliceWiz_Output,
                BPPARAM = BPPARAM_mod
            )
        }
    }

}

# BAM2COV wrapper to R/C++. Handles whether OpenMP or BiocParallel is used
.run_BAM2COV <- function(
        bamfiles = "sample.bam",
        output_file_prefixes = "sample",
        max_threads = max(parallel::detectCores(), 1),
        useOpenMP = TRUE,
        overwrite = FALSE,
        verbose = TRUE,
        multiRead = FALSE
    ) {
    s_bam <- normalizePath(bamfiles) # Clean path name for C++
    # Check args
    .processBAM_validate_args(s_bam, max_threads, output_file_prefixes)

    .log("Running BAM2COV", "message")
    n_threads <- floor(max_threads)
    if (Has_OpenMP() > 0 & useOpenMP) {
        max_omp_threads <- Has_OpenMP()
        if(max_omp_threads < n_threads) n_threads <- max_omp_threads
        # Simple FOR loop:
        for (i in seq_len(length(s_bam))) {
            .BAM2COV_run_single(s_bam[i], output_file_prefixes[i],
                n_threads, verbose, overwrite, multiRead)
        }
    } else {
        # Use BiocParallel
        n_rounds <- ceiling(length(s_bam) / floor(max_threads))
        n_threads <- ceiling(length(s_bam) / n_rounds)

        BPPARAM_mod <- .validate_threads(n_threads, as_BPPARAM = TRUE)

        row_starts <- seq(1, by = n_threads, length.out = n_rounds)
        for (i in seq_len(n_rounds)) {
            selected_rows_subset <- seq(row_starts[i],
                min(length(s_bam), row_starts[i] + n_threads - 1)
            )
            BiocParallel::bplapply(selected_rows_subset,
                function(i, s_bam, output_files, verbose, overwrite) {
                    .BAM2COV_run_single(s_bam[i], output_files[i], 1,
                        verbose, overwrite)
                },
                s_bam = s_bam,
                output_files = output_file_prefixes,
                verbose = verbose,
                overwrite = overwrite,
                BPPARAM = BPPARAM_mod
            )
        }
    }
}

# Call C++ on a single sample. Used for BiocParallel
.processBAM_run_single <- function(
    bam, ref, out, verbose, overwrite, skipCOVfiles = FALSE
) {
    file_gz <- paste0(out, ".txt.gz")
    file_cov <- paste0(out, ".cov")
    bam_short <- file.path(basename(dirname(bam)), basename(bam))
    if (overwrite ||
        !(file.exists(file_gz) | file.exists(file_cov))) {
        ret <- SpliceWizMain(bam, ref, out, verbose, 1, skipCOVfiles, FALSE)
        # Check SpliceWiz returns all files successfully
        if (ret != 0) {
            .log(paste(
                "SpliceWiz processBAM exited with errors"))
        } else if (!file.exists(file_gz)) {
            .log(paste(
                "SpliceWiz processBAM failed to produce", file_gz))
        } else if (!file.exists(file_cov)) {
            .log(paste(
                "SpliceWiz processBAM failed to produce", file_cov))
        } else {
            .log(paste("SpliceWiz processBAM processed", bam_short), "message")
        }
    } else {
        .log(paste("SpliceWiz processBAM output for", bam_short,
            "already exists, skipping..."), "message")
    }
}

# Call C++/BAM2COV on a single sample. Used for BiocParallel
.BAM2COV_run_single <- function(
    bam, out, n_threads, verbose, overwrite, multiRead = FALSE
) {
    file_cov <- paste0(out, ".cov")
    bam_short <- file.path(basename(dirname(bam)), basename(bam))
    if (overwrite || !(file.exists(file_cov))) {
        ret <- c_BAM2COV(bam, file_cov, verbose, n_threads, multiRead)
        # Check BAM2COV returns all files successfully
        if (ret != 0) {
            .log(paste(
                "BAM2COV exited with errors"))
        } else if (!file.exists(file_cov)) {
            .log(paste(
                "BAM2COV failed to produce", file_cov))
        } else {
            .log(paste("BAM2COV processed", bam_short), "message")
        }
    } else {
        .log(paste(file_cov,
            "already exists, skipping..."), "message")
    }
}

# Runs featureCounts on given BAM files, intended to be run after processBAM
# as processBAM determines the strandedness and paired-ness of the experiment
.processBAM_run_featureCounts <- function(
        reference_path, output_path,
        s_bam, s_names, n_threads, overwrite
) {
    .check_package_installed("Rsubread", "2.4.0")
    gtf_file <- Get_GTF_file(reference_path)

    expr <- findSpliceWizOutput(output_path)
    if(nrow(expr) == 0) .log(paste(
        "SpliceWiz output files missing from", output_path,
        "- cannot run SpliceWiz's featureCounts wrapper"
    ))
    output_files <- expr$sw_file[match(s_names, expr$sample)]


    # Check which have already been run, do not run if overwrite = FALSE
    outfile <- file.path(output_path, "main.FC.Rds")
    if(overwrite) {
        need_to_do <- rep(TRUE, length(s_names))
        samples_todo <- s_names
    } else {
        samples_todo <-  .processBAM_fcfile_validate(outfile, s_names)
        if(!is.null(samples_todo) && length(samples_todo) == 0) {
            .log(paste(
                "featureCounts already run on all samples, output in",
                outfile
            ), "message")
            return(0)
        }
        if(is.null(samples_todo)) samples_todo <- s_names
        need_to_do <- s_names %in% samples_todo
    }

    # determine paired-ness, strandedness, assume all BAMS are the same
    output_files <- expr$sw_file[match(samples_todo, expr$sample)]
    
    strand <- c()
    paired <- c()
    for(i in seq_len(length(output_files))) {
        ret <- .processBAM_getStrand(output_files[i])
        strand <- c(strand, ret$strand)
        paired <- c(paired, ret$paired)
    }
    if(length(unique(strand)) > 1) {    
        .log(paste(
            "Samples with different stranded-ness found:",
            paste(unique(strand), collapse = ", "),
            ", running featureCounts using un-stranded mode."
        ), "warning")
        strandUse <- 0
    } else {
        strandUse <- unique(strand)
    }
    if(length(unique(paired)) > 1) {    
        .log(paste(
            "Samples with both single / paired reads",
            paste(unique(paired), collapse = ", "),
            ", running featureCounts using single-end reads."
        ), "warning")
        pairedUse <- FALSE
    } else {
        pairedUse <- unique(paired)
    }
    
    # Run FeatureCounts in bulk
    res <- Rsubread::featureCounts(
        s_bam[need_to_do],
        annot.ext = gtf_file,
        isGTFAnnotationFile = TRUE,
        strandSpecific = strandUse,
        isPairedEnd = pairedUse,
        requireBothEndsMapped = pairedUse,
        nthreads = n_threads
    )
    res$targets <- s_names[need_to_do]
    colnames(res$counts) <- s_names[need_to_do]
    colnames(res$stat)[-1] <- s_names[need_to_do]
    columns <- c("counts", "annotation", "targets", "stat")
    if (!all(columns %in% names(res))) 
        .log("Error encountered when running featureCounts")

    # Append to existing main.FC.Rds if exists, overwriting where necessary:
    validFC <- .processBAM_fcfile_validate(outfile, s_names[need_to_do])
    if(!is.null(validFC)) {
        # Valid prior output that needs to be overwritten
        res.old <- readRDS(outfile)
        if (
            identical(res.old$annotation, res$annotation) &
            identical(res.old$stat$Status, res$stat$Status)
        ) {
            if(overwrite) {
                # Remove samples in old output that have been re-run
                removeSamples <- intersect(res.old$targets, res$targets)
                keepSamples = setdiff(res.old$targets, removeSamples)
                if(length(removeSamples) > 0) {
                    res.old$targets <- setdiff(res.old$targets, res$targets)
                    res.old$stat <- res.old$stat[, c("Status", keepSamples),
                        drop = FALSE]
                    res.old$counts <- res.old$counts[, keepSamples,
                        drop = FALSE]
                }
            }
            # Append old sample results to existing results
            new_samples <- res$targets[!(res$targets %in% res.old$targets)]
            
            res$targets <- c(res.old$targets, new_samples)
            res$stat <- cbind(res.old$stat, res$stat[, new_samples])
            res$counts <- cbind(res.old$counts, res$counts[, new_samples])
            rownames(res$counts) <- res$annotation$GeneID
        } else {
            .log(paste(
                "featureCounts output not compatible with previous",
                "output in", outfile, "; overwriting previous output"
            ), "warning")
        }
    }
    
    if (file.exists(outfile) & is.null(validFC)) {
        .log(paste(outfile,
            "found but was not a valid SpliceWiz featureCounts",
            "output; overwriting previous output"
        ), "warning")
    }
    saveRDS(res, outfile)
    .log(paste("featureCounts ran succesfully; saved to",
        outfile), "message")
}

# Validates old fc output
# Returns:
# - NULL if no or invalid output file
# - character(0) if all samples already processed
# - vector of samples that need to be processed
.processBAM_fcfile_validate <- function(outfile, samples) {
    if (!file.exists(outfile)) return(NULL)
    
    columns <- c("counts", "annotation", "targets", "stat")
    res <- readRDS(outfile)
    
    if (!all(columns %in% names(res))) return(NULL)
    need_to_do <- samples[!(samples %in% res[["targets"]])]
    return(need_to_do)
}

.processBAM_getStrand <- function(outfile) {
    data.list <- get_multi_DT_from_gz(
        outfile, c("BAM", "Directionality")
    )
    stats <- data.list$BAM
    direct <- data.list$Directionality
    numSingles <- as.numeric(stats$Value[3])
    numPairs <- as.numeric(stats$Value[4])
    paired <- (numSingles == 0 & numPairs > 0) ||
        (numSingles > 0 && numPairs / numSingles / 1000)
    strand <- as.numeric(direct$Value[9])
    if (strand == -1) strand <- 2
    return(list(
        paired = paired,
        strand = strand
    ))
}

# Validate arguments; return error if invalid
.processBAM_validate_args <- function(s_bam, max_threads, output_files) {
    if (!is.numeric(max_threads)) max_threads <- 1
    if (max_threads < 1) max_threads <- 1
    max_threads <- floor(max_threads)

    if (max_threads > 1 && max_threads > parallel::detectCores()) {
        .log(paste(
            max_threads, " threads is not allowed for this system"))
    }

    if (!all(file.exists(s_bam))) {
        .log(paste(
            paste(unique(s_bam[!file.exists(s_bam)]), collapse = ""),
            " - these BAM files were not found"))
    }

    if (!all(dir.exists(dirname(output_files)))) {
        .log(paste(
            paste(unique(dirname(
                    output_files[!dir.exists(dirname(output_files))])),
                collapse = ""),
            " - directories not found"))
    }

    if (!(length(s_bam) == length(output_files))) {
        .log("Number of output files and bam files must be the same")
    }
    return(TRUE)
}
alexchwong/SpliceWiz documentation built on Oct. 15, 2024, 10:12 a.m.