# file: distributeFiles.R
# Function for (hierarchical) distribution of data from the master node to every slave.
# History
# 29.04.2009 : ... old stuff removed ...
# 24.10.2008 : Version 0.19 - Improvements for multiprocessor machines
# 27.10.2008 : Version 0.20 - Output improved to work at multiprocessor machines and clusters
# 18.12.2008 : Version 0.21 - cluster object gets default parameter: .affyParaInternalEnv$cl
# 23.03.2009 : Version 0.22 - Option verbose set to getOption("verbose") and added . to names of internatl functions
# 29.04.2009 : Version 0.23 - error for multiprocessor machines fixed
# Copyright (C) 2009 : Markus Schmidberger <>
distributeFiles <- function( files, to=tempdir(),
protocol=c("R","RCP","SCP"), hierarchicallyDist=FALSE,
master=TRUE, delExistTo=FALSE,
cluster, verbose=getOption("verbose"))
#Check for snow
#Get cluster object form default environment
cluster <- .affyParaInternalEnv$cl
#Check cluster
#check for multiprocessor machine -> no distribution!
nodenames <-unlist(clusterEvalQ(cluster,["nodename"]))
master <-["nodename"]
if( all(master==nodenames) ){
if(verbose) cat("No data distribution: you use a multiprocessor machine!\n")
return( list(to=dirname(files)[1], CELfiles=files ))
return( list(to=dirname(files)[1], CELfiles=basename(files)) )
} else{
protocol <- match.arg(protocol)
#Delete directories
if (delExistTo == TRUE){
if (verbose>0) cat("Remove Directories ")
t1 <- proc.time()
check <- removeDistributedFiles(cluster, path=to)
t2 <- proc.time()
if (verbose>0) cat(round(t2[3]-t1[3],3),"sec DONE\n")
#Create directories
if (verbose>0) cat("Create Directories ")
t1 <- proc.time()
#at master: only if not tempdir
if(to != tempdir() && master==TRUE){
error <- system(paste("mkdir ",to,sep=""))
if ( error != 0) warning("Directory ",to," already exists at master.")
#at slaves
error <- clusterCall(cluster, dir.create, to, showWarnings = TRUE, recursive = TRUE)
check <- lapply(error, function(x){ if(x!=TRUE) warning("Directory ",to," already exists at slaves.")} )
t2 <- proc.time()
if (verbose>0) cat(round(t2[3]-t1[3],3),"sec DONE\n")
#Partition of files
if (verbose) cat("Partition of files ")
t0 <- proc.time();
if ( length(cluster) == 1 ){
filesPart[[1]] <- files
} else
filesPart <- clusterSplit(cluster,files)
t1 <- proc.time();
if (verbose) cat(round(t1[3]-t0[3],3),"sec DONE\n")
#Info-Output for Distribution
if (verbose){ cat("\tFile Distribution: "); cat(paste(lapply(filesPart,length))); cat("\n") }
#Move Files to master
#Copy all files to master node
if (master == TRUE)
if (verbose>0) cat("Move Files to Master ")
t1 <- proc.time()
MasterNode <-["nodename"]
if (protocol == "R") {
for (j in 1:length(files)){
if (verbose>0) cat(".")
data <- readLines(files[j], n=-1)
filename <- basename(files[j])
newFile <- file(paste(to,filename,sep="/"), "w")
} else if (protocol == "RCP" || protocol == "SCP") {
for (j in files){
if (verbose>0) cat(".")
if (protocol == "RCP") {
if(verbose>1) cat(paste("rcp ", j," ",to,"/",sep=""),"\n")
error <- system(paste("rcp ", j," ",to,"/",sep=""))
} else if (protocol == "SCP") {
if(verbose>1) cat(paste("scp ", j," ",to,"/",sep=""),"\n")
error <- system(paste("scp ", j," ",to,"/",sep=""))
t2 <- proc.time()
if (verbose>0) cat(round(t2[3]-t1[3],3),"sec DONE\n")
#Move Parts to slaves
#Copy every Part to one slave
if (verbose>0) cat("Move Parts to Slaves ")
t1 <- proc.time()
#list of nodes
nodes <- clusterEvalQ(cluster,["nodename"])
anzNodes <- length(nodes)
if (verbose>1) print(filesPart)
#Copy parts of data to slaves
if (protocol == "R") {
for( i in 1:length(filesPart)) {
for (j in 1:length(filesPart[[i]])) {
if( length(filesPart[[i]][j])!= 0 && file.exists(filesPart[[i]][j]) ){
if (verbose>0) cat(".")
data <- readLines(filesPart[[i]][j], n=-1)
filename <- basename(filesPart[[i]][j])
error <- clusterCall(cluster[i], .writeLinesSF, data, paste(to,filename,sep="/"))
} else if (protocol == "RCP" || protocol == "SCP") {
for (i in 1:anzNodes){
host <- paste(nodes[[i]],":",to,"/",sep="")
if (verbose>0) cat(".")
for (j in filesPart[[i]]){
if (protocol == "RCP") {
if(verbose>1) cat(paste("rcp ", j," ",host,sep=""),"\n")
error <- system(paste("rcp ", j," ",host,sep=""))
} else if (protocol == "SCP") {
if(verbose>1) cat(paste("scp ", j," ",host,sep=""),"\n")
error <- system(paste("scp ", j," ",host,sep=""))
t2 <- proc.time()
if (verbose>0) cat(round(t2[3]-t1[3],3),"sec DONE\n")
#Hierarchical distribution between slaves
# copy parts from slaves to all other slaves
if (hierarchicallyDist==TRUE){
if (verbose>0) cat("Hierarchical distribution ")
t1 <- proc.time()
if (protocol == "RCP" || protocol == "SCP") {
out <- clusterCall(cluster, hirarchicalDistSF, to, nodes, protocol=protocol)
if (verbose>1) print(out)
} else if (protocol == "R") {
warning("TODO hierarchically dist with R not yet implemented")
t2 <- proc.time()
if (verbose>0) cat(round(t2[3]-t1[3],3),"sec DONE\n")
if (hierarchicallyDist==TRUE){
filesPart <- clusterCall(cluster, list.files, to, full.names=TRUE)
filesSlaves <-lapply(filesPart, function(x,to){
if(length(x)!=0) paste(to,basename(x),sep="/")
filesSlaves <-lapply(filesPart, basename)
return( list(to=to, CELfiles=filesSlaves) )
# Slavefunction
# Hirarchical distribution
hirarchicalDistSF <- function(to,nodes, protocol)
nodename <-["nodename"]
#listing of existing files on slave
files <- list.files(path=to, full.names=TRUE)
# nodes randomication to reduce load at nodes
nodes <- nodes[sample(length(nodes))]
for (i in files){
for (j in nodes){
if (nodename != j){
if (protocol == "RCP") {
out[[i]][j] <- paste("rcp ",i," ",j,":",to,"/",sep="")
system(paste("rcp ",i," ",j,":",to,"/",sep=""))
} else if (protocol == "SCP") {
out[[i]][j] <- paste("scp ",i," ",j,":",to,"/",sep="")
system(paste("scp ",i," ",j,":",to,"/",sep=""))
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.