# normalizeQuantilesPara.R
# Parallelization of the normalize.AffyBatch.quantiles function
# History
# 18.12.2008 : ... old stuff removed ...
# 22.02.2008 : Version 0.15 - modularization
# 26.02.2008 : Version 0.16 - error with ties fixed (affy 1.14 -> affy 1.16.2)
# 27.03.2008 : Version 0.17 - object.type as input removed
# 16.05.2008 : Version 0.18 - one node bug fix and code cleaning (tmp affyBatch removed)
# 18.12.2008 : Version 0.19 - cluster object gets default parameter: .affyParaInternalEnv$cl
# 23.03.2009 : Version 0.20 - Option verbose set to getOption("verbose") and added . to names of internatl functions
# 10.08.2009 : Version 0.21 - fix in computation time output
# 08.03.2010 : Version 0.22 - gsub warning (extend=T) fixed
# Sending AffyBatch form master to slave an back is very time consuming. Sending a list
# of CEL files from master to slave, creating the AffyBatch and do normalization is faster.
# Using the right combination "size of AffyBatch on slaves" - "number of slaves" the parallelized
# version is more than ten times faster as the serial version.
# Copyright (C) 2009 : Markus Schmidberger <>
normalizeAffyBatchQuantilesPara <- function(object,
phenoData = new("AnnotatedDataFrame"), cdfname = NULL,
cluster, verbose=getOption("verbose"))
# Checks
#Check for affy amd snow
#Get cluster object form default environment
cluster <- .affyParaInternalEnv$cl
#Check cluster and generate
checkCluster(cluster) <- length(cluster)
#Check arguments
type <- match.arg(type)
#Check object type
object.type <- .getObjectType(object)
#Check size of partitions
parts <- .checkPartSize(object, <- parts$
object.length <- parts$object.length
#Partition of object
if (verbose) cat("Partition of object ")
t0 <- proc.time();
if (object.type == "AffyBatch"){
object.list <- splitAffyBatch(object,
samples.names <- sampleNames(object)
} else if( object.type == "CELfileVec" ){
object.list <- splitFileVector(object,
#samples.names <- gsub("^/?([^/]*/)*", "", unlist(object), extended = TRUE) #M.S. 8.3.2010 no more required
samples.names <- gsub("^/?([^/]*/)*", "", unlist(object))
} else if( object.type == "partCELfileList" ){
object.list <- object
object <- unlist(object)
object.length <- length(object)
#samples.names <- gsub("^/?([^/]*/)*", "", unlist(object), extended = TRUE) #M.S. 8.3.2010 no more required
samples.names <- gsub("^/?([^/]*/)*", "", unlist(object))
t1 <- proc.time();
if (verbose) cat(round(t1[3]-t0[3],3),"sec DONE\n")
#Info-Output for Distribution
if (verbose) cat("Object Distribution:", unlist(lapply(object.list,length)), "\n")
#Initialize AffyBatches at slaves
if (verbose) cat("Initialize AffyBatches at slaves ")
t0 <- proc.time();
check <- clusterApply(cluster, object.list, .initAffyBatchSF, object.type)
t1 <- proc.time();
if (verbose) cat(round(t1[3]-t0[3],3),"sec DONE\n")
#Normalization depending on types
if (type == "pmonly"){
if (verbose) cat("PM normalization ")
if(type == "mmonly"){
if (verbose) cat("MM normalization ")
if (type == "together"){
if (verbose) cat("PM and MM normalization ")
if(type == "separate"){
if (verbose) cat("PM and MM separate normalization ")
t0 <- proc.time();
normalizeQuantilesPara(cluster, type, object.length)
t1 <- proc.time();
if (verbose) cat(round(t1[3]-t0[3],3),"sec DONE\n")
#Combine / Rebuild affyBatches
if (verbose) cat("Rebuild AffyBatch ")
t0 <- proc.time();
AffyBatch.list.norm <- clusterCall(cluster, .getAffyBatchSF)
AffyBatch <- mergeAffyBatches(AffyBatch.list.norm)
t1 <- proc.time();
if (verbose) cat(round(t1[3]-t0[3],3),"sec DONE\n")
#Return AffyBatch
# Normalization Function for normalizeAffyBatchQuantilesPara
normalizeQuantilesPara <- function(cluster,
type, object.length,
#Seperation for type separate
separate = FALSE
if (type == "separate"){
if (verbose) cat("\tPM normalization\n")
type = "pmonly"
separate = TRUE
#Calculate rowMeans at parts
if (verbose) cat("\tCalculate row means ")
t0 <- proc.time()
row_m <- clusterCall(cluster, normalizeQuantilesParaSF1, type)
# remove list with length ==1 bzw. NAs
row_m <- row_m[!unlist(lapply(row_m,length)==1)]
#Calculate rowMean for all Parts
rowMat <-, lapply(row_m, get("/"), object.length) )
row_mean <- rowSums(rowMat)
t1 <- proc.time()
if (verbose) cat(paste(round(t1[3]-t0[3],3),"sec DONE\n"))
if (verbose) cat("\tQuantil Normalization ")
t0 <- proc.time()
check <- clusterCall(cluster, normalizeQuantilesParaSF2, row_mean)
t1 <- proc.time()
if (verbose) cat(paste(round(t1[3]-t0[3],3),"sec DONE\n"))
#Second part for type separate
if (separate==TRUE){
type = "mmonly"
if (verbose) cat("\tMM normalization\n")
#Calculate rowMeans at parts
if (verbose) cat("\tCalculate row means ")
t0 <- proc.time()
row_m <- clusterCall(cluster, normalizeQuantilesParaSF1, type)
# remove list with length ==1 bzw. NAs
row_m <- row_m[!unlist(lapply(row_m,length)==1)]
#Calculate rowMean for all Parts
rowMat <-, lapply(row_m, get("/"), object.length) )
row_mean <- rowSums(rowMat)
t1 <- proc.time()
if (verbose) cat(paste(round(t1[3]-t0[3],3),"sec DONE\n"))
if (verbose) cat("\tQuantil Normalization ")
t0 <- proc.time()
check <- clusterCall(cluster, normalizeQuantilesParaSF2, row_mean)
t1 <- proc.time()
if (verbose) cat(paste(round(t1[3]-t0[3],3),"sec DONE\n"))
# normalizeQuantilesPara - Slavefunction 1
# calculates pratial rowMeans
normalizeQuantilesParaSF1 <- function(type)
if (exists("AffyBatch", envir = .GlobalEnv)) {
#get AffyBatch and Intensity-Matirx
AffyBatch <- get("AffyBatch", envir = .GlobalEnv)
x <- intensity(AffyBatch)[,,drop=FALSE ]
# Seperation by types
if (type == "pmonly"){
pms <- unlist(pmindex(AffyBatch))
if (dim(x)[2]==1){
noNA <-[pms,]) == 0
} else {
noNA <- rowSums([pms,])) == 0
rows <- pms[noNA]
if(type == "mmonly"){
mms <- unlist(mmindex(AffyBatch))
if (dim(x)[2]==1){
noNA <-[mms,]) == 0
} else {
noNA <- rowSums([mms,])) == 0
rows <- mms[noNA]
if (type == "together"){
rows <- unlist(indexProbes(AffyBatch,"both"))
if(type == "separate"){
#Nothing to calculate
# Temporary save rows at slaves
assign("rows", value=rows, envir= .GlobalEnv)
#Sort Matrix and return rowMeans
cols <- dim(x)[2]
if ( cols == 1 ){
# no rowMeans to calculate
return( sort(x[rows])*cols )
} else if ( cols > 1 ){
xsort <- apply(x[rows,], 2, sort)
return( rowMeans(xsort)*cols )
} else
# normalizeQuantilesPara - Slavefunction 2
# normalize intensity Matrix
normalizeQuantilesParaSF2 <- function(row_mean)
if (exists("AffyBatch", envir = .GlobalEnv) && exists("rows", envir = .GlobalEnv)) {
#Get AffyBatch, IntensityMatrix and rows
AffyBatch <- get("AffyBatch", envir = .GlobalEnv)
x <- intensity(AffyBatch)[,,drop=FALSE ]
rows <- get("rows", envir = .GlobalEnv)
cols <- dim(x)[2]
# Normalization
if ( cols==1 ){
xrank <- rank(x[rows])
fix <- ( xrank-floor(xrank) > 0.4 )
x[rows[fix]] <- 0.5 * (row_mean[floor(xrank[fix])] + row_mean[floor(xrank[fix])+1])
x[rows[!fix]] <- row_mean[floor(xrank[!fix])]
intensity(AffyBatch) <- x
} else if ( cols>1 ){
xrank <- apply(x[rows,],2,rank)
for (j in 1:cols){
fix <- ( xrank[,j]-floor(xrank[,j]) > 0.4 )
x[rows[fix],j] <- 0.5 * (row_mean[floor(xrank[fix,j])] + row_mean[floor(xrank[fix,j])+1])
x[rows[!fix],j] <- row_mean[floor(xrank[!fix,j])]
intensity(AffyBatch) <- x
} else {
assign("AffyBatch", value=AffyBatch, envir= .GlobalEnv)
} else
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.