RAVEWatchDog <- R6::R6Class(
classname = "RAVEWatchDog",
portable = TRUE,
private = list(
pipeline_names = c(
.raw_path = character(0),
.job_name = character(0),
.watch_path = character(0),
.time_threshold = NULL,
.project_name = character(0),
.file_pattern = "^([a-zA-Z0-9]+)_datafile_([a-zA-Z0-9]+)\\.nev$",
.registry_cache = NULL,
.set_status = function(item) {
stopifnot( && nrow(item) == 1 &&
setequal(names(item), c("Filename", "Subject", "Block",
"Status", "Details", "LastModified",
"TaskStarted", "TaskEnded", "Directory"))
registry <- self$load_registry(update = TRUE)
sel <- registry$Filename == item$Filename
if(length(sel) && any(sel)) {
registry <- registry[!sel, ]
# Directly binding two items will result in error (with NA)
item$LastModified <- as.POSIXlt(item$LastModified)
item$TaskStarted <- as.POSIXlt(item$TaskStarted)
item$TaskEnded <- as.POSIXlt(item$TaskEnded)
registry <- rbind(item, registry)
registry <- registry[order(registry$TaskStarted, na.last = TRUE, decreasing = TRUE), ]
private$.registry_cache <- registry
# save to csv
registry$LastModified <- strftime(registry$LastModified)
registry$TaskStarted <- strftime(registry$TaskStarted)
registry$TaskEnded <- strftime(registry$TaskEnded)
safe_write_csv(registry, file = self$registry_path, row.names = FALSE, quiet = TRUE)
.get_status = function(file, update = FALSE) {
stopifnot(length(file) == 1 && file.exists(file.path(private$.watch_path, file)))
mtime <- file.mtime(file.path(private$.watch_path, file))
# get subject code, block ID
fname <- filenames(file)
m <- gregexec(self$file_pattern, fname, = TRUE, useBytes = TRUE)[[1]]
ml <- attr(m, "match.length")
error_item <- data.frame(
Filename = file,
Subject = NA,
Block = NA,
Status = "parse-error",
Details = "Cannot parse subject code or block ID",
LastModified = mtime,
TaskStarted = NA,
TaskEnded = NA,
Directory = private$.watch_path
if(length(m) != 3) {
subject_code <- substr(fname, m[[2]], m[[2]] + ml[[2]] - 1)
block <- substr(fname, m[[3]], m[[3]] + ml[[3]] - 1)
if(!nchar(subject_code) || !nchar(block)) {
if(startsWith(block, "0")) {
block <- sprintf("block%s", block)
registry <- self$load_registry(update = update)
if(file %in% registry$Filename) {
sel <- which(registry$Filename == file)
item <- registry[sel[[1]], ]
item$LastModified <- mtime
} else {
item <- data.frame(
Filename = file,
Subject = subject_code,
Block = block,
Status = "ready",
Details = "Just found the subject",
LastModified = mtime,
TaskStarted = NA,
TaskEnded = NA,
Directory = private$.watch_path
.queue = NULL,
.jobs = NULL,
.max_jobs = numeric(0L)
public = list(
cache_path = character(0),
NSType_LFP = "ns3",
NSType_stimuli = "ns5",
initialize = function(watch_path, job_name = "RAVEWatchDog"){
if(!grepl("^[a-zA-Z0-9-]+$", job_name)) {
stop("Watch dog name can only contain letters, digits, and dash [-].")
private$.job_name <- job_name
private$.watch_path <- normalizePath(watch_path, mustWork = FALSE)
# Default values
private$.max_jobs <- 1L
private$.jobs <- dipsaus::fastmap2()
# We do not allow users to change raw_data_dir once created
private$.raw_path <- raveio_getopt("raw_data_dir")
# Automatically set cache path, it can be changed
self$cache_path <- file.path(cache_root(), "_automation_", job_name)
load_registry = function(update = TRUE) {
if(!$.registry_cache)) {
update <- TRUE
if(update) {
if(file.exists(self$registry_path)) {
tbl <- utils::read.csv(self$registry_path, header = TRUE,
colClasses = "character")
colnms <- c("Filename", "Subject", "Block",
"Status", "Details", "LastModified",
"TaskStarted", "TaskEnded", "Directory")
if(all(colnms %in% names(tbl))) {
tbl <- tbl[, colnms]
tbl$LastModified <- as.POSIXlt(tbl$LastModified)
tbl$TaskStarted <- as.POSIXlt(tbl$TaskStarted)
tbl$TaskEnded <- as.POSIXlt(tbl$TaskEnded)
# re-order
tbl <- tbl[order(tbl$TaskStarted, decreasing = TRUE),]
private$.registry_cache <- tbl
private$.registry_cache <- data.frame(
Filename = character(0L),
Subject = character(0L),
Block = character(0L),
Status = character(0L),
Details = character(0L),
LastModified = as.POSIXlt(character(0L)),
TaskStarted = as.POSIXlt(character(0L)),
TaskEnded = as.POSIXlt(character(0L)),
Directory = character(0L)
check_file_registry = function() {
# check the watch path
fs <- list.files(
all.files = FALSE,
full.names = FALSE,
include.dirs = FALSE,
recursive = TRUE
fs <- fs[grepl(self$file_pattern, filenames(fs), = TRUE)]
if(!length(fs)) { return(character(0L)) }
mtime <- file.mtime(file.path(private$.watch_path, fs))
sel <- mtime >= self$time_threshold
if(!any(sel)) { return(character(0L)) }
fs <- fs[sel]
registry <- self$load_registry()
ignored_files <- registry$Filename[!registry$Status %in% c("queued")]
fs <- fs[!fs %in% ignored_files]
add_to_queue = function(files, force = FALSE) {
files <- unique(files)
files <- files[!]
if(!length(files)) {
if(length(files) > 1) {
lapply(files, function(file) {
# extract information
file <- files
item <- private$.get_status(file)
if(force && item$Status == "running") {
stop("A process is working on the block. Only one process can work on a block at a time.")
if(item$Status == "parse-error") {
if(force || item$Status == "ready") {
item$Details <- ""
item$Status <- "queued"
if(force) {
private$.queue <- unique(c(file, private$.queue))
catgl("File [{file}] prepended to the task queue", level = "INFO")
} else {
private$.queue <- unique(c(private$.queue, file))
catgl("File [{file}] appended to the task queue", level = "INFO")
} else if (item$Status == "queued") {
if(!file %in% private$.queue) {
private$.queue <- unique(c(private$.queue, file))
}, error = function(e) {
catgl("Cannot add file [{file}] to queue. Reason: ",
e$message, level = "ERROR")
check_job_status = function() {
jobs <- private$.jobs
nms <- names(jobs)
lapply(nms, function(file) {
check <- jobs[[file]]
if(!is.function(check)) {
private$.queue <- private$.queue[!private$.queue %in% file]
catgl("Cannot find process handler of file [{file}]. Removed from queue", level = "WARNING")
code <- check()
if(code == 0) {
item <- private$.get_status(file)
item$Status <- "finished"
item$Details <- ""
item$TaskEnded <- as.POSIXlt(Sys.time())
# remove from queue
private$.queue <- private$.queue[!private$.queue %in% file]
catgl("File [{file}] finished. Removed from queue", level = "INFO")
if(code < 0) {
item <- private$.get_status(file)
item$Status <- "errored"
item$Details <- paste(attr(code, "rs_exec_error"), collapse = "")
item$TaskEnded <- as.POSIXlt(Sys.time())
# remove from queue
private$.queue <- private$.queue[!private$.queue %in% file]
catgl("File [{file}] errored (reason: {item$Details}). Removed from queue", level = "ERROR")
get_pipeline_default_settings = function() {
re <- lapply(private$pipeline_names, function(pname) {
pipeline <- pipeline(pname, paths = file.path(R_user_dir('raveio', "data"), "pipelines"))
settings <- dipsaus::list_to_fastmap2(pipeline$get_settings())
if(pname == "notch_filter") {
settings$diagnostic_plot_params$path <- NULL
as.list(settings, sorted = TRUE)
names(re) <- private$pipeline_names
create_settings_file = function(overwrite = FALSE) {
settings_path <- file.path(self$log_path, "settings.yaml")
if(!overwrite && file.exists(settings_path)) {
stop("Existing settings file already created. If you want to overwrite that file, use `overwrite=TRUE`")
backup_file(settings_path, remove = FALSE)
save_yaml(self$get_pipeline_default_settings(), file = settings_path)
catgl("A settings file has been created at [{settings_path}]", level = "INFO")
get_pipeline_settings = function(pname, file, brfile) {
item <- private$.get_status(file)
# load blackrock file
electrode_table <- brfile$electrode_table
electrodes <- electrode_table$Electrode[electrode_table$NSType == self$NSType_LFP]
# load pipeline
pipeline <- pipeline(pname, paths = file.path(R_user_dir('raveio', "data"), "pipelines"))
settings <- dipsaus::list_to_fastmap2(pipeline$get_settings())
# override user-defined settings
settings_path <- file.path(self$log_path, "settings.yaml")
if(file.exists(settings_path)) {
tmp <- load_yaml(settings_path)
dipsaus::list_to_fastmap2(as.list(tmp[[pname]]), settings)
# pipeline-specific settings
subject_code <- sprintf("%s__%s", item$Subject, item$Block)
switch (
"import_lfp_native" = {
settings$import_setup__project_name <- self$project_name
settings$import_setup__subject_code <- subject_code
settings$force_import <- TRUE
settings$skip_validation <- FALSE
srate <- brfile$sample_rates[[self$NSType_LFP]]
settings$import_channels__sample_rate <- srate
settings$import_channels__electrodes <- dipsaus::deparse_svec(electrodes)
settings$import_channels__electrode_file <- "auto"
settings$import_blocks__format <- names(IMPORT_FORMATS)[unlist(IMPORT_FORMATS) == 'native_blackrock']
settings$import_blocks__session_block <- item$Block
"notch_filter" = {
graph_path <- file.path(private$.raw_path, item$Subject, item$Block, "notch-diagnostic-plots")
settings$project_name <- self$project_name
settings$subject_code <- subject_code
graph_path <- dir_create2(graph_path)
settings$diagnostic_plot_params$path <- file.path(graph_path, "notch-diagnostic-plots.pdf")
"wavelet_module" = {
settings$project_name <- self$project_name
settings$subject_code <- subject_code
"reference_module" = {
settings$project_name <- self$project_name
settings$subject_code <- subject_code
settings$reference_name <- "[new reference]"
process_file = function(file) {
item <- private$.get_status(file)
brfile <- BlackrockFile$new(path = file.path(private$.watch_path, file), block = item$Block)
# prepare working directory
workdir <- file.path(self$cache_path, paste0(file, ".workdir"))
if(dir.exists(workdir)) {
unlink(workdir, force = TRUE, recursive = TRUE)
workdir <- dir_create2(workdir)
# copy pipelines
for(pname in private$pipeline_names) {
pipeline <- pipeline(pname, paths = file.path(R_user_dir('raveio', "data"), "pipelines"))
dest <- file.path(workdir, "pipelines", pipeline$pipeline_name)
src = pipeline$pipeline_path,
dest = dest,
activate = FALSE
pipeline <- pipeline(pname, paths = file.path(workdir, "pipelines"))
settings <- self$get_pipeline_settings(pname = pname, file = file, brfile = brfile)
pipeline$set_settings(.list = settings)
catgl("Set pipeline [{pname}]:", level = "DEFAULT")
save_yaml(settings, file = stdout())
# copy files
block_path <- file.path(private$.raw_path, item$Subject, item$Block)
fs <- paste0(brfile$base_path, c(".nev", ".ccf", paste0(".ns", 1:9)))
fs <- fs[file.exists(fs)]
for(f in fs) {
file.copy(f, file.path(block_path, basename(f)), overwrite = TRUE, recursive = FALSE, = TRUE)
fake_path <- file.path(private$.raw_path, sprintf("%s__%s", item$Subject, item$Block))
fake_path <- dir_create2(fake_path)
if(!file.exists(file.path(fake_path, item$Block))) {
if(dipsaus::get_os() == "windows") {
file.copy(block_path, to = fake_path, recursive = TRUE, = TRUE)
} else {
file.symlink(block_path, to = fake_path)
# Make sure the subject surface files can be loaded properly?
if(!file.exists(file.path(fake_path, 'rave-imaging'))) {
# check if original subject has the fs recon
imaging_path_orig <- file.path(private$.raw_path, item$Subject, 'rave-imaging')
if(file.exists(imaging_path_orig)) {
if(dipsaus::get_os() == "windows" || !dir.exists(imaging_path_orig)) {
# On windows, symlink does not work well so just copy
# On Unix, if rave-imaging is a symlink, then R (4.0) will treat
# the path as a file. Just copy over
file.copy(imaging_path_orig, to = fake_path,
recursive = TRUE, = TRUE)
} else {
file.symlink(imaging_path_orig, to = fake_path)
# set to running
catgl("Start processing [{file}]", level = "INFO")
item$Status <- "running"
item$Details <- ""
item$TaskStarted <- as.POSIXlt(Sys.time())
item$TaskEnded <- as.POSIXlt(NA)
private$.jobs[[file]] <- dipsaus::rs_exec(
name = file,
focus_on_console = TRUE,
rs = TRUE,
wait = FALSE,
quoted = TRUE,
nested_ok = TRUE,
expr = bquote({
workdir <- .(workdir)
cwd <- getwd()
on.exit({ setwd(cwd) }, add = TRUE, after = FALSE)
raveio <- asNamespace('raveio')
ravedash <-'asNamespace', list('ravedash'))
ravedash$set_logger_path(root_path = .(self$log_path), max_files = 10L)
ravedash$logger_threshold("trace", type = 'file')
ravedash$logger_threshold("trace", type = 'console')
} else {
ravedash <- NULL
blackrock_src <- .(file)
pname <- "import_lfp_native"
pipeline <- raveio$pipeline(pname, paths = file.path(workdir, "pipelines"))
raveio$catgl("[{blackrock_src}]: Running pipeline: [{pname}] at [{pipeline$pipeline_path}]", level = "INFO")
pipeline$run(async = FALSE, as_promise = FALSE,
scheduler = "none", type = "smart")
raveio$catgl("[{blackrock_src}]: [{pname}] finished", level = "INFO")
pname <- "notch_filter"
pipeline <- raveio$pipeline(pname, paths = file.path(workdir, "pipelines"))
raveio$catgl("[{blackrock_src}]: Running pipeline: [{pname}] at [{pipeline$pipeline_path}]", level = "INFO")
pipeline$run(names = "apply_notch", async = FALSE, as_promise = FALSE,
scheduler = "none", type = "smart")
pname <- "wavelet_module"
pipeline <- raveio$pipeline(pname, paths = file.path(workdir, "pipelines"))
raveio$catgl("[{blackrock_src}]: Running pipeline: [{pname}] at [{pipeline$pipeline_path}]", level = "INFO")
pipeline$run(async = FALSE, as_promise = FALSE,
scheduler = "none", type = "smart")
subject <- pipeline$read("subject")
# generate reference if exists
pname <- "reference_module"
pipeline <- raveio$pipeline(pname, paths = file.path(workdir, "pipelines"))
raveio$catgl("[{blackrock_src}]: Running pipeline: [{pname}] (reference_table_initial) at [{pipeline$pipeline_path}]", level = "INFO")
# check subject's localization
elec_path <- .(file.path(private$.raw_path, item$Subject, "rave-imaging", "electrodes.csv"))
if(!file.exists(elec_path)) {
elec_path <- .(file.path(raveio_getopt("data_dir"), private$.project_name, item$Subject, "rave", "meta", "electrodes.csv"))
if(!file.exists(elec_path)) {
# list all projects, try to find
all_projects <- raveio$get_projects(refresh = TRUE)
elec_path <- file.path(raveio_getopt("data_dir"), all_projects,
.(item$Subject), "rave", "meta", "electrodes.csv")
elec_path <- elec_path[! & file.exists(elec_path)]
if(length(elec_path)) {
elec_path <- elec_path[[1]]
if(length(elec_path) == 1 && ! && file.exists(elec_path)) {
elec_path <- elec_path[[1]]
elec_table <- utils::read.csv(elec_path)
elec_table$Electrode <- as.integer(elec_table$Electrode)
if(length(elec_table$Electrode) == length(subject$electrodes)) {
o <- order(elec_table$Electrode)
elec_table <- elec_table[o, ]
elec_table$Electrode <- sort(subject$electrodes)
raveio$safe_write_csv(elec_table, file.path(subject$meta_path, "electrodes.csv"),
row.names = FALSE)
}, error = function(e) {
if(is.environment(ravedash)) {
ravedash$logger_error_condition(e, level = "warning")
} else {
pipeline$run(names = "reference_table_initial",
async = FALSE, as_promise = FALSE,
scheduler = "none", type = "smart")
unsaved_meta <- file.path(subject$meta_path, "reference__unsaved.csv")
target_meta <- file.path(subject$meta_path, "reference_auto_generated.csv")
if(file.exists(unsaved_meta) && !file.exists(target_meta)) {
file.copy(unsaved_meta, target_meta, overwrite = TRUE)
# make subject backward-compatible
raveio$catgl("[{blackrock_src}]: Making data format backward-compatible", level = "INFO")
raveio$catgl("[{blackrock_src}]: Done", evel = "INFO")
scan = function() {
files <- self$check_file_registry()
# check job status
njobs <- self$check_job_status()
inactives <- private$.queue[!private$.queue %in% names(private$.jobs)]
if(length(inactives) && njobs < private$.max_jobs) {
# schedule job
navails <- private$.max_jobs - njobs
inactives <- inactives[seq_len(min(navails, length(inactives)))]
for(file in inactives) {
reset_registry = function() {
if(!interactive()) {
stop("Cannot reset registry in non-interactive mode")
ans <- dipsaus::ask_yesno(sprintf("Clearing registry for [%s]?", private$.job_name))
if(isTRUE(ans)) {
unlink(self$registry_path, force = TRUE)
watch = function(interval = 5) {
interval <- as.numeric(interval)
if(!isTRUE(interval >= 1)) {
stop("Min interval must be >= 1 seconds")
ravedash <-'asNamespace', list('ravedash'))
ravedash$set_logger_path(root_path = self$log_path, max_files = 10L)
ravedash$logger_threshold("trace", type = 'file')
ravedash$logger_threshold("trace", type = 'console')
} else {
ravedash <- NULL
if(is.environment(ravedash)) {
}, add = TRUE, after = TRUE)
# make sure directories are there
while(TRUE) {
}, error = function(e) {
catgl("Error raised while the master process rescans/schedules tasks. Reason: {paste(e$message, collapse = '\n')}\nWill try again later",
level = "ERROR")
active = list(
log_path = function() {
file.path(private$.raw_path, "_automation", private$.job_name)
registry_path = function() {
file.path(self$log_path, "registry.csv")
time_threshold = function(v) {
if(!missing(v)) {
if(length(v) != 1 || {
stop("Cannot set time threshold with invalid time")
tm <- v
if(is.character(tm)) {
v <- as.POSIXlt(tm)
if( {
stop("`time_threshold` must have format of [year-month-day hour:minute:second]. For example, '2022-08-03 16:38:00'")
} else if (!inherits(tm, "POSIXlt")) {
stop("`time_threshold` must be characters or a `POSIXlt` time object")
private$.time_threshold <- v
project_name = function(v) {
if(!missing(v)) {
if(!length(v)) {
private$.project_name <- character(0)
} else if(length(v) > 1) {
stop("Project name must have length of 1")
} else if(!grepl("^[a-zA-Z0-9_]", v)) {
stop("Project name can only contain letters, digits, and underscore [_]")
} else {
private$.project_name <- as.character(v)
pn <- private$.project_name
if(!length(pn)) {
pn <- "automated"
file_pattern = function(v) {
if(!missing(v)) {
v <- v[[1]]
m <- gregexpr("(\\([^\\(\\)]+\\))", v, = TRUE)
m <- unique(m[[1]])
if(length(m) < 2) {
stop("File pattern must be a regular expression containing at least two keyword extractors so I can decide the subject code and session block ID. For example, regular expression ['^([a-zA-Z0-9]+)_datafile_([a-zA-Z0-9]+)\\.nev$'] matches [YAB_datafile_001.nev]. RAVE will set subject code to [YAB], and block ID as [001].")
private$.file_pattern <- v
queued = function() {
max_jobs = function(v) {
if(!missing(v)) {
errored <- TRUE
if(length(v) == 1) {
v <- as.integer(v)
if(isTRUE(v > 0)) {
private$.max_jobs <- v
errored <- FALSE
if(errored) {
stop("Cannot set `max_jobs`, the value must be a positive integer")
#' Monitors 'BlackRock' output folder and automatically import data into 'RAVE'
#' @description Automatically import 'BlackRock' files from designated folder
#' and perform 'Notch' filters, 'Wavelet' transform; also generate epoch,
#' reference files.
#' @param watch_path the folder to watch
#' @param project_name the project name to generate
#' @param task_name the watcher's name
#' @param scan_interval scan the directory every \code{scan_interval} seconds,
#' cannot be lower than 1
#' @param time_threshold time-threshold of files: all files with modified
#' time prior to this threshold will be ignored; default is current time
#' @param max_jobs maximum concurrent imports, default is 1
#' @param as_job whether to run in 'RStudio' background job or to block the
#' session when monitoring; default is auto-detected
#' @param dry_run whether to dry-run the code (instead of executing the
#' scripts, return the watcher's instance and open the settings file);
#' default is false
#' @param config_open whether to open the pipeline configuration file; default
#' is equal to \code{dry_run}
#' @returns When \code{dry_run} is true, then the watcher's instance will be
#' returned; otherwise nothing will be returned.
#' @export
auto_process_blackrock <- function(
watch_path, project_name = "automated", task_name = "RAVEWatchDog",
scan_interval = 10, time_threshold = Sys.time(), max_jobs = 1L,
as_job = NA, dry_run = FALSE, config_open = dry_run
) {
time_threshold <- as.POSIXlt(time_threshold)
time_threshold <- time_threshold[!]
if(!length(time_threshold)) {
time_threshold <- as.POSIXlt(Sys.time())
} else {
time_threshold <- time_threshold[[1]]
if(!isFALSE(as_job)) {
as_job <- dipsaus::rs_avail()
fun <- dipsaus::new_function2(body = bquote({
raveio <- asNamespace('raveio')
watcher <- raveio$RAVEWatchDog$new(
watch_path = .(watch_path),
job_name = .(task_name)
watcher$time_threshold <- .(time_threshold)
watcher$max_jobs <- .(max_jobs)
watcher$project_name <- .(project_name)
settings_path <- file.path(watcher$log_path, "settings.yaml")
if(!file.exists(settings_path)) {
}), quote_type = "quote")
watcher <- fun()
if( config_open ) {
settings_path <- file.path(watcher$log_path, "settings.yaml")
if(!file.exists(settings_path)) {
catgl("Watcher's settings file has been opened. Please check the settings, and edit if necessary. All auto-discovered BlackRock files will be preprocessed using this settings file.", level = "INFO")
}, silent = TRUE)
if( dry_run ) {
if(as_job) {
fun <- dipsaus::new_function2(body = .(body(fun)))
# return(fun)
watcher <- fun()
watcher$watch(interval = .(scan_interval))
quoted = TRUE,
rs = TRUE,
name = task_name,
focus_on_console = TRUE
} else {
watcher$watch(interval = scan_interval)
