R/sFinit.R

Defines functions searchCommandline sfSetMaxCPUs sfSession sfSocketHosts sfType sfNodes sfCpus sfGetCluster sfRestore sfParallel sfStop sfIsRunning sfCheck sfInit

## Functions copied from package snowfall (http://cran.r-project.org/web/packages/snowfall/index.html)
## as the package was removed from the CRAN repository.

##*****************************************************************************
## Function for initialisizing the Cluster.
##
## Also the predefinition of (internal) global variables is done here,
## mainly because of the code check of R.
##
## Compability issue: cn.farms needs to know if sfCluster is working (also
## old versions of sfCluster).
## --session could be set by other solutions.
## So, --lockfile is decided to be the sfCluster indicator, as this most
## likely will have no use if not used with sfCluster.
## Therefore setting of --lockfile (LOCKFILE) can cause troubles.
##*****************************************************************************

## These variables are used by internal function. Need to declared for
## the compiler warnings. As these cannot be altered directly, setOption() does
## this (in the namespace). So no global objects are set, internal state is
## kept inside the namespace.
DEBUG          <- FALSE     ## Static switch for debugging messages
.sfOption      <- list()    ## Configuration of this master
.sfPresetCPUs  <- 0         ## Presetted CPU amount (max. allocatable).

## Some vars needed at specific points, which can handled correctly otherwise,
## but would raise R CMD check warnings if not defined before.
.sfPars        <- ''        ## Tmp. var for sfLibrary.
.sfLoadError   <- ''        ## Tmp. var for loading.
.sfTestVar5    <- 0         ## Exporting test in sfTest().

##*****************************************************************************
## Function for initialisizing the Cluster.
##
## Attention: this package does nasty things with explicit sideeffects (not
## only using "require" and "options" etc.)...
##
## "Nodes" and "CPUs" are used identical (unbeautiful).
##
## PARAMETER:  [Boolean parallel - overwrite CMDline settings],
##             [Int nodes        - overwrites commandline settings / DEPRECATED]
##             [Int cpus         - overwrites commandline settings]
##             [Boolean nostart  - If set, no cluster start will be run.
##                                 Needed for nested usage of cn.farms]
##             [Boolean restore  - Globally set restore]
##             [String type      - {'MPI','SOCK', 'PVM', 'NWS'}
##             [Vector socketHosts - List of all hosts used in socketmode]
##             [slaveOutfile     - filename for output on slaves]
##             [useRscript       - Startup via R Script or shellscript. Only snow>0.3]
## RETURN:     Boolean TRUE
##*****************************************************************************
sfInit <- function( parallel=NULL,
    cpus=NULL,
    type=NULL,
    socketHosts=NULL,
    restore=NULL,
    slaveOutfile=NULL,
    nostart=FALSE,
    useRscript=FALSE      ## snow: Default is TRUE.
) {
  
  ## Flag for detection of reconnect (means: non-first calls to sfInit())
  reconnect <- FALSE
  
  ## Saves users from many own if-clauses probably.
  if( nostart ) return( TRUE );

  ## Are options setted?
  if( length( .sfOption ) == 0 ) {
    debug( "Setup sfOption..." )
    
    ## Add 1.62: list from sysdata cleared and created again
    setOption( "parallel", FALSE )
    setOption( "session", NULL )
    setOption( "priority", 1 )
    setOption( "nodes", 1 )
    setOption( "stopped", FALSE )
    setOption( "init", FALSE )
    
    ## Load configuration file: delivered with package and changeable by user.
    config <- structure(list(VALUE = structure(c(1L, 1L, 1L, 2L, 1L, 1L), .Label = c("-", 
                    "64"), class = "factor")), .Names = "VALUE", class = "data.frame", row.names = c("SERVER", 
            "PORT", "TMPDIR", "MAXNODES", "RESTDIR", "DUMMY"))
    configM <- as.matrix( t( config ) )
    config  <- as.list( configM )
    names( config ) <- dimnames( configM )[[2]]
    
    ## Node count are limited in cn.farms as well (as it is useable without
    ## sfCluster) and you probably don't want an arbitrary amount of CPUs
    ## requested by a DAU.
    ## If changed preset exists, take this number.
    if( .sfPresetCPUs > 0 )
      setOption( "MAXNODES", .sfPresetCPUs )
    else
      setOption( "MAXNODES", as.numeric( config[["MAXNODES"]] ) )
    
    ## Startup lockfile (only coming from sfCluster and if available
    ## signalling that cn.farms is started through sfCluster).
    ## LOCKFILE can only be set through commandline --lockfile
    setOption( "LOCKFILE", "" )
    
    ## Temporary directory (for logfiles, esp. on the slaves)
    ## Only if set, if not, take default.
    if( as.character( config[["TMPDIR"]] ) != "-" )
      setOption( "TMPDIR", path.expand( as.character( config[["TMPDIR"]] ) ) )
    else {
      ## Default tempdir on Unix systems is R session tempdir
      if( .Platform$OS.type == "unix" )
        setOption( "TMPDIR", file.path( Sys.getenv( "R_SESSION_TMPDIR" ), "sfCluster" ) )
      ## On any non *nix system: take local dir (R_SESSION_TMPDIR unset on Win)
      else
        setOption( "TMPDIR", "" )
    }
    
    ## Addition variables for save/restore (only used in sfClusterApplySR).
    setOption( "RESTOREFILES", NULL )   ## List with restore files (for cleanup)
    setOption( "RESTOREUPDATE", 5 )     ## Updates percent output any 5%
    setOption( "RESTORE", FALSE )       ## Restore previous results?
    setOption( "CURRENT", NULL )        ## Currently executed R-File
    
    ## Default cluster type (unchangeable by config to ensure runnability
    ## of a specific code in any setting).
    setOption( "type", "SOCK" )
    setOption( "sockHosts", NULL )
    
    ## Restore file directory (for saved intermediate results) - not neccessary
    ## under/in TMPDIR.
    ## (As log files prob woul be set in global dir, restore files should be
    ## stored under users home - as they don't contain a session-ID or something
    ## generic unique thing to differ them.
    if( as.character( config[["RESTDIR"]] ) != "-" )
      setOption( "RESTDIR", path.expand( as.character( config[["RESTDIR"]] ) ) )
    else
      setOption( "RESTDIR", file.path( Sys.getenv( "HOME" ), ".sfCluster", "restore" ) )
    
    ## Remove config (as data() writes it as global variable).
    #rm( config, pos=globalenv() )
  }
  ## If .sfOption exists, sfInit() was called before: restart.
  ## (sfCluster should be able to handle this - although slaves are iterated and
  ## slave killing is only done through snow).
  else {
    reconnect <- TRUE
    
    if( .sfOption$stopped && !.sfOption$init )
      debug( "Irregluar init state (error on previous init)..." )
    
    ## If not stopped, but initialised.
    if( !.sfOption$stopped && .sfOption$init ) {
      message( "Explicit sfStop() is missing: stop now." )
      sfStop()
    }
  }
  
  ##**************************************************************************
  ## Values for parallel/session can be in the commandline or the environment.
  ## Function parameters overwrite commandline.
  ##**************************************************************************
  searchCommandline( parallel, cpus=cpus, type=type, socketHosts=socketHosts,
      restore=restore )
  
  if( getOption( 'verbose' ) && !reconnect )
    print( .sfOption )

  ## If given restore-directory does not exist, create it.
  if( !file.exists( .sfOption$RESTDIR ) ) {
#  if( is.null( .sfOption$RESTDIR ) ) {
    ## 1.62: removed
    ##    .sfOption$RESTDIR <<- path.expand( "~/.sfCluster/restore" )
    dirCreateStop( .sfOption$RESTDIR )
  }
  
  ## Running in parallel mode? That means: Cluster setup.
  ## Will be blocked if argument "nostart" is set (for usage of cn.farms
  ## inside of packages).
  if( .sfOption$parallel && !nostart ) {
    ## Internal stopper. Running in parallel mode a session-ID is needed.
    ## For testing purposes can be anything (mainly used for pathnames
    ## of logfiles).
    if( startedWithSfCluster() && is.null( .sfOption$session ) )
      stop( "No session-ID but parallel run with sfCluster (something went wrong here?)..." )
    ## @TODO regenerate session id if missing.
    
    ## If amount of nodes not set via commandline, then it will be 2
    if( is.null( .sfOption$nodes ) || is.na( as.numeric( .sfOption$nodes ) ) )
      setOption( "nodes", 2 )
    else
      setOption( "nodes", as.numeric( .sfOption$nodes ) )
    
    ## Preload required libraries if needed (as an extended error check).
    libList <- list( "PVM"="rpvm", "MPI"="Rmpi", "NWS"="nws", "SOCK"="" )
    
    if( libList[[.sfOption$type]] != "" ) {
      if( !require( libList[[.sfOption$type]], character.only=TRUE ) ) {
        message( paste( "Failed to load required library:", libList[[.sfOption$type]],
                "for parallel mode", .sfOption$type, "\nFallback to sequential execution" ) )
        
        ## Fallback to sequential mode.
        return( sfInit( parallel=FALSE ) )
      }
      else
        message( paste( "Library", libList[[.sfOption$type]], "loaded." ) )
    }
    
#    ## In any parallel mode, load snow if needed.
#    if( !require( parallel ) ) {
#      message( paste( "Failed to load library 'snow' required for parallel mode.\n",
#              "Switching to sequential mode (1 cpu only)!." ) );
#      
#      ## Fallback to sequential mode.
#      return( sfInit( parallel=FALSE ) )
#    }
    
    ## Chg. 1.62
    ## Temporary file for output.
    ## If sfCluster is running (LOCKFILE given): session is taken.
    ## If sfCluster not running but user setted slaveOutfile option: take arg.
    ## Else (default): no slave outfiles (writing to /dev/null|nul).
    if( startedWithSfCluster() ) {
      tmp <- file.path( .sfOption$TMPDIR,
          paste( "rout_", .sfOption$session, sep="" ) )
      
      ## Only create temporary directory once and if needed.
      ## Only needed if running with sfCluster. If user sets it's own
      ## slaveOutfile, he has to ensure himself about existing pathes.
      ## If needed create temporary path. Problem: this is executed only on
      ## master, not on slaves. The clusterstarter needs to manage this.
      if( !reconnect )
        dirCreateStop( .sfOption$TMPDIR )
    }
    else
      tmp <- ifelse( is.null( slaveOutfile ), '/dev/null', slaveOutfile )
    
    ## @TODO Exception handler.
    ## @TODO Timeout on init.
    ## Ebenso: Timeout - das ist extrem h�sslich, wenn das Cluster nicht
    ## korrekt startet und h�ngen bleibt (z.B. wenn zuviele CPUs f�r das
    ## Cluster angefordert werden - was PVM schluckt, macht MPI anscheinend
    ## Kopfzerbrechen).
    suppressWarnings(setDefaultClusterOptions( type = .sfOption$type ))
    suppressWarnings(setDefaultClusterOptions( homogenous = FALSE ))
    
    ## On socket connections the list of hosts needs to be given.
    ## If no is set, use localhost with default R.
    if( .sfOption$type == "SOCK" ) {
      ## No host information given: use localhost with wished CPUs.
      ## Else: host settings overwrite wished CPUs (important for error checks!).
      if( is.null( .sfOption$sockHosts ) || ( length( .sfOption$sockHosts ) == 0 ) )
        setOption( "sockHosts", c( rep( "localhost", .sfOption$nodes ) ) )
      else
        setOption( "nodes", length( .sfOption$sockHosts ) )
      
      setOption( "cluster", try( makeCluster( .sfOption$sockHosts,
                  type = "SOCK",
                  outfile = tmp,
                  homogenous = TRUE
              ) ) )
    }
    # PVM cluster
    else if( .sfOption$type == "PVM" ) {
      setOption( "cluster", try( makeCluster( .sfOption$nodes,
                  outfile = tmp ) ) )
    }
    # Network Spaces
    else if( .sfOption$type == "NWS" ) {
      if( is.null( .sfOption$sockHosts ) || ( length( .sfOption$sockHosts ) == 0 ) )
        setOption( "sockHosts", c( rep( "localhost", .sfOption$nodes ) ) )
      else
        setOption( "nodes", length( .sfOption$sockHosts ) )
      
      ## Patch Markus Schmidberger (Mail 11/25/2008).
      setOption( "cluster", try( suppressWarnings(makeNWScluster(
                  .sfOption$sockHosts[1:.sfOption$nodes],
                  type = "NWS",
                  outfile = tmp
              ) ) ) )
    }
    # MPI cluster (also default for irregular type).
    else {
      ## 1.81: useRScript must be FALSE. Else sfCluster wont work
      ##       with snow > 0.3 (on older snow Versions this option
      ##       is ignored. Also homogenous is always on.
      ## 1.83: But for non-sfCluster usage at least it has to be modifyable.
      setOption( "cluster", try( suppressWarnings(makeMPIcluster( .sfOption$nodes,
                  outfile = tmp,
                  homogenous = TRUE,
                  useRscript = useRscript
              ) ) ) )
    }
    
    ## Startup successfull? If not: stop.
    if( is.null( .sfOption$cluster ) ||
        inherits( .sfOption$cluster, "try-error" ) )
      stop( paste( "Starting of cluster failed!",
              geterrmessage(), .sfOption$cluster ) )
    
    ## Cluster setup finished. Set flag (used in error handlers and stop).
    ## Also: no function can be called if init is not set.
    setOption( "init", TRUE )
    setOption( "stopped", FALSE )
    
    if( !reconnect ) {
      ## As Snow Init spawn all the requires R-processes, the proprietary
      ## lockfile can be deleted now (if it exists).
      ## Problem: now all R procs are spawned, but the observer most
      ## likely didn't catch them until the next time of his observing
      ## loop.
      if( !is.null( .sfOption$LOCKFILE ) && file.exists( .sfOption$LOCKFILE ) ) {
        if( unlink( .sfOption$LOCKFILE ) != 0 )
          warning( "Unable to remove startup lockfile: ", .sfOption$LOCKFILE )
        else
          message( "Startup Lockfile removed: ", .sfOption$LOCKFILE )
      }
      
      if( getOption( 'verbose' ) ) {
        if( tmp == '/dev/null' )
          message( "Slave output suppressed. Use 'slaveOutfile' to activate." )
        else
          message( paste( "Temporary log for STDOUT/STDERR (on each node): ", tmp, "\n",
                  "Cluster started with", .sfOption$nodes, "CPUs.", "\n" ) )
      }
      else
        debug( paste( "Temporary log for STDOUT/STDERR (on each node): ", tmp, "\n",
                "Cluster started with", .sfOption$nodes, "CPUs.", "\n" ) )
      
      ## Write R-Version and Time in (slave-)logfiles.
      .startInfo <- strsplit( Sys.info(), "\n" );
      .startMsg <- paste( sep="",
          "JOB STARTED AT ", date(),      # Global Var!
          " ON ", .startInfo$nodename, " (OS", .startInfo$sysname,
          ") ", .startInfo$release, "\n" )
      
      sfExport( ".sfOption", ".startMsg", local=TRUE, namespace="cn.farms", debug=DEBUG )
      sfCat( .startMsg, "\n", master=FALSE )    ## No master
      sfCat( paste( "R Version: ", R.version$version.string, "\n\n" ) )
      
      ## Remove starting message.
      sfRemove( ".startMsg" )
    }
    else
      sfExport( ".sfOption", local=FALSE, namespace="cn.farms" )
  }
  ## Sequential mode or option "nostart":
  ## init will be set. If someone calls sfInit with nostart and aims
  ## it to be started, it's his or her problem.
  else {
    ## Cluster setup finished. Set flag (used in error handlers and stop).
    ## Also: no function can be called if init is not set.
    setOption( "init", TRUE )
    setOption( "stopped", FALSE )
    
    setOption( "cluster", NULL )
  }
  
  ## Print init Message (esp. print State of parallel and cn.farms
  ## version.
  if( sfParallel() ) {
    message( paste( "cn.farms ", packageDescription( "cn.farms" )$Version,
            " initialized (using snow ", packageDescription( "snow" )$Version,
            "): parallel execution on ", sfCpus(), " CPUs.\n", sep="" ) );
  }
  else {
    message( paste( "cn.farms", packageDescription( "cn.farms" )$Version,
            "initialized: sequential execution, one CPU.\n" ) );
  }
  
  return( invisible( TRUE ) )
}

##*****************************************************************************
## Check if sfInit() was called.
## This function is called before any function which need initialised cluster.
##
## Previous it stops with error, now it calls sfInit() without parameters,
## so sfInit() does not have to be called explicitely (requested from Harald).
##
## (Not exported to namespace).
##*****************************************************************************
sfCheck <- function() {
  if( !sfIsRunning() ) {
    message( paste( "Calling a cn.farms function without calling 'sfInit'",
            "first or after sfStop().\n'sfInit()' is called now." ) )
    return( invisible( sfInit() ) )
  }
  
  return( invisible( TRUE ) )
}

##*****************************************************************************
## Exported as userfunction.
## Give the user information if sfInit() was called and cluster is not stopped.
## (Maybe helpful inside of packages etc.).
##*****************************************************************************
sfIsRunning <- function() {
  ## Add 1.62: stopped as argument
  if( ( length( .sfOption ) == 0 ) || !.sfOption$init || .sfOption$stopped )
    return( FALSE )
  else
    return( TRUE )
}

##*****************************************************************************
## Stop the (snow)-Cluster. Just calls Snows stopCluster.
##
## PARAMETER: [Boolean nostop: don't stop]
##*****************************************************************************
sfStop <- function( nostop=FALSE ) {
  ## Saves users from many own if-clauses probably.
  if( nostop ) return( TRUE );
  
  if( exists( ".sfOption" )  && ( length( .sfOption ) > 0 ) ) {
    ## Only stop if initialisized and running parallel.
    if( !.sfOption$stopped && .sfOption$init && .sfOption$parallel ) {
      message( "\nStopping cluster\n" )
      
      ## Stopping snow cluster.
      ## NO call to sfGetCluster() here, as sfGetCluster sfCheck()s again.
      stopCluster( .sfOption$cluster )
    }
    
    ## Reset default values.
    ##.sfOption$init     <<- FALSE
    setOption( "stopped", TRUE )
    setOption( "parallel", FALSE )
    
    ## Delete probably stored resultfiles (can also be used in sequential mode!)
    deleteRestoreFiles()
  }
  
  invisible( NULL )
}

##*****************************************************************************
## Is programm running parallel? Wrapper for internal Optionblock (therefore
## exported of course).
## Also: get cluster Handler (prob. not exported in the final).
##
## RETURN: Boolean Running in parallel mode
##*****************************************************************************
sfParallel <- function() {
  sfCheck()
  
  return( .sfOption$parallel )
}

##*****************************************************************************
## Shall sfClusterApplySR restore results?
##*****************************************************************************
sfRestore <- function() {
  sfCheck()
  
  return( .sfOption$RESTORE )
}

##*****************************************************************************
## Receive snow cluster handler (for direct calls to snow functions).
##*****************************************************************************
sfGetCluster <- function() {
  sfCheck()
  
  return( .sfOption$cluster )
}

##*****************************************************************************
## Receive amount of currently used CPUs (sequential: 1).
##*****************************************************************************
sfCpus <- function() {
  sfCheck()
  
  return( .sfOption$nodes )
}

## getter for amount of nodes. Wrapper for sfCPUs.
sfNodes <- function() return( sfCpus() )

##*****************************************************************************
## Receive type of current cluster.
##*****************************************************************************
sfType <- function() {
  sfCheck()
  
  if( sfParallel() )
    return( .sfOption$type )
  else
    return( "- sequential -" )
}

##*****************************************************************************
## Receive list with all socket hosts.
##*****************************************************************************
sfSocketHosts <- function() {
  if( sfType() == "SOCK" ) {
    sfCheck()
    
    return( .sfOption$sockHosts )
  }
  else {
    warning( paste( "No socket cluster used:", sfType() ) )
    
    return( invisible( NULL ) )
  }
}

##*****************************************************************************
## getter for session-ID.
##*****************************************************************************
sfSession <- function() {
  sfCheck();
  
  return( .sfOption$session )
}

##*****************************************************************************
## Increase max. numbers of CPUs used per process.
## No check for sensefull values (if user wants 1000, you get 1000 :)).
##*****************************************************************************
sfSetMaxCPUs <- function( number=32 ) {
  setVar( ".sfPresetCPUs", number )
}

##*****************************************************************************
## Internal function:
##
## Search commandline arguments for Parallel and Session values.
## If there are arguments on function call, these overwrites the values on the
## commandline.
##
## Basically the arguments on the commandline come from sfCluster, but of
## course set manually or via another load- or sessionmanager.
##
## Commandline arguments: --parallel(=[01])*
##                        --session=\d{8}
##                        --nodes=\d{1,2}
##                        --tmpdir=\/[a-z_].*
##                        --hosts=((\s+:\d+))+
##                        --restoreDir=\/[a-z_].*
##                        --restoreSR
##                        --lockfile
## Results will be saved in options .parallel (bool) and .session (8 chars)
##*****************************************************************************
searchCommandline <- function( parallel=NULL, cpus=NULL,
    socketHosts=NULL, type=NULL,
    restore=NULL ) {
#  if( !exists( ".sfOption", envir=globalenv() ) )
#    stop( "Global options missing. Internal error." )
  
  ## If set, copy to sfCluster data structure.
  if( !is.null( cpus ) ) {
    setOption( "nodes", max( 1, cpus ) )
    
    ## For socket/NWS clusters: force rebuild of hostlist (as probably changed).
    ## (If not overwritten later by users own arguments).
    setOption( "sockHosts", NULL )
    
    ## If more than one CPU is wanted, parallel mode is forced.
    ## Probably this is not an intended behavior.
#    if( .sfOption$nodes > 1 ) {
#      ## Potential misuse of argument: inform user.
#      if( !is.null( parallel ) && ( parallel == FALSE ) )
#        warning( "Explicit parallel=FALSE, but required >1 CPUs ==> parallel mode forced." )
#
#      parallel = TRUE
#    }
  }
  
  ## Defaults come from calling arguments on sfInitCluster.
  if( !is.null( parallel ) ) {
    setOption( "parallel", parallel )
    
    if( parallel ) {
      ## There is a slightly problem: as many users can use sfCluster without
      ## session-ID, the session number "XXXXXXXX" is not good enough.
      ## Problem: we need the filename on clusterinit so we cannot use cluster
      ## here.
      ## Win: USERNAME, *nix: LOGNAME
      ## LOGNAME/USER ist not set under Windows (tried Win Server 2003)
      uname <- ifelse( Sys.getenv( "LOGNAME" ) != "", Sys.getenv( "LOGNAME" ),
          Sys.getenv( "USERNAME" ) )
      
      if( uname == "" )
        uname <- "___"
      
      ## Add R for RunSnowMode heterogenous mode.
      ## XXX Check R version and fill in correct version.
      setOption( "session", paste( sep="_",
              "XXXXXXXXR",
              uname,
              format( Sys.time(), "%H%M%S_%m%d%y" ) ) )
      
      ##      message( "Forced parallel. Using session: ", .sfOption$session, " \n" )
    }
    ## Sequential mode: reduce to one CPU.
    else {
      setOption( "nodes", 1 )
      
      ##      message( "Forced to sequential mode.\n" )
    }
  }
  
  ## If socket hosts are set, take them.
  if( !is.null( socketHosts ) || is.vector( socketHosts ) )
    setOption( "sockHosts", socketHosts )
  
  ## Type of the cluster ({SOCK|PVM|MPI|NWS} are allowed).
  if( !is.null( type ) ) {
    if( length( grep( "PVM|MPI|SOCK|NWS", type ) ) > 0 )
      setOption( "type", type )
    else {
      warning( paste( "Unknown cluster type:", type, "Allowed are: {PVM,MPI,SOCK,NWS}. Fallback to SOCKet." ) )
      setOption( "type", "SOCK" )
    }
  }
  ## Default value: socket cluster.
  else
    setOption( "type", "SOCK" )
  
  ## Global restore setting (for sfClusterApplySR).
  if( !is.null( restore ) )
    setOption( "RESTORE", restore )
  
  arguments <- commandArgs()
  
  ## Search for currently executed R-file (if there is any). Detected by
  ## argument followed to option "-f" ("R CMD BATCH" adds -f implicitely).
  ## Save filename for options (for save/restore)
  ## @todo Find a better way to detect R-file (is there any?)
  ## Last argument to be ignored (as no follow-up exists).
  if( length( arguments ) >= 2 ) {
    for( entry in seq( 1, length( arguments ) - 1 ) ) {
      if( !is.null( arguments[entry] ) && ( arguments[entry] == '-f' ) ) {
        ## Switch to next entry and check if this is valid.
        entry <- entry + 1;
        
        ## If yes, take it as filename.
        if( !is.null( arguments[entry] ) && ( arguments[entry] != "" ) ) {
          setOption( "CURRENT", arguments[entry] )
          break
        }
      }
    }
  }
  
  ## No R-file given: set to DEFAULT filename (always occurs in interactive
  ## mode).
  if( is.null( .sfOption$CURRENT ) )
    setOption( "CURRENT", "DEFAULT" )
  
  ## Go through all arguments from commandline.
  for( arg in arguments ) {
    ## Non sfCluster-like argument? Skip.
    ## (Only empty argument are '--parallel' and '--restoreSR')
    if( ( length( grep( "=", arg ) ) == 0 ) &&
        !( ( arg == "--parallel" ) || ( arg == "--restoreSR" ) || ( arg == "--restore" ) ) )
      next;
    
    ## Arguments in form "--name=value"
    args <- strsplit( arg, "=" )
    
    ## Marker for parallel execution.
    ## If parallel was set via function arguments, commandline is ignored.
    if( args[[1]][1] == "--parallel" ) {
      if( !is.null( args[[1]][2] ) && !is.na( as.numeric( args[[1]][2] ) ) )
        cmdParallel <- ifelse( ( as.numeric( args[[1]][2] ) > 0 ), TRUE, FALSE )
      ## --parallel is allowed to use without value (means: true).
      else
        cmdParallel <- TRUE
      
      ## Ask here, instead there will be a warning if used with commandline arg
      ## --parallel and sfInit( parallel=TRUE ).
      ## Rise warning if command arguments are overwritten by sfInit() arguments.
      if( is.null( parallel ) )
        setOption( "parallel", cmdParallel )
      else if( parallel != cmdParallel )
        warning( paste( "Commandline argument --parallel",
                "overwritten with sfInit argument parallel=", parallel ) )
    }
    ## Marker for general restore (only used in sfClusterApplySR).
    ## Both --restoreSR/--restore are allowed.
    else if( ( args[[1]][1] == "--restoreSR" ) || ( args[[1]][1] == "--restore" ) ) {
      if( is.null( restore ) )
        setOption( "RESTORE", TRUE )
      else if( !restore )
        warning( "Commandline argument --parallel",
            "overwritten with sfInit argument restore=TRUE" )
    }
    ## Marker for Session-ID.
    else if( args[[1]][1] == "--session" ) {
      ## Session-ID is allways 8 Chars long.
      ## Not anymore since sfCluster >=0.23
      if( !is.null( args[[1]][2] ) ) { ##&& ( nchar( args[[1]][2] ) == 8 ) ) {
        setOption( "session", args[[1]][2] )
      }
      else
        warning( paste( "Empty or irregular Session-ID: '", args[[1]][2], "'\n" ) )
    }
    ## Amount of CPUs (formerly called "nodes", kept for backward
    ## compatibility).
    ## If set via function arguments, commandline is ignored.
    else if( ( args[[1]][1] == "--nodes" ) || ( args[[1]][1] == "--cpus" ) ) {
      nodes <- try( as.numeric( args[[1]][2] ) )
      
      if( !is.null( nodes ) && !is.na( nodes ) ) {
        if( nodes > .sfOption$MAXNODES ) {
          stop( paste( "Too much CPUs allocated:", nodes, "Max.:",
                  .sfOption$MAXNODES,
                  "\n - Call sfSetMaxCPUs() before sfInit() if you need more." ) )
        }
        else
          nodes <- max( 1, nodes )
        
        ## Really set amount of CPUs? Rise overwrite warning if needed.
        if( is.null( cpus ) )
          setOption( "nodes", nodes )
        else if( cpus != nodes )
          warning( paste( "Commandline --cpus=", nodes,
                  " overwritten by sfInit() argument cpus=", cpus, sep="" ) )
      }
      else
        warning( paste( "Empty or irregular nodes amount: '", nodes, "'\n" ) )
    }
    ## Type of the network.
    else if( args[[1]][1] == "--type" ) {
      if( !is.null( args[[1]][2] ) && ( nchar( args[[1]][2] ) > 0 ) ) {
        if( length( grep( "PVM|MPI|SOCK|NWS", args[[1]][2] ) ) > 0 ) {
          if( is.null( type ) )
            setOption( "type", args[[1]][2] )
          else if( type != args[[1]][2] )
            warning( paste( "Commandline --type=", args[[1]][2],
                    " overwritten by sfInit() argument type=", type, sep="" ) )
        }
        else {
          warning( paste( "Unknown cluster type on commandline:", args[[1]][2],
                  "Allowed are: {PVM,MPI,SOCK,NWS}" ) )
        }
      }
      else
        warning( "No cluster-type is given as value for argument --type" )
    }
    ## Hosts for socket mode.
    ## Arguments come in format:
    ##   nodename:cpus  ->  On node X are Y cpus used.
    ##   nodename       ->  On node X one cpu is used.
    ## Any entries are comma seperated (no whitespace allowed!):
    ##  node1:3,node2,node3:2
    else if( args[[1]][1] == "--hosts" ) {
      if( !is.null( args[[1]][2] ) && ( nchar( args[[1]][2] ) > 0 ) ) {
        cmdHosts <- c()
        
        hosts = unlist( strsplit( args[[1]][2], "," ) )
        
        ## Examine single host
        for( host in hosts ) {
          info <- unlist( strsplit( host, ":" ) )
          
          ## No CPU amount given: assume 1.
          if( is.null( info[2] ) || is.na( info[2] ) )
            info[2] <- 1
          
          offset <- as.integer( info[2] )
          
          if( offset <= 0 )
            offset <- 1
          
          if( !is.numeric( offset ) )
            stop( paste( "NOT NUMERIC: '", offset, "'", sep="" ) )
          
          len <- length( cmdHosts ) + 1
          
          ## Insert Host n-times where n is amount of CPUs
          ## (required for snows argument format).
          cmdHosts[seq(len,len+offset-1)] <- rep( as.character( info[1] ), offset )
        }
        
        if( is.null( socketHosts ) )
          setOption( "sockHosts", cmdHosts )
        else if( paste( cmdHosts, collapse="" ) != paste( socketHosts, collapse="" ) ) {
          warning( paste( "Commandline --hosts=", args[[1]][2],
                  " overwritten by sfInit() argument hosts=", paste( socketHosts, collapse="," ),
                  sep="" ) )
        }
      }
      else
        warning( "No hosts are given as value for --hosts" )
    }
    ## Temporary directory: slave logs.
    else if( args[[1]][1] == "--tmpdir" ) {
      if( !is.null( args[[1]][2] ) && ( nchar( args[[1]][2] ) > 0 ) )
        setOption( "TMPDIR", args[[1]][2] )
      else
        warning( "No temporary directory given as value for --tmpdir" )
    }
    ## Restore directory: intermediate results are lawn here.
    else if( args[[1]][1] == "--restdir" ) {
      if( !is.null( args[[1]][2] ) && ( nchar( args[[1]][2] ) > 0 ) )
        setOption( "RESTDIR", args[[1]][2] )
      else
        warning( "No restore/result directory given as value for --restdir" )
    }
    ## Startup lock.
    ## Add 1.62:
    ## should only used from sfCluster => is the marker cn.farms is started
    ## though sfCluster!
    else if( args[[1]][1] == "--lockfile" ) {
      if( !is.null( args[[1]][2] ) && ( nchar( args[[1]][2] ) > 0 ) )
        setOption( "LOCKFILE", args[[1]][2] )
      else
        warning( "No lockfile given as value for --lockfile" )
    }
    ## Unknown option
    ## Add 1.62
    ## Add 1.72: patch from Michael Siegel for Mac OS X, which sets --gui.
    else if( args[[1]][1] != "--gui" ) {
      warning( paste( "Unknown option on commandline:", args[[1]][1] ) )
    }
  }
  
  invisible( NULL )
}

Try the cn.farms package in your browser

Any scripts or data that you put into this service are public.

cn.farms documentation built on Nov. 8, 2020, 7:59 p.m.