inst/htslib_version/R/ProcessBAM_hts.R

#' @describeIn processBAM (htslib version) Converts BAM files to COV files 
#'   without running `processBAM`.
#' @param read_pool_size How many reads should SpliceWiz/htslib cache into
#'   memory prior to multi-threaded read processing. Higher numbers result
#'   in higher memory consumption but should be minimally faster.
#'   Default `1000000`
#' @export
BAM2COV_hts <- function(
        bamfiles = "./Unsorted.bam",
        sample_names = "sample1",
        output_path = "./cov_folder",
        n_threads = 1, useOpenMP = TRUE,
        overwrite = FALSE,
        verbose = FALSE,
        read_pool_size = 1000000
) {
    if(compiled_with_htslib() == -1)
        .log("SpliceWiz was not compiled with htslib")
    
    # Check args
    if (length(bamfiles) != length(sample_names)) 
        .log(paste("In BAM2COV_hts(),",
            "Number of BAM files and sample names must be the same"))
    if (length(sample_names) != length(unique(sample_names)))
        .log(paste("In BAM2COV_hts(), some sample names are not unique"))

    if (length(bamfiles) == 0) .log("bamfiles argument must not be empty")
    if (!all(file.exists(bamfiles)))
        .log(paste("In BAM2COV_hts(),", "some BAMs in bamfiles do not exist"))

    if (!dir.exists(dirname(output_path))) .log(paste("In BAM2COV_hts(),",
        dirname(output_path), " - path does not exist"))

    if (!dir.exists(output_path)) dir.create(output_path)

    # Check which output already exists; prevent overwrite
    s_output <- file.path(normalizePath(output_path), sample_names)
    if (!overwrite) {
        already_exist <- (
            file.exists(paste0(s_output, ".cov"))
        )
    } else {
        already_exist <- rep(FALSE, length(bamfiles))
    }

    # Call wrapper
    if (!all(already_exist)) {
        .run_BAM2COV_hts(
            bamfiles = bamfiles[!already_exist],
            output_file_prefixes = s_output[!already_exist],
            max_threads = n_threads, useOpenMP = useOpenMP,
            overwrite = overwrite,
            verbose = verbose, read_pool_size = read_pool_size
        )
    } else {
        .log("BAM2COV has already been run on given BAM files", "message")
    }

    s_output <- file.path(normalizePath(output_path), sample_names)
    if (!all(file.exists(paste0(s_output, ".cov"))))
        .log(paste("Some BAM2COV outputs could not be found.",
            "BAM2COV must have crashed"))
}

#' @describeIn processBAM (htslib version) Processes BAM files. Requires a
#' SpliceWiz reference generated by buildRef()
#' @param read_pool_size How many reads should SpliceWiz/htslib cache into
#'   memory prior to multi-threaded read processing. Higher numbers result
#'   in higher memory consumption but should be minimally faster.
#'   Default `1000000`
#' @export
processBAM_hts <- function(
        bamfiles = "./Unsorted.bam",
        sample_names = "sample1",
        reference_path = "./Reference",
        output_path = "./SpliceWiz_Output",
        n_threads = 1, useOpenMP = TRUE,
        overwrite = FALSE,
        run_featureCounts = FALSE,
        verbose = FALSE,
        skipCOVfiles = FALSE,
        read_pool_size = 1000000
) {
    if(compiled_with_htslib() == -1)
        .log("SpliceWiz was not compiled with htslib")
    
    # Check args
    if (length(bamfiles) != length(sample_names)) .log(paste("In processBAM_hts(),",
        "Number of BAM files and sample names must be the same"))
    if (length(sample_names) != length(unique(sample_names)))
        .log(paste("In processBAM_hts(), some sample names are not unique"))

    if (length(bamfiles) == 0) .log("bamfiles argument must not be empty")
    if (!all(file.exists(bamfiles))) .log(paste("In processBAM_hts(),",
        "some BAMs in bamfiles do not exist"))

    if (!dir.exists(dirname(output_path))) .log(paste("In processBAM_hts(),",
        dirname(output_path), " - path does not exist"))

    if (!dir.exists(output_path)) dir.create(output_path)

    # Check which output already exists; prevent overwrite
    s_output <- file.path(normalizePath(output_path), sample_names)
    if (!overwrite) {
        already_exist <- (
            file.exists(paste0(s_output, ".txt.gz")) &
            file.exists(paste0(s_output, ".cov"))
        )
    } else {
        already_exist <- rep(FALSE, length(bamfiles))
    }

    # Call wrapper
    if (!all(already_exist)) {
        .run_processBAM_hts(
            reference_path = reference_path,
            bamfiles = bamfiles[!already_exist],
            output_files = s_output[!already_exist],
            max_threads = n_threads, useOpenMP = useOpenMP,
            overwrite_SpliceWiz_Output = overwrite,
            verbose = verbose, skipCOVfiles = skipCOVfiles,
            read_pool_size = read_pool_size
        )
    } else {
        .log("processBAM has already been run on given BAM files", "message")
    }

    s_output <- file.path(normalizePath(output_path), sample_names)
    if (!all(file.exists(paste0(s_output, ".txt.gz"))))
        .log(paste("Some processBAM outputs could not be found.",
            "processBAM must have crashed"))

    # Run featureCounts
    if (run_featureCounts) {
        .processBAM_run_featureCounts(
            reference_path, output_path,
            bamfiles, sample_names, n_threads, overwrite
        )
    }
}

# Wrapper to R/C++. Handles whether OpenMP or BiocParallel is used
.run_processBAM_hts <- function(
        reference_path = "./Reference",
        bamfiles = "Unsorted.bam",
        output_files = "./Sample",
        max_threads = max(parallel::detectCores(), 1),
        useOpenMP = TRUE,
        overwrite_SpliceWiz_Output = FALSE,
        verbose = TRUE, skipCOVfiles = FALSE, read_pool_size = 1000000
    ) {
    .validate_reference(reference_path) # Check valid SpliceWiz reference
    s_bam <- normalizePath(bamfiles) # Clean path name for C++
    s_ref <- normalizePath(reference_path) # Clean path name for C++

    # Check args
    .processBAM_validate_args(s_bam, max_threads, output_files)
    ref_file <- file.path(s_ref, "SpliceWiz.ref.gz")

    .log("Running SpliceWiz processBAM", "message")
    n_threads <- floor(max_threads)
    if (Has_OpenMP() > 0 & useOpenMP) {
        max_omp_threads <- Has_OpenMP()
        if(max_omp_threads < n_threads) n_threads <- max_omp_threads
        SpliceWizMain_multi_hts(
            ref_file, s_bam, output_files, n_threads, verbose, 
            skipCOVfiles, read_pool_size
        )
    } else {
        # Use BiocParallel
        n_rounds <- ceiling(length(s_bam) / floor(max_threads))
        n_threads <- ceiling(length(s_bam) / n_rounds)

        BPPARAM_mod <- .validate_threads(n_threads, as_BPPARAM = TRUE)

        row_starts <- seq(1, by = n_threads, length.out = n_rounds)
        for (i in seq_len(n_rounds)) {
            selected_rows_subset <- seq(row_starts[i],
                min(length(s_bam), row_starts[i] + n_threads - 1)
            )
            BiocParallel::bplapply(selected_rows_subset,
                function(i, s_bam, reference_file,
                        output_files, verbose, overwrite) {
                    .processBAM_run_single_hts(s_bam[i], reference_file,
                        output_files[i], verbose, overwrite, skipCOVfiles,
                        read_pool_size)
                },
                s_bam = s_bam,
                reference_file = ref_file,
                output_files = output_files,
                verbose = verbose, skipCOVfiles = skipCOVfiles,
                overwrite = overwrite_SpliceWiz_Output,
                BPPARAM = BPPARAM_mod
            )
        }
    }

}

# BAM2COV wrapper to R/C++. Handles whether OpenMP or BiocParallel is used
.run_BAM2COV_hts <- function(
        bamfiles = "sample.bam",
        output_file_prefixes = "sample",
        max_threads = max(parallel::detectCores(), 1),
        useOpenMP = TRUE,
        overwrite = FALSE,
        verbose = TRUE,
        read_pool_size = 1000000
    ) {
    s_bam <- normalizePath(bamfiles) # Clean path name for C++
    # Check args
    .processBAM_validate_args(s_bam, max_threads, output_file_prefixes)

    .log("Running BAM2COV", "message")
    n_threads <- floor(max_threads)
    if (Has_OpenMP() > 0 & useOpenMP) {
        max_omp_threads <- Has_OpenMP()
        if(max_omp_threads > n_threads) n_threads <- max_omp_threads
        # Simple FOR loop:
        for (i in seq_len(length(s_bam))) {
            .BAM2COV_run_single_hts(s_bam[i], output_file_prefixes[i],
                n_threads, verbose, overwrite, read_pool_size)
        }
    } else {
        # Use BiocParallel
        n_rounds <- ceiling(length(s_bam) / floor(max_threads))
        n_threads <- ceiling(length(s_bam) / n_rounds)

        BPPARAM_mod <- .validate_threads(n_threads, as_BPPARAM = TRUE)

        row_starts <- seq(1, by = n_threads, length.out = n_rounds)
        for (i in seq_len(n_rounds)) {
            selected_rows_subset <- seq(row_starts[i],
                min(length(s_bam), row_starts[i] + n_threads - 1)
            )
            BiocParallel::bplapply(selected_rows_subset,
                function(i, s_bam, output_files, verbose, overwrite) {
                    .BAM2COV_run_single_hts(s_bam[i], output_files[i], 1,
                        verbose, overwrite, read_pool_size)
                },
                s_bam = s_bam,
                output_files = output_file_prefixes,
                verbose = verbose,
                overwrite = overwrite,
                BPPARAM = BPPARAM_mod
            )
        }
    }
}

# Call C++ on a single sample. Used for BiocParallel
.processBAM_run_single_hts <- function(
    bam, ref, out, verbose, overwrite, skipCOVfiles = FALSE, read_pool_size
) {
    file_gz <- paste0(out, ".txt.gz")
    file_cov <- paste0(out, ".cov")
    bam_short <- file.path(basename(dirname(bam)), basename(bam))
    if (overwrite ||
        !(file.exists(file_gz) | file.exists(file_cov))) {
        ret <- SpliceWizMain_hts(bam, ref, out, verbose, 1, skipCOVfiles,
            read_pool_size)
        # Check SpliceWiz returns all files successfully
        if (ret != 0) {
            .log(paste(
                "SpliceWiz processBAM exited with errors"))
        } else if (!file.exists(file_gz)) {
            .log(paste(
                "SpliceWiz processBAM failed to produce", file_gz))
        } else if (!file.exists(file_cov)) {
            .log(paste(
                "SpliceWiz processBAM failed to produce", file_cov))
        } else {
            .log(paste("SpliceWiz processBAM processed", bam_short), "message")
        }
    } else {
        .log(paste("SpliceWiz processBAM output for", bam_short,
            "already exists, skipping..."), "message")
    }
}

# Call C++/BAM2COV on a single sample. Used for BiocParallel
.BAM2COV_run_single_hts <- function(
    bam, out, n_threads, verbose, overwrite, read_pool_size = 1000000
) {
    file_cov <- paste0(out, ".cov")
    bam_short <- file.path(basename(dirname(bam)), basename(bam))
    if (overwrite || !(file.exists(file_cov))) {
        ret <- c_BAM2COV_hts(bam, file_cov, verbose, n_threads, read_pool_size)
        # Check BAM2COV returns all files successfully
        if (ret != 0) {
            .log(paste(
                "BAM2COV exited with errors"))
        } else if (!file.exists(file_cov)) {
            .log(paste(
                "BAM2COV failed to produce", file_cov))
        } else {
            .log(paste("BAM2COV processed", bam_short), "message")
        }
    } else {
        .log(paste(file_cov,
            "already exists, skipping..."), "message")
    }
}
alexchwong/SpliceWiz documentation built on Oct. 15, 2024, 10:12 a.m.