# inst/HiTMaP_GUI/FutureTaskProcessor.R

if (!require(future)) install.packages("future")
library(future)
#FutureTaskProcessor.R https://gist.github.com/PeterVermont/a4a29d2c6b88e4ee012a869dedb5099c#file-futuretaskprocessor-r

#NOTE: the file that sources this one should also call plan(<strategy>, workers=<desired number of workers>), for example:
#plan(multisession, workers=min((myNumTasks+1), MAX_PROCESSES))
#(the 'multiprocess' strategy used in older examples is deprecated in recent releases of the future package)
#Specifying workers is optional -- if it is omitted it defaults to future::availableCores().
#Use myNumTasks+1 because future uses one process for itself.

# Registry of the tasks currently being tracked, keyed by task name.
# startAsyncTask() adds entries and processRunningTasks() removes them,
# both through `<<-`, so this object must exist before either is called.
asyncTasksRunning <- list()

#' Register a future under a unique name so it can be polled later.
#'
#' The task record is stored in the file-level `asyncTasksRunning` list
#' (via `<<-`); nothing is evaluated here, the future is only recorded.
#'
#' @param asyncTaskName Unique name for the task; used as the registry key.
#' @param futureObj The future to track. If it carries `lazy = TRUE` a
#'   warning is raised.
#' @param callback Optional function stored with the task record.
#' @param debug If TRUE, print a line describing the registration.
#' @param tracklink Free-form value stored with the task record.
#' @param ... Accepted for compatibility; not used by this function.
#' @return Invisibly, the task record that was stored.
startAsyncTask <-
  function(asyncTaskName,
           futureObj,
           callback = NULL,
           debug = FALSE,
           tracklink = "...",
           ...) {
    submitTime <- Sys.time()

    # Reject duplicates first so that no misleading "There are now N tasks"
    # message is printed for a task that is never registered.
    if (asyncTaskName %in% names(asyncTasksRunning)) {
      stop(
        "Error: A task with the same asyncTaskName '",
        asyncTaskName,
        "' is already running. It is not known if it is running the same task"
      )
    }

    # isTRUE() tolerates a missing/NULL `lazy` field, which a bare `if`
    # would reject with "argument is of length zero".
    if (isTRUE(futureObj$lazy)) {
      warning(
        paste0(
          "startAsyncTask futureObj  has lazy=TRUE! '",
          asyncTaskName,
          "' will not be started until processRunningTasks is called with wait=TRUE and will then only run one item at a time!"
        )
      )
    }

    if (debug) {
      print(paste0(
        submitTime,
        ": startAsyncTask asyncTaskName '",
        asyncTaskName,
        "' called. There are now ", length(asyncTasksRunning) + 1, " current tasks."
      ))
    }

    asyncTaskObject <- list(
      futureObj = futureObj,
      taskName = asyncTaskName,
      callback = callback,
      submitTime = submitTime,
      tracklink = tracklink
    )
    # The value of this assignment (the task record) is the invisible result.
    asyncTasksRunning[[asyncTaskName]] <<- asyncTaskObject
  } #end startAsyncTask

#' Number of entries currently held in the `asyncTasksRunning` registry.
getNumberOfRunningTasks <- function() {
  taskCount <- length(asyncTasksRunning)
  taskCount
}
#' Tabulate the tasks currently held in `asyncTasksRunning`.
#'
#' Each well-formed task record contributes one row with the columns
#' `Task`, `Submit_time`, `Elapsed_time` and `Status`; `Status` is read from
#' the `state` element of the stored future object. Records that are NULL or
#' have fewer than four named elements contribute no rows.
#'
#' @return The row-bound per-task records converted with
#'   `data.table::as.data.table()`.
getRunningTasksStatus <- function() {
  # Build the status record for a single registry entry.
  getRunningTaskStatus_DT <- function(asyncTaskObject) {
    if (is.null(asyncTaskObject) ||
        length(names(asyncTaskObject)) < 4) {
      # Zero-row placeholder with the same column names as a real record.
      # (`Status` must be a named argument: writing `Status <- ""` here would
      # be an assignment and yield a mangled column name.)
      return(
        data.frame(
          Task = character(0),
          Submit_time = character(0),
          Elapsed_time = character(0),
          Status = character(0),
          stringsAsFactors = FALSE
        )
      )
    }
    submitTime <- asyncTaskObject[["submitTime"]]
    data.frame(
      Task = asyncTaskObject[["taskName"]],
      Submit_time = format(submitTime, "%Y %b %d %X"),
      Elapsed_time = format(round(Sys.time() - submitTime, digits = 2)),
      Status = unname(asyncTaskObject[["futureObj"]][["state"]]),
      stringsAsFactors = FALSE
    )
  }

  perTaskStatus <- lapply(asyncTasksRunning, getRunningTaskStatus_DT)
  runningTasksStatus <-
    data.table::as.data.table(do.call(rbind, perTaskStatus))
  return(runningTasksStatus)
} #end getRunningTasksStatus

#' Check the running async tasks and finalise the ones that have completed.
#'
#' Meant to be called periodically. For every task in `asyncTasksRunning`
#' whose future is resolved (or for every task when `wait = TRUE`), the value
#' is fetched, the task is removed from the registry and its callback, if
#' any, is invoked with a list holding `asyncTaskName`, `taskResult`,
#' `submitTime`, `endTime`, `elapsedTime`, `caughtError`, `caughtWarning` and
#' `tracklink`.
#'
#' @param wait If TRUE, also fetch the value of tasks that are not yet
#'   resolved instead of skipping them.
#' @param catchErrors If TRUE, an error raised while fetching a task's value
#'   is logged, recorded in `caughtError` and not propagated (the task's
#'   `taskResult` is then NULL). If FALSE the error propagates to the caller.
#'   Warnings are logged and recorded in `caughtWarning` but not muffled.
#' @param debug If TRUE, print progress messages.
#' @param maximumTasksToResolve Optional upper bound (>= 1) on the number of
#'   tasks finalised in this call.
#' @return The number of tasks still in the registry, so the result can be
#'   used as a boolean.
processRunningTasks <-
  function(wait = FALSE,
           catchErrors = TRUE,
           debug = FALSE,
           maximumTasksToResolve = NULL)
  {
    if (!is.null(maximumTasksToResolve) &&
        (maximumTasksToResolve < 1)) {
      stop(
        paste0(
          "processRunningTasks called with maximumTasksToResolve=",
          maximumTasksToResolve,
          " which does not make sense. It must be greater than 0 if specified"
        )
      )
    }

    functionStartTime <- Sys.time()
    numTasksResolved <- 0
    # names() is evaluated once, so removing entries inside the loop is safe.
    for (asyncTaskName in names(asyncTasksRunning)) {
      if (!is.null(maximumTasksToResolve) &&
          (numTasksResolved >= maximumTasksToResolve)) {
        break
      }
      asyncTaskObject <- asyncTasksRunning[[asyncTaskName]]
      if (is.null(asyncTaskObject)) {
        # The entry disappeared since the loop started (e.g. removed by an
        # earlier callback); nothing to do for it.
        next
      }
      asyncFutureObject <- asyncTaskObject[["futureObj"]]
      isObjectResolved <- resolved(asyncFutureObject)
      if (!(isObjectResolved || wait)) {
        next
      }

      if (debug && !isObjectResolved) {
        print(
          paste0(
            Sys.time(),
            ": processRunningTasks about to wait for task '",
            asyncTaskName,
            "' to finish. ", length(asyncTasksRunning), " tasks still running."
          )
        )
      }

      numTasksResolved <- numTasksResolved + 1
      taskResult <- NULL
      caughtError <- NULL
      caughtWarning <- NULL
      #NOTE future will send any errors it caught when we ask it for the value -- same as if we had evaluated the expression ourselves
      if (catchErrors) {
        taskResult <- tryCatch(
          withCallingHandlers(
            value(asyncFutureObject),
            warning = function(w) {
              # `<<-` so the condition reaches this function's frame; a plain
              # `<-` would only set a variable local to the handler.
              caughtWarning <<- w
              print(
                paste0(
                  Sys.time(),
                  ": ***WARNING*** processRunningTasks: '",
                  asyncTaskName,
                  "' returned a warning: ",
                  conditionMessage(w)
                )
              )
              print(sys.calls())
            },
            error = function(e) {
              caughtError <<- e
              print(
                paste0(
                  Sys.time(),
                  ": ***ERROR*** processRunningTasks: '",
                  asyncTaskName,
                  "' returned an error: ",
                  conditionMessage(e)
                )
              )
              # Logged from the calling handler so the call stack is intact.
              print(sys.calls())
            }
          ),
          # A calling handler alone does not stop the error; this exiting
          # handler is what actually keeps it from propagating.
          error = function(e) NULL
        )
      } else {
        #simply fetch the value -- if exceptions happened they will be thrown by the Future library when we call value and
        #therefore will propagate to the caller
        taskResult <- value(asyncFutureObject)
      }
      rm(asyncFutureObject)

      submitTime <- asyncTaskObject[["submitTime"]]
      endTime <- Sys.time()
      elapsedTime <- format(endTime - submitTime)
      callback <- asyncTaskObject[["callback"]]
      tracklink <- asyncTaskObject[["tracklink"]]

      # Drop the task before reporting so the count below only covers the
      # tasks that are still outstanding.
      asyncTasksRunning[[asyncTaskName]] <<- NULL

      if (debug) {
        print(
          paste0(
            Sys.time(),
            ": processRunningTasks finished: '",
            asyncTaskName,
            "' and there are ",
            getNumberOfRunningTasks(),
            " additional tasks still running.",
            " Elapsed time since submitted: ",
            elapsedTime
          )
        )
      }
      message(tracklink)

      if (!is.null(callback)) {
        callback(
          list(
            asyncTaskName = asyncTaskName,
            taskResult = taskResult,
            submitTime = submitTime,
            endTime = endTime,
            elapsedTime = elapsedTime,
            caughtError = caughtError,
            caughtWarning = caughtWarning,
            tracklink = tracklink
          )
        )
      }
    } #end loop over running tasks

    if (debug && (numTasksResolved > 0)) {
      print(
        paste0(
          Sys.time(),
          ": processRunningTasks with wait=",
          wait,
          " exiting after resolving: ",
          numTasksResolved,
          " tasks. Elapsed time in function: ",
          format(Sys.time() - functionStartTime),
          " tasks still running: ",
          length(asyncTasksRunning)
        )
      )
    }
    return(length(asyncTasksRunning))
  } # end processRunningTasks

#' Simulate a data-processing job that takes roughly `duration` seconds.
#'
#' @param name Label used in the progress output and in the result.
#' @param duration How long to run, in seconds.
#' @param sys_sleep If TRUE, sleep once for `duration`; otherwise loop in
#'   one-second steps, printing the elapsed time on every iteration.
#' @return A one-row data frame with columns `name` and `test` (the time at
#'   which the function finished).
fakeDataProcessing <- function(name, duration, sys_sleep = FALSE) {
  if (sys_sleep) {
    Sys.sleep(duration)
  } else {
    start_time <- Sys.time()
    repeat {
      now <- Sys.time()
      elapsed_time <- now - start_time
      print(paste0(
        Sys.time(),
        ": ",
        name,
        " elapsed time: ",
        format(elapsed_time)
      ))
      # `now - start_time` picks its units automatically (secs, then mins,
      # ...), so compare against `duration` using an explicit seconds value.
      elapsed_secs <- as.numeric(difftime(now, start_time, units = "secs"))
      if (elapsed_secs >= duration) {
        break
      }
      Sys.sleep(1)
    } #end repeat
  } #end else not using long sleep
  return(data.frame(name = name, test = Sys.time()))
} #end fakeDataProcessing


#' Manual smoke test: start `loops` fake tasks and wait for all of them.
#'
#' Sets the future plan, starts one `fakeDataProcessing` future per loop
#' iteration (each one second longer than the previous), polls after every
#' submission and finally blocks until everything has finished.
#'
#' @param loops Number of fake tasks to start.
testAsync <- function(loops = future::availableCores() - 1) {
  # `multiprocess` is deprecated in current releases of future; multisession
  # is the portable replacement.
  plan(multisession)
  print(paste0("future::availableCores(): ", future::availableCores()))
  baseWait <- 3
  # seq_len() so that loops = 0 starts nothing (1:0 would iterate twice).
  for (loopNumber in seq_len(loops)) {
    duration <- baseWait + loopNumber
    dataName <-
      paste0("FAKE_PROCESSED_DATA_testLoop-",
             loopNumber,
             "_duration-",
             duration)
    startAsyncTask(
      dataName,
      futureObj = future(lazy = FALSE, expr = fakeDataProcessing(dataName, duration)),
      debug = TRUE
    )

    #NOTE: if the future is created with lazy=TRUE then the process will not be kicked off until value() is called on it. resolved(futureObj) does not kick it off
    processRunningTasks(wait = FALSE, debug = TRUE)
  } #end loop

  #wait until all tasks are finished
  processRunningTasks(wait = TRUE, debug = TRUE)

  print(paste0(
    "At the end the status should have no running tasks: ",
    getRunningTasksStatus()
  ))
} #end testAsync
#testAsync()
# guoguodigit/Metwork documentation built on Nov. 30, 2024, 10:04 p.m.