#'
#' @docType class
#'
#' @importFrom R6 R6Class
#' @importFrom stringr str_split
#' @importFrom purrr is_null is_list is_character is_logical
#' @importFrom jsonlite fromJSON
#' @importFrom rio export import
#' @importFrom tibble as_tibble
#' @importFrom dplyr select filter
#' @importFrom tidyr replace_na
#' @importFrom magrittr %>%
#' @importFrom data.table rbindlist
#' @importFrom elastic mapping_create index_forcemerge index_optimize index_flush index_clear_cache index_open index_close connect index_recreate index_create index_delete cluster_stats index_get docs_bulk_update docs_bulk_index reindex Search docs_mget scroll scroll_clear
#'
#' @export
#'
#' @keywords data integration dataset
#'
#' @name kibior
#'
#' @title KibioR, an Kibio and Elasticsearch data manipulation package.
#'
#' @description KibioR is a lightweight package for data manipulation
#' with Elasticsearch. Its main features allow easy data import, export,
#' download, upload, searching and sharing to any Elasticsearch-based open
#' architecture, scaling to billions of data and TB capability.
#'
#' @details A client to send, retrieve, search, join data in Elasticsearch.
#'
#' @format \code{\link{R6Class}} object.
#'
#' @author RĂ©gis Ongaro-Carcy,
#' \email{regis.ongaro-carcy2@crchudequebec.ulaval.ca}
#'
#' @references Kibio.science: \url{http://kibio.science}, \cr
#' Elasticsearch documentation:
#' \url{https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html}
#'
#' @seealso \code{\link{kibior}}
#'
#' @section Constructor Arguments:
#' \tabular{llll}{
#' \strong{Argument} \tab \strong{Type} \tab \strong{Details} \tab \strong{Default} \cr
#' \code{host} \tab character \tab address or name of Elasticsearch server \tab "localhost" \cr
#' \code{port} \tab numeric \tab port of Elasticsearch server \tab 9200 \cr
#' \code{user} \tab character \tab if required by the server, the username for authentication \tab NULL \cr
#' \code{pwd} \tab character \tab if required by the server, the password for authentication \tab NULL \cr
#' \code{verbose} \tab logical \tab verbose mode \tab FALSE \cr
#' }
#'
Kibior <- R6Class(
c("Kibior", "KibiorOperators"),
lock_objects = FALSE,
lock_class = FALSE,
# ============================================================================
# PRIVATE
# ============================================================================
private = list(
# --------------------------------------------------------------------------
# attributes
# --------------------------------------------------------------------------
# Private .host Elasticsearch connection host
.host = NULL,
# Private .port Elasticsearch connection port
.port = NULL,
# Private .user Elasticsearch auth user
.user = NULL,
# Private .pwd Elasticsearch auth password
.pwd = NULL,
# Private .connection Elstic connection, computed automatically when host or port are
# changed
.connection = NULL,
# Private .es_version Elasticsearch version, computed automatically when the connection is
# set
.es_version = NULL,
# Private .head_search_size Size of search when head mode is activated, `$search(head =
# TRUE)`
.head_search_size = 5,
# Private .es_wait Time in second to wait when an update command is sent to ES
.es_wait = 2,
# Private .es_primaries Default allocated number of primary shards when a new ES index is
#' created
.es_primary_shards = 1,
# Private .es_replica_shards Default allocated number of replica shards when a new ES index
# is created
.es_replica_shards = 1,
# Private .default_id_col Default name given to unique field when pushing data.
.default_id_col = "kid",
# --------------------------------------------------------------------------
# constants
# --------------------------------------------------------------------------
# Private .MAX_BUCKET_SIZE Threshold for returning data from ES
.MAX_BUCKET_SIZE = 10000L,
# Private .VALID_JOINS List of valid joins, used to test
.VALID_JOINS = c("inner", "full", "left", "right", "semi", "anti"),
# Private .VALID_COUNT_TYPES List of count valid types, used to test
.VALID_COUNT_TYPES = c("observations", "variables"),
# Private .VALID_FEATURES List of Elasticsearch valid features in metadata, used to test
.VALID_FEATURES = c("settings", "mappings", "aliases"),
# Private .VALID_PUSH_MODES List of valid push data modes, used to test
.VALID_PUSH_MODES = c("check", "recreate", "update"),
# Private .RESERVED_QUERY_STRING_CHAR reserved characters for query string syntax
.RESERVED_QUERY_STRING_CHAR = stringr::str_split("+ - = && || > < ! ( ) { } [ ] ^ \" ~ * ? : \\ /", " ")[[1]],
# Private .AUTO_IMPORT_EXTENSION_MAPPING List of file extension to try automatic import
.AUTO_IMPORT_EXTENSION_MAPPING = list(
# tabular
"csv" = "tabular",
"tsv" = "tabular",
"txt" = "tabular",
"tab" = "tabular",
# features
"wig" = "features",
"bigwig"= "features",
"gtf" = "features",
"gff" = "features",
"gff3" = "features",
"bed" = "features",
# "bb" = stop("Not supported. See https://support.bioconductor.org/p/116736/"),
# alignment
"bam" = "alignments",
"bai" = "alignments",
# sequence
"fasta" = "sequences",
"fa" = "sequences",
"fna" = "sequences",
"ffn" = "sequences",
"faa" = "sequences",
"frn" = "sequences",
# reads
"fastq" = "reads",
"fq" = "reads",
# json
"json" = "json"
),
# Private kibior indices metadata
.KIBIOR_METADATA_INDICES = ".kibior_indices",
# --------------------------------------------------------------------------
# methods - inherits
# --------------------------------------------------------------------------
finalize = function(){
"Destructor: finalize this object, wipe state"
private$.connection <- NULL
private$.es_version <- NULL
private$.host <- NULL
private$.port <- NULL
private$.user <- NULL
private$.pwd <- NULL
},
# --------------------------------------------------------------------------
# methods - abstract & utils
# --------------------------------------------------------------------------
single_value_metric_aggregation = function(aggregation_type, index_name, columns, query = NULL){
"[Abstract method] Execute a single value metric aggregation."
"Mainly for avg, min, max, sum."
""
"@param aggregation_type the aggregation to execute."
"@param index_name the indices to target."
"@param columns the columns to target."
"@param query the query to target some data. (default: NULL)"
"@return the list of tibble with the results."
if(!purrr::is_character(aggregation_type)) stop(private$err_param_type_character("aggregation_type"))
if(length(aggregation_type) > 1) stop(private$err_one_value("aggregation_type"))
if(!purrr::is_character(index_name)) stop(private$err_param_type_character("index_name"))
if(!purrr::is_character(columns)) stop(private$err_param_type_character("columns"))
if(!purrr::is_null(query)){
if(!purrr::is_character(query)) stop(private$err_param_type_character("query"))
if(length(query) > 1) stop(private$err_one_value("query"))
}
#
involved_indices <- self$match(index_name)
#
res <- list()
if(!purrr::is_null(involved_indices)){
# searched cols
if(self$verbose){
columns %>%
private$vector_to_str() %>%
message(" -> Searching column names: ", .)
message(" -> Getting ", aggregation_type)
}
#
get_value <- function(v){
if(self$version$major > 6) v$value else v
}
#
for(ind in involved_indices){
res[[ind]] <- list()
# get all columns from an index
real_columns <- suppressMessages({ self$columns(ind)[[ind]] })
#
for(col in columns){
# req
res[[ind]][[col]] <- tryCatch(
expr = {
# raise an error if the col is not known
# should not happen but sum() actually let the
# unknown column with 0 as default value
if(!(col %in% real_columns)) stop("ABSENT_COLUMN")
# define agg
agg_body <- paste0('{"size":0,"aggs":{"kaggs":{"',
aggregation_type, '":{"field":"', col, '"}}}}')
#req
stmp <- elastic::Search(
self$connection,
index = ind,
size = 0,
q = query,
body = agg_body
) %>%
.$aggregations %>%
.$kaggs %>%
get_value()
if(purrr::is_null(stmp)) stop("ABSENT_COLUMN")
# add index and col names
tmp <- list()
tmp[[aggregation_type]] <- stmp
tmp <- as.data.frame(tmp, stringsAsFactors = FALSE)
tmp <- cbind(column = col, tmp)
tmp
},
error = function(e){
if("ABSENT_COLUMN" == e$message){
if(self$verbose){
message(" - Skipping absent column '", col, "' of '", ind, "'.")
}
} else {
if(grepl("all shards failed", e$message, ignore.case = TRUE)){
if(self$verbose){
message(" - Skipping non numeric column '", col, "' of '", ind, "'.")
}
} else {
# reraise
stop(e$message)
}
}
}
)
}
#
if(!private$is_list_empty(res[[ind]])){
res[[ind]] <- res[[ind]] %>%
data.table::rbindlist() %>%
tibble::as_tibble()
# change dot to underscore in names
names(res[[ind]]) <- gsub("\\.", "_", names(res[[ind]]))
}
}
}
if(length(res) == 0){
res <- NULL
}
if(self$quiet_results) invisible(res) else res
},
multi_value_metric_aggregation = function(aggregation_type, index_name, columns, function_test_null_result = NULL, aggregation_supplementary_args = NULL, query = NULL){
"[Abstract method] Execute a multi value metric aggregation."
"Mainly for percentile, stats, summary, q1, q2, q3, median."
""
"@param aggregation_type the aggregation to execute."
"@param index_name the indices to target."
"@param columns the columns to target."
"@param function_test_null_result the function to test the mull ES result (default: NULL)."
"@param aggregation_supplementary_args a string to pass to the request to add some args (default: NULL)."
"@param query the query to target some data (default: NULL)."
"@return the list of tibble with the results."
if(!purrr::is_character(aggregation_type)) stop(private$err_param_type_character("aggregation_type"))
if(length(aggregation_type) > 1) stop(private$err_one_value("aggregation_type"))
if(!purrr::is_null(aggregation_supplementary_args)){
if(!purrr::is_character(aggregation_supplementary_args)) stop(private$err_param_type_character("aggregation_supplementary_args"))
if(length(aggregation_supplementary_args) > 1) stop(private$err_one_value("aggregation_supplementary_args"))
}
if(purrr::is_null(function_test_null_result)) stop("need a function to test the metrics.")
if(!("closure" %in% typeof(function_test_null_result))) stop("not a function, need a funciton to test the metrics.")
if(!purrr::is_character(index_name)) stop(private$err_param_type_character("index_name"))
if(!purrr::is_character(columns)) stop(private$err_param_type_character("columns"))
if(!purrr::is_null(aggregation_supplementary_args) && !purrr::is_character(aggregation_supplementary_args)) stop("aggregation args must be a string.")
if(length(aggregation_supplementary_args) > 1) stop("aggregation body error.")
if(!purrr::is_null(query)){
if(!purrr::is_character(query)) stop(private$err_param_type_character("query"))
if(length(query) > 1) stop(private$err_one_value("query"))
}
#
involved_indices <- self$match(index_name)
#
res <- list()
if(!purrr::is_null(involved_indices)){
# supplementary args for req body
s <- if(purrr::is_null(aggregation_supplementary_args)) "" else aggregation_supplementary_args
# searched cols
if(self$verbose){
columns %>%
private$vector_to_str() %>%
message(" -> Searching column names: ", .)
message(" -> Getting ", aggregation_type)
}
#
for(ind in involved_indices){
res[[ind]] <- list()
# get all columns from an index
real_columns <- suppressMessages({ self$columns(ind)[[ind]] })
for(col in columns){
# define agg
agg_body <- paste0('{"size":0,"aggs":{"kaggs":{"',
aggregation_type, '":{"field": "', col, '"', s, '}}}}')
# req
res[[ind]][[col]] <- tryCatch(
expr = {
# raise an error if the col is not known
# should not happen but sum() actually let the
# unknown column with 0 as default value
if(!(col %in% real_columns)) stop("ABSENT_COLUMN")
# req
tmp <- elastic::Search(
self$connection,
index = ind,
size = 0,
q = query,
body = agg_body
) %>%
.$aggregations %>%
.$kaggs
if(function_test_null_result(tmp)) stop("ABSENT_COLUMN")
# add index and col names
tmp <- as.data.frame(tmp, stringsAsFactors = FALSE)
tmp <- cbind(column = col, tmp)
tmp
},
error = function(e){
if( e$message == "ABSENT_COLUMN"){
if(self$verbose){
message(" - Skipping absent column '", col, "' of '", ind, "'.")
}
} else {
if(grepl("all shards failed", e$message, ignore.case = TRUE)){
if(self$verbose){
message(" - Skipping non numeric column '", col, "' of '", ind, "'.")
}
} else {
# reraise
stop(e$message)
}
}
}
)
}
#
if(!private$is_list_empty(res[[ind]])){
res[[ind]] <- res[[ind]] %>%
data.table::rbindlist() %>%
tibble::as_tibble()
# change dot to underscore in names
names(res[[ind]]) <- gsub("\\.", "_", names(res[[ind]]))
}
}
}
if(length(res) == 0){
res <- NULL
}
if(self$quiet_results) invisible(res) else res
},
join = function(join_type = NULL, left_index, right_index, left_columns = NULL, left_query = NULL, left_bulk_size = 500, left_max_size = NULL, right_columns = NULL, right_query = NULL, right_bulk_size = 500, right_max_size = NULL, by = NULL, keep_metadata = FALSE) {
"[Abstract method] Execute a join between two datasets using `dplyr` joins."
"The datasets can be in-memory (variable name) or the name of an currently stored Elasticsearch index."
"This should not be call directly. "
"Use one of the (`$inner_join`, `$full_join`, `$left_join`, `$right_join`, `$anti_join` or `$semi_join`) public methods instead."
""
"@param join_type the join type, defined by `private$.VALID_JOINS`. (default: NULL)"
"@param left_index the left index name or dataset."
"@param right_index the right index name or dataset."
"@param left_columns the left index columns to select. (default: NULL)"
"@param left_query the left index query. (default: NULL)"
"@param left_bulk_size the left index bulk size when downloading from Elasticsearch. (default: 500)"
"@param left_max_size the left index max size (hard threshold). (default: NULL)"
"@param right_columns the right index columns to select. (default: NULL)"
"@param right_query the right index query. (default: NULL)"
"@param right_bulk_size the right index bulk size when downloading from Elasticsearch. (default: 500)"
"@param right_max_size the right index max size (hard threshold). (default: NULL)"
"@param by the field names used to join the two datasets. Should be a named vector or list of this format c('field_left' = 'field_right') (default: NULL)"
"@param keep_metadata Keep Elasticsearch metadata? (default: FALSE)"
"@return the result of the called join"
# TODO: add a "from_instance" param to join between instances
# TODO: manage query on in-memory data through `enquo() + !!` ?
# missing ds/index names
if(missing(left_index)) stop("Missing left dataset/index name.")
if(missing(right_index)) stop("Missing right dataset/index name.")
# join type
if(!purrr::is_character(join_type)) stop(private$err_param_type_character("join_type"))
if(!(join_type %in% private$.VALID_JOINS)) stop(private$err_not_in_vector("Join type", private$.VALID_JOINS))
# join
if(!purrr::is_null(by) && !purrr::is_character(by)) stop(private$err_param_type_character("by", can_be_null = TRUE))
if(purrr::is_character(by) && private$is_search_pattern(by)) stop(private$err_search_pattern_forbidden("by"))
# metadata
if(!purrr::is_logical(keep_metadata)) stop(private$err_param_type_logical("keep_metadata"))
if(is.na(keep_metadata)) stop(private$err_logical_na("keep_metadata"))
# add columns from "by" to the left and right columns if not present
if(!purrr::is_null(left_columns)){
# get "by" left index column names
by_left_columns <- by %>% as.list() %>% names()
left_columns <- c(left_columns, by_left_columns) %>% unique()
}
if(!purrr::is_null(right_columns)){
# get "by" right index column names
by_right_columns <- by %>% as.list() %>% unlist(use.names = FALSE)
right_columns <- c(right_columns, by_right_columns) %>% unique()
}
# side args check (left and right)
check_side_args <- function(side){
#
name_arg <- function(stype){ paste0(side, "_", stype) }
call_arg <- function(stype){ name_arg(stype) %>% parse(text = .) %>% eval() }
# names
n_index <- name_arg("index")
n_query <- name_arg("query")
n_columns <- name_arg("columns")
n_bulk_size <- name_arg("bulk_size")
n_max_size <- name_arg("max_size")
# values
v_index <- call_arg("index")
v_query <- call_arg("query")
v_columns <- call_arg("columns")
v_bulk_size <- call_arg("bulk_size")
v_max_size <- call_arg("max_size")
# index
if(purrr::is_character(v_index)) {
# data from ES index
if(private$is_search_pattern(v_index)) stop(private$err_search_pattern_forbidden(n_index))
if(!purrr::is_null(v_query)){
if(!purrr::is_character(v_query)) stop(private$err_param_type_character(n_query, can_be_null = TRUE))
if(length(v_query) > 1) stop(private$err_one_value(n_query))
}
if(!purrr::is_null(v_columns) && !purrr::is_character(v_columns)) {
stop(private$err_param_type_character(n_columns, can_be_null = TRUE))
}
} else {
# data from in-memory
if(purrr::is_null(v_index)) stop(private$err_null_forbidden(n_index))
if(!("data.frame" %in% class(v_index))) stop("Joining cannot be executed on '", n_index, "', need an index name, a data.frame or derivative.")
if(!purrr::is_null(v_query) && self$verbose) message("`", n_query, "` will be ignored.")
}
# bulk size
if(!is.numeric(v_bulk_size)) stop(private$err_param_type_numeric(n_bulk_size))
if(length(v_bulk_size) > 1) stop(private$err_one_value(n_bulk_size))
if(v_bulk_size < 1) stop(private$err_param_positive(n_bulk_size, can_be_null = FALSE))
# max size
if(!purrr::is_null(v_max_size)) {
if(!is.numeric(v_max_size)) stop(private$err_param_type_numeric(n_max_size))
if(length(v_max_size) > 1) stop(private$err_one_value(n_max_size))
if(v_max_size < 1) stop(private$err_param_positive(n_max_size, can_be_null = FALSE))
}
}
# check left side
check_side_args("left")
# check right side
check_side_args("right")
#
get_data <- function(index, columns, query, bulk_size, max_size){
"Function: get data from ES index or memory"
index_data <- NULL
if(purrr::is_character(index)){
# if char, it is the index name needed to be pulled out
if(self$verbose) message("from index '", index, "'")
index_data <- self$pull(
index_name = index,
columns = columns,
query = query,
bulk_size = bulk_size,
max_size = max_size,
keep_metadata = keep_metadata
)[[index]]
# if keep metadata, remove "_source" prefix in colnames
if(keep_metadata){
index_data <- index_data %>%
dplyr::rename_at(
dplyr::vars(dplyr::starts_with('_source.')),
.funs = function(x){
x %>% lapply(function(y){
stringr::str_split(y, "_source.")[[1]][2]
})
}
)
}
} else {
# if data.frame derivative, just load it as tibble
if(self$verbose) message("from memory")
index_data <- index %>%
dplyr::select( if(purrr::is_null(columns)) dplyr::everything() else columns ) %>%
(function(x){ if(!purrr::is_null(max_size)) head(x, max_size) else x }) %>%
tibble::as_tibble(.name_repair = "unique")
}
# ret
index_data
}
# get left
if(self$verbose) message("Getting left ", appendLF = FALSE)
left <- get_data(
index = left_index,
columns = left_columns,
query = left_query,
bulk_size = left_bulk_size,
max_size = left_max_size
)
# get right
if(self$verbose) message("Getting right ", appendLF = FALSE)
right <- get_data(
index = right_index,
columns = right_columns,
query = right_query,
bulk_size = right_bulk_size,
max_size = right_max_size
)
# prepare args for join
fargs <- list(
left,
right,
by = by,
suffix = c("_left", "_right")
)
# call to dplyr joins
join_name <- paste0(join_type, "_join")
if(self$verbose) message(" -> Executing ", join_name, "...")
join_name %>%
list("dplyr") %>%
do.call(what = "getFromNamespace", args = .) %>%
do.call(args = fargs)
},
symmetric_difference = function(x, y){
"Compute a symmetric difference between two sets"
""
"@param x the first set."
"@param y the second set."
"@return the symmetric difference"
sd <- dplyr::setdiff(dplyr::union(x, y), dplyr::intersect(x, y))
if(length(sd) == 0) sd <- NULL
sd
},
round = function(nb, nb_decimal = 1L){
"Round to a number to a given decimal number"
""
"@param nb the number to round."
"@param nb_decimal the number of decimal to get."
"@return a rounded number"
if(!is.numeric(nb)) stop(private$err_param_type_numeric("nb"))
if(!is.numeric(nb_decimal)) stop(private$err_param_type_numeric("nb_decimal"))
if(nb_decimal < 0) stop(private$err_param_positive("nb_decimal"))
nb_decimal <- as.integer(nb_decimal)
#
round(nb, nb_decimal) %>%
format(nsmall = nb_decimal) %>%
trimws() %>%
as.double()
},
humanize_time = function(time){
"Format a second-based time to a easily readable string"
""
"@param time the time in seconds"
"@return a list composed of `$time` and `$unit`"
# take a time number (in seconds) and returns a more readable version with unit
if(!is.numeric(time)) stop(private$err_param_type_numeric("time"))
if(time < 0) stop(private$err_param_positive("time"))
# time in seconds
res <- list(unit = "s", time = time)
# milliseconds
if(res$time < 1){
# update to ms
res$time <- res$time * 1000
res$unit <- "ms"
} else {
# minutes
if(res$time > 60){
# update to min
res$time <- res$time / 60
res$unit <- "m"
if(res$time > 60){
# update to hour
res$time <- res$time / 60
res$unit <- "h"
if(res$time > 24){
# update to day
res$time <- res$time / 24
res$unit <- "d"
}
}
}
}
# round it
res$time <- as.character(private$round(res$time, nb_decimal = 3))
res
},
is_list_empty = function(my_list){
"Shortcut to test if a variable is a list and is empty"
""
"@examples"
"'aaa' %>% is_list_empty() # ERROR"
"c('aaa', 'bbb') %>% is_list_empty() # ERROR"
"list('aaa') %>% is_list_empty() # [1] FALSE"
"list() %>% is_list_empty() # [1] TRUE"
""
"@param my_list an object to test"
"return ERROR if my_list is not a list, TRUE if my_list is empty, else FALSE"
if(!purrr::is_list(my_list)) stop(private$err_param_type_list("my_list"))
(length(my_list) == 0)
},
vector_to_str = function(vect){
"Transform a vector to a listed string"
vect %>%
paste0(collapse = "', '") %>%
paste0("'", ., "'")
},
df_to_line = function(df){
"Transform a dataframe-like structure to a list of its lines"
split(df, seq(nrow(df)))
},
# --------------------------------------------------------------------------
# methods - Elastic utils
# --------------------------------------------------------------------------
connect = function(){
"Connect to Elasticsearch API with the given attributes by updating the `$connection`."
"Called automatically when changing `$host` or `$port`."
"Should not be called otherwise."
"Also update the Elasticsearch version available through `$version` when connected."
private$.connection <- elastic::connect(host = self$host,
port = self$port,
user = self$user,
pwd = self$pwd)
# test connection
p <- self$ping()
if(purrr::is_null(p)) stop("Connection to '", self$host, ":", self$port, "' cannot be established.")
# if connection is good
msg <- paste0("Connected to '", p$cluster_name, "' at '", self$host, ":", self$port, "'")
if(self$verbose) msg <- paste0(msg, " [Elasticsearch ", p$version$number, "]")
message(msg)
# Catch ES version
private$.es_version <- stringr::str_split(p$version$number, "\\.")[[1]] %>% as.integer() %>% as.list()
names(private$.es_version) <- c("major", "minor", "patch")
},
metadata_type = function(mtype, index_name){
if(!purrr::is_character(mtype)) stop(private$err_param_type_character("mtype"))
if(length(mtype) > 1) stop(private$err_one_value("mtype"))
if(!purrr::is_character(index_name)) stop(private$err_param_type_character("index_name"))
#
res <- NULL
m <- suppressWarnings({
elastic::index_get(
self$connection,
index = index_name,
features = mtype,
include_type_name = FALSE,
verbose = self$verbose
)
})
if(length(m) > 0){
res <- m %>%
lapply(function(x){ x[[mtype]] }) %>%
(function(x){ if(private$is_list_empty(x)) NULL else x })
}
if(self$quiet_results) invisible(res) else res
},
define_mappings = function(data){
"Define the Elasticsearch mapping of a dataset (list subclass)."
""
"@examples"
"private$define_mappings(starwars) # private method, cannot be called from outside"
""
"@param data a dataset"
"@return the mapping of the dataset"
if(!purrr::is_list(data)) stop("Need `data.frame` or derivative data structure")
if(purrr::is_null(data) || length(data) == 0) stop(private$err_empty_data("data"))
# types
text_type <- list(
type = "text",
fields = list(
keyword = list(
type = "keyword",
ignore_above = 256
)
)
)
numeric_type <- list(type = "float")
# map
map_types <- function(x,y){
xclass <- class(x)
if(length(xclass) > 1 && "factor" %in% xclass){
xclass <- "factor"
}
switch(xclass,
"NULL" = {
stop("The column '", y, "' contains only NULL values. Remove it before pushing.")
},
# text types
"factor" = { text_type },
"character" = { text_type },
"AsIs" = { text_type },
"logical" = { text_type },
# numerical types
"numeric" = { numeric_type },
"integer" = { numeric_type },
"double" = { numeric_type },
# in case of list, get the type of the first non null value
"list" = { map_types(unlist(x, use.names = FALSE)[[1]], y) },
# data.frame
"data.frame" = {
stop("Data contains multiple levels of dataframes, which are not handled by Kibior")
},
# all others
stop("Unknown type '", xclass, "' when creating Elasticsearch mapping")
)
}
# return mapping body
purrr::imap(data, map_types) %>%
list(properties = .)
},
create_mappings = function(index_name, mapping){
"Add a mapping on an index based on given data."
""
"@param index_name a vector of index names."
"@param mapping mapping to apply."
"@return a list of indices, each containing their number of observations and variables."
if(!purrr::is_character(index_name)) stop(private$err_param_type_character("index_name"))
if(length(index_name) > 1) stop(private$err_one_value("index_name"))
# req
if(self$verbose) message(" -> Applying mapping to '", index_name, "'")
res <- tryCatch(
expr = {
if(self$verbose) message(" - Trying without type insertion (ES > v7)... ", appendLF = FALSE)
# do not insert mapping type
r <- elastic::mapping_create(
conn = self$connection,
index = index_name,
body = mapping
)
if(self$verbose) message("ok")
r
},
error = function(e){
res <- NULL
if(self$verbose) message("nok")
if(grepl("mapping type is missing", e$message, ignore.case = TRUE)){
# older ES version < 7, try another time with mapping types
if(self$verbose) message(" - Trying with type insertion (ES < v7)... ", appendLF = FALSE)
# with type
res <- tryCatch(
expr = {
res <- elastic::mapping_create(
conn = self$connection,
index = index_name,
type = "_doc",
include_type_name = TRUE,
body = mapping
)
if(self$verbose) message("ok")
res
},
error = function(ee){
if(self$verbose) message("nok")
stop(ee$message)
}
)
} else {
stop(e$message)
}
res
}
)
#
if(self$verbose) message(" -> Waiting a bit for Elasticsearch")
Sys.sleep(self$elastic_wait)
res
},
force_merge = function(index_name, max_num_segments = NULL, only_expunge_deletes = FALSE, flush = TRUE){
"TODO desc"
if(missing(index_name)) index_name <- "*"
elastic::index_forcemerge(self$connection,
index = index_name,
max_num_segments = max_num_segments,
only_expunge_deletes = only_expunge_deletes,
flush = flush)
},
optimize = function(index_name, max_num_segments = NULL, only_expunge_deletes = FALSE, flush = TRUE, wait_for_merge = TRUE){
"TODO desc"
if(self$version$major < 5){
elastic::index_optimize(self$connection,
index = index_name,
max_num_segments = max_num_segments,
only_expunge_deletes = FALSE,
flush = flush,
wait_for_merge = wait_for_merge)
}
},
is_search_pattern = function(pattern){
"Tell if a given string vector contains an Elasticsearch pattern string"
""
"@param s the string to test"
"@return TRUE if the string is a pattern, else FALSE"
if(!purrr::is_character(pattern)) stop(private$err_param_type_character("pattern"))
grepl("*", pattern, fixed = TRUE) %>% any()
},
# --------------------------------------------------------------------------
# Format error messages
# --------------------------------------------------------------------------
err_null_forbidden = function(arg_name){
"Argument cannot be null"
paste0("`", arg_name, "` must not be NULL.")
},
err_active_is_read_only = function(param_name){
"For Active bindings - argument is read-only"
paste0("`", param_name, "` is read-only.")
},
err_param_type = function(param_name, expected_type, can_be_null = FALSE){
"Wrong param type"
s <- if(can_be_null) " or NULL" else ""
paste0("`", param_name, "` must be ", expected_type, s, ".")
},
err_param_type_numeric = function(param_name, can_be_null = FALSE){
"Wrong param type, expected numeric"
private$err_param_type(param_name, "numeric", can_be_null = can_be_null)
},
err_param_type_logical = function(param_name, can_be_null = FALSE){
"Wrong param type, expected logical"
private$err_param_type(param_name, "logical", can_be_null = can_be_null)
},
err_param_type_character = function(param_name, can_be_null = FALSE){
"Wrong param type, expected character"
private$err_param_type(param_name, "character", can_be_null = can_be_null)
},
err_param_type_list = function(param_name, can_be_null = FALSE){
"Wrong param type, expected list"
private$err_param_type(param_name, "list", can_be_null = can_be_null)
},
err_not_kibior_instance = function(param_name, can_be_null = FALSE){
"Wrong type"
private$err_param_type(param_name, "a Kibior instance.", can_be_null = can_be_null)
},
err_param_positive = function(param_name, zero_valid = FALSE){
"Wrong param type"
s <- if(zero_valid) "" else " and non-NULL"
paste0("`", param_name, "` must be positive", s, ".")
},
err_one_value = function(param_name){
"Wrong number of value in the variable, expected only one"
paste0("`", param_name, "` must have a length of 1.")
},
err_logical_na = function(param_name){
"Wrong logical NA"
paste0("`", param_name, "` must TRUE or FALSE, not NA.")
},
err_search_pattern_forbidden = function(param_name){
"Not search pattern here"
paste0("`", param_name, "` must not contain search pattern.")
},
err_index_unknown = function(index_name){
"Index unknown"
index_name %>%
private$vector_to_str() %>%
paste0("One or more indices in [", ., "] are unknown.")
},
err_index_already_exists = function(index_name){
"Index already there"
paste0("Index '", index_name, "' already exists.")
},
err_not_in_vector = function(s, vect){
"Value not in vector"
private$vector_to_str(vect) %>%
paste0(s, " must be one of [", ., "].")
},
err_empty_data = function(param_name){
"Dataset is empty"
paste0("Data in `", param_name, "` is empty.")
},
err_field_unknown = function(index_name, field_name){
"Field is unknown or not present"
paste0("Field `", field_name, "` is not present in index `", index_name, "`.")
},
err_pkg_required = function(method_name, pkg_name, repo_name){
paste0("Method `", method_name, "` requires `", pkg_name, "` (", repo_name,") package.") %>%
paste0("\nPlease, install this package and retry.")
},
# Sad error is sad.
ERR_WTF = "Well. This is sad. Report with an issue please."
),
# ============================================================================
# ACTIVE BINDING
# ============================================================================
active = list(
#' @field host Access and change the Elasticsearch host
host = function(value) {
"Access and change the Elasticsearch host"
""
"@examples"
"kc <- Kibior()"
"kc$host"
"kc$host <- 'elasticsearch' # 'elasticsearch' needs to be online, known and accessible"
"kc$host"
""
"@param value new host"
"@return A string representing the host used in connection"
if(missing(value)) {
private$.host
} else {
if(!purrr::is_character(value)) stop(private$err_param_type_character("$host"), call. = FALSE)
# change host and reconnect
private$.host <- value
private$connect()
}
},
#' @field port Access and change the Elasticsearch port
port = function(value) {
"Access and change the Elasticsearch port"
""
"@examples"
"kc <- Kibior()"
"kc$port"
"kc$port <- 15000 # the port 15000 needs to be online and accessible"
"kc$port"
""
"@param value new port"
"@return A number representing the port used in connection"
if(missing(value)) {
private$.port
} else {
if(!is.numeric(value)) stop(private$err_param_type_numeric("$port"), call. = FALSE)
# change port and reconnect
private$.port <- as.integer(value)
private$connect()
}
},
#' @field endpoint Access the Elasticsearch main endpoint
endpoint = function(value) {
"Access the Elasticsearch endpoint address."
"Not mutable, initialize another client"
""
"@examples"
"kc <- Kibior()"
"kc$endpoint"
"kc$endpoint <- 'foobar' # Error, read only"
""
"@param value not used"
"@return A string representing the Elasticsearch endpoint."
if(missing(value)) {
paste0("http://", self$host, ":", self$port)
} else {
stop(private$err_active_is_read_only("$endpoint"), call. = FALSE)
}
},
#' @field user Access the Elasticsearch user.
user = function(value) {
"Access the Elasticsearch user."
"Not mutable, initialize another client"
""
"@examples"
"kc <- Kibior()"
"kc$user"
"kc$user <- 'foobar' # Error, read only"
""
"@param value new user, not used"
"@return A string representing the user used in connection"
if(missing(value)) {
private$.user
} else {
stop(private$err_active_is_read_only("$user"), call. = FALSE)
}
},
#' @field pwd Access the Elasticsearch password.
pwd = function(value) {
"Access the Elasticsearch password."
"Not mutable, initialize another client"
""
"@examples"
"kc <- Kibior()"
"kc$pwd"
"kc$pwd <- 'foobar' # Error, read only"
""
"@param value new pwd, not used"
"@return A string representing the password used in connection"
if(missing(value)) {
private$.pwd
} else {
stop(private$err_active_is_read_only("$pwd"), call. = FALSE)
}
},
#' @field connection Access the Elasticsearch connection object.
connection = function(value) {
"Access the Elasticsearch connection object. "
"Not mutable directly, change hsot or port to modify"
""
"@examples"
"kc <- Kibior()"
"kc$connection"
"kc$connection <- 'whatever' # Error, read only"
""
"@param value new connection, not used"
"@return A `elastic::connect()` object"
if(missing(value)) {
private$.connection
} else {
stop(private$err_active_is_read_only("$connection"), call. = FALSE)
}
},
#' @field head_search_size Access and change the head size default value.
head_search_size = function(value){
"Access and change the head size default value."
""
"@examples"
"kc <- Kibior()"
"kc$head_search_size"
"kc$head_search_size <- 10"
"kc$head_search_size"
""
"@param value new head size"
"@return A number representing the number of records the `$search()` method will get as sample"
if(missing(value)) {
private$.head_search_size
} else {
if(!is.numeric(value)) stop(private$err_param_type_numeric("$head_search_size"), call. = FALSE)
private$.head_search_size <- value
}
},
#' @field cluster_name Access the cluster name if and only if already connected.
cluster_name = function(value){
"Access the cluster name if and only if already connected."
"Shortcut to `$infos()$cluster_name`"
""
"@examples"
"kc <- Kibior()"
"kc$cluster_name"
"kc$cluster_name <- 'whatever' # Error, read only"
""
"@param value not used"
"@return A string representing the cluster name"
if(missing(value)) {
self$infos()$cluster_name
} else {
stop(private$err_active_is_read_only("$cluster_name"), call. = FALSE)
}
},
#' @field cluster_status Access the cluster status if and only if already connected.
cluster_status = function(value){
"Access the cluster status if and only if already connected."
"Shortcut to `$infos()$status`"
""
"@examples"
"kc <- Kibior()"
"kc$cluster_status"
"kc$cluster_status <- 'whatever' # Error, read only"
""
"@param value not used"
"@return A string representing the cluster status ('green', 'yellow', or 'red')"
if(missing(value)) {
self$infos()$status
} else {
stop(private$err_active_is_read_only("$cluster_status"), call. = FALSE)
}
},
#' @field nb_documents Access the current cluster total number of documents if and only if
#' already connected.
nb_documents = function(value){
"Access the current cluster total number of documents if and only if already connected."
"Shortcut to `$infos()$indices$docs$count`"
""
"@examples"
"kc <- Kibior()"
"kc$nb_documents"
"kc$nb_documents <- 'whatever' # Error, read only"
""
"@param value not used"
"@return A string representing the total number of recorded documents"
if(missing(value)) {
self$infos()$indices$docs$count
} else {
stop(private$err_active_is_read_only("$nb_documents"), call. = FALSE)
}
},
#' @field version Access the Elasticsearch version if and only if already connected.
version = function(value){
"Access the Elasticsearch version if and only if already connected."
""
"@examples"
"kc <- Kibior()"
"kc$version"
"kc$version <- 'whatever' # Error, read only"
""
"@param value not used"
"@return A list with a semantic versioning (see `$major`, `$minor`, and `$patch`) of Elasticsearch"
if(missing(value)) {
private$.es_version
} else {
stop(private$err_active_is_read_only("$version"), call. = FALSE)
}
},
#' @field elastic_wait Access and change the Elasticsearch wait time for update commands if
#' and only if already connected.
elastic_wait = function(value){
"Access and change the Elasticsearch wait time for update commands if and only if already connected."
"It is sometimes needed when Elasticsearch server configuration is not optimized for data ingestion and need extra delay."
""
"@examples"
"kc <- Kibior()"
"kc$elastic_wait"
"kc$elastic_wait <- 'whatever' # Error, read only"
""
"@param value the new time to wait in second"
"@return A list with a semantic versioning (see `$major`, `$minor`, and `$patch`) of Elasticsearch"
if(missing(value)) {
private$.es_wait
} else {
if(!is.numeric(value)) stop("`$elastic_wait` must be numeric", call. = FALSE)
if(value <= 0) stop(private$err_param_positive("$elastic_wait", zero_valid = FALSE), call. = FALSE)
private$.es_wait <- value
}
},
#' @field valid_joins Access the valid joins available in Kibior.
valid_joins = function(value){
"Access the valid joins available in Kibior."
""
"@examples"
"kc <- Kibior()"
"kc$valid_joins"
"kc$valid_joins <- 'whatever' # Error, read only"
""
"@param value not used"
"@return A vector representing possible valid joins used internally by Kibior"
if(missing(value)) {
private$.VALID_JOINS
} else {
stop(private$err_active_is_read_only("$valid_joins"), call. = FALSE)
}
},
#' @field valid_count_types Access the valid count types available (mainly observations =
#' rows, variables = columns)
valid_count_types = function(value){
"Access the valid count types available (mainly observations = rows, variables = columns)"
""
"@examples"
"kc <- Kibior()"
"kc$valid_count_types"
"kc$valid_count_types <- 'whatever' # Error, read only"
""
"@param value not used"
"@return A vector representing possible valid count types used internally by Kibior"
if(missing(value)) {
private$.VALID_COUNT_TYPES
} else {
stop(private$err_active_is_read_only("$valid_count_types"), call. = FALSE)
}
},
#' @field valid_elastic_metadata_types Access the valid Elasticsearch metadata types
#' available.
valid_elastic_metadata_types = function(value){
"Access the valid Elasticsearch metadata types available."
""
"@examples"
"kc <- Kibior()"
"kc$valid_elastic_metadata_types"
"kc$valid_elastic_metadata_types <- 'whatever' # Error, read only"
""
"@param value not used"
"@return A vector representing possible valid push modes"
if(missing(value)) {
private$.VALID_FEATURES
} else {
stop(private$err_active_is_read_only("$valid_elastic_metadata_types"), call. = FALSE)
}
},
#' @field valid_push_modes Access the valid push modes available.
valid_push_modes = function(value){
"Access the valid push modes available."
""
"@examples"
"kc <- Kibior()"
"kc$valid_push_modes"
"kc$valid_push_modes <- 'whatever' # Error, read only"
""
"@param value not used"
"@return A vector representing possible valid Elasticsearch metadata types"
if(missing(value)) {
private$.VALID_PUSH_MODES
} else {
stop(private$err_active_is_read_only("$valid_push_modes"), call. = FALSE)
}
},
#' @field shard_number Access and modify the number of allocated primary shards when
#' creating an Elasticsearch index.
shard_number = function(value){
"Access and modify the number of allocated primary shards when creating an Elasticsearch index."
""
"@examples"
"kc <- Kibior()"
"kc$shard_number"
"kc$shard_number <- 2"
""
"@param value number of primary shards to create"
"@return The number of primary shards defined for each index at their creation"
if(missing(value)) {
private$.es_primary_shards
} else {
if(!is.numeric(value)) stop(private$err_param_type_numeric("$shard_number"), call. = FALSE)
if(value <= 0) stop(private$err_param_positive("$shard_number", zero_valid = FALSE), call. = FALSE)
private$.es_primary_shards <- value
}
},
#' @field shard_replicas_number Access and modify the number of allocated replicas in an
#' Elasticsearch index.
shard_replicas_number = function(value){
"Access and modify the number of allocated replicas in an Elasticsearch index."
""
"@examples"
"kc <- Kibior()"
"kc$shard_replicas_number"
"kc$shard_replicas_number <- 2"
""
"@param value number of replica shards to create"
"@return The number of replicas defined for each index "
if(missing(value)) {
private$.es_replica_shards
} else {
if(!is.numeric(value)) stop(private$err_param_type_numeric("$shard_replicas_number"), call. = FALSE)
if(value < 0) stop(private$err_param_positive("$shard_replicas_number", zero_valid = TRUE), call. = FALSE)
private$.es_replica_shards <- value
}
},
#' @field default_id_col Access and modify the default ID column/field created when pushing
#' data to Elasticsearch.
default_id_col = function(value){
"Access and modify the default ID field created when pushing data to Elasticsearch."
""
"@examples"
"kc <- Kibior()"
"kc$default_id_col"
"kc$default_id_col <- 'kibior_unique_id'"
""
"@param value new name of the unique IDs field"
"@return the name of the current unique IDs field"
if(missing(value)) {
private$.default_id_col
} else {
if(!purrr::is_character(value)) stop(private$err_param_type_character("$default_id_col"), call. = FALSE)
if(length(value) > 1) stop(private$err_one_value("$default_id_col"), call. = FALSE)
private$.default_id_col <- value
}
}
),
# ============================================================================
# PUBLIC
# ============================================================================
public = list(
# --------------------------------------------------------------------------
# attributes
# --------------------------------------------------------------------------
#' @field verbose verbose mode, prints out more informations during execution
verbose = FALSE,
#' @field quiet_progress progressbar quiet mode, toggles progress bar
quiet_progress = FALSE,
#' @field quiet_results results quiet mode, toggles results printing
quiet_results = FALSE,
# --------------------------------------------------------------------------
# methods - Class inherits
# --------------------------------------------------------------------------
#' @details
#' Initialize a new object, automatically called when calling `Kibior$new()`
#'
#' @param host The target host to connect to Elasticsearch REST API (default: "localhost").
#' @param port The target port (default: 9200).
#' @param user If the server needs authentication, your username (default: NULL).
#' @param pwd If the server needs authentication, your password (default: NULL).
#' @param verbose The verbose mode (default: FALSE).
#'
#' @return a new instance/object of Kibior
#'
#' @examples
#' \dontrun{
#' # default initiatlization, connect to "localhost:9200"
#' kc <- Kibior$new()
#' # connect to "192.168.2.145:9200"
#' kc <- Kibior$new("192.168.2.145")
#' # connect to "es:15005", verbose mode activated
#' kc <- Kibior$new(host = "elasticsearch", port = 15005, verbose = TRUE)
#' # connect to "192.168.2.145:9450" with credentials "foo:bar"
#' kc <- Kibior$new(host = "192.168.2.145", port = 9450, user = "foo", pwd = "bar")
#' # connect to "elasticsearch:9200"
#' kc <- Kibior$new("elasticsearch")
#'
#' # get kibior var from env (".Renviron" file or local env)
#' dd <- system.file("doc_env", "kibior_build.R", package = "kibior")
#' source(dd, local = TRUE)
#' kc <- .kibior_get_instance_from_env()
#' kc$quiet_progress <- TRUE
#'
#' # preparing all examples (do not mind this for this method)
#' delete_if_exists <- function(index_names){
#' tryCatch(
#' expr = { kc$delete(index_names) },
#' error = function(e){ }
#' )
#' }
#' delete_if_exists(c(
#' "aaa",
#' "bbb",
#' "ccc",
#' "ddd",
#' "sw",
#' "sw_naboo",
#' "sw_tatooine",
#' "sw_alderaan",
#' "sw_from_file",
#' "storms",
#' "starwars"
#' ))
#' }
#'
initialize = function(host = "localhost", port = 9200, user = NULL, pwd = NULL, verbose = getOption("verbose")){
if(purrr::is_null(host)) host <- "localhost"
if(!is.numeric(port)) stop(private$err_param_type_numeric("port"))
if(!purrr::is_character(host)) stop(private$err_param_type_character("host"))
if(!purrr::is_null(user)){
if(!purrr::is_character(user)) stop(private$err_param_type_character("user"))
if(length(user) != 1) stop(private$err_one_value("user"))
}
if(!purrr::is_null(pwd)){
if(!purrr::is_character(pwd)) stop(private$err_param_type_character("pwd"))
if(length(pwd) != 1) stop(private$err_one_value("pwd"))
}
if(!purrr::is_logical(verbose)) stop(private$err_param_type_logical("verbose"))
if(!(verbose %in% c(FALSE, TRUE))) stop(private$err_logical_na("verbose"))
# cast
port <- as.integer(port)
# build attr
self$verbose <- verbose
private$.host <- host
private$.port <- port
private$.user <- user
private$.pwd <- pwd
# try to connect
private$connect()
},
#' @details
#' Print simple informations of the current object.
#'
#' @examples
#' \dontrun{
#' print(kc)
#' }
#'
print = function(){
f <- function(x) if(x) "yes" else "no"
cat("KibioR client: \n")
cat(" - host:", private$.host, "\n")
cat(" - port:", private$.port, "\n")
if(!purrr::is_null(private$.user)){
cat(" - username:", private$.user, "\n")
}
cat(" - verbose:", f(self$verbose), "\n")
cat(" - print result:", f(!self$quiet_results), "\n")
cat(" - print progressbar:", f(!self$quiet_progress), "\n")
},
#'
#' @details
#' Tells if another instance of Kibior has the same `host:port` couple.
#'
#' @param other Another instance/object of Kibior (default: NULL).
#'
#' @return TRUE if hosts and ports are identical, else FALSE
#'
#' @examples
#' \dontrun{
#' kc$eq(kc)
#' }
#'
eq = function(other = NULL){
if(!Kibior$is_instance(other)) stop(private$err_not_kibior_instance("other"))
r <- (self$host == other$host && self$port == other$port)
if(self$quiet_results) invisible(r) else r
},
#'
#' @details
#' Tells if another instance of Kibior has a different `host:port` couple.
#'
#' @param other Another instance/object of Kibior (default: NULL).
#'
#' @return TRUE if hosts and ports are differents, else FALSE
#'
#' @examples
#' \dontrun{
#' kc$ne(kc)
#' }
#'
ne = function(other = NULL){
r <- !self$eq(other = other)
if(self$quiet_results) invisible(r) else r
},
# --------------------------------------------------------------------------
# methods - CRUD index
# --------------------------------------------------------------------------
#' @details
#' Create one or several indices in Elasticsearch.
#'
#' @family crud-index
#'
#' @param index_name a vector of index names to create (default: NULL).
#' @param force Erase already existing identical index names? (default: FALSE).
#'
#' @return a list containing results of creation per index
#'
#' @examples
#' \dontrun{
#' kc$create("aaa")
#' kc$create(c("bbb", "ccc"))
#' }
#'
create = function(index_name, force = FALSE){
if(!purrr::is_character(index_name)) stop(private$err_param_type_character("index_name"))
if(private$is_search_pattern(index_name)) stop(private$err_search_pattern_forbidden("index_name"))
if(!purrr::is_logical(force)) stop(private$err_param_type_logical("force"))
if(is.na(force)) stop(private$err_logical_na("force"))
#
res <- list()
complete <- list()
f <- if(force) elastic::index_recreate else elastic::index_create
fnote <- if(force) "Recreating" else "Creating"
for(i in index_name){
if(self$verbose) message(" -> ", fnote, " index '", i, "'")
# build with settings but mapping is defined with data to better match fields type
body <- list(
"settings" = list(
"index" = list(
"number_of_shards" = private$.es_primary_shards,
"number_of_replicas" = private$.es_replica_shards
)
)
)
# execute
complete[[i]] <- tryCatch(
expr = {
f(self$connection, index = i, body = body, verbose = FALSE)
},
error = function(e){
if(grepl("already exists", e$message, ignore.case = TRUE)){
list(acknowledged = FALSE)
} else {
stop(e$message)
}
}
)
# control
if(complete[[i]]$acknowledged){
private$force_merge("*")
private$optimize(i)
res[[i]] <- TRUE
} else {
res[[i]] <- FALSE
}
}
# wait if at least one created
if(any(unlist(res, use.names = FALSE))){
if(self$verbose) message(" -> Waiting a bit for Elasticsearch")
Sys.sleep(self$elastic_wait)
}
if(self$quiet_results) invisible(res) else res
},
#' @details
#' List indices in Elasticsearch.
#'
#' @family crud-index
#'
#' @examples
#' \dontrun{
#' kc$list()
#' kc$list(get_specials = TRUE)
#' }
#'
#' @param get_specials a boolean to get special indices (default: FALSE).
#'
#' @return a list of index names, NULL if no index found
#'
list = function(get_specials = FALSE){
r <- names(self$mappings())
if(!purrr::is_null(r) && !get_specials){
# remove special indices
r <- r[!startsWith(r, ".")]
r <- r[!startsWith(r, "watcher_")]
}
if(self$quiet_results) invisible(r) else r
},
#' @details
#' Does Elasticsearch has one or several indices?
#'
#' @family crud-index
#'
#' @examples
#' \dontrun{
#' kc$has("aaa")
#' kc$has(c("bbb", "ccc"))
#' }
#'
#' @param index_name a vector of index names to check.
#'
#' @return a list with TRUE for found index, else FALSE
#'
has = function(index_name){
if(!purrr::is_character(index_name)) stop(private$err_param_type_character("index_name"))
if(private$is_search_pattern(index_name)) stop(private$err_search_pattern_forbidden("index_name"))
# result
r <- self$list()
res <- vapply(index_name,
FUN = function(x){ x %in% r },
FUN.VALUE = logical(1)) %>%
as.list()
if(self$quiet_results) invisible(res) else res
},
#' @details
#' Delete one or several indices in Elasticsearch.
#'
#' @family crud-index
#'
#' @param index_name a vector of index names to delete.
#'
#' @return a list containing results of deletion per index, or NULL if no index name match
#'
#' @examples
#' \dontrun{
#' kc$delete("aaa")
#' kc$delete(c("bbb", "ccc"))
#' }
#'
delete = function(index_name){
if(!purrr::is_character(index_name)) stop(private$err_param_type_character("index_name"))
if("_all" %in% index_name) stop("Cannot delete all indices at once.")
if("*" %in% index_name) stop("Cannot delete all indices at once.")
# match index names
indices <- self$match(index_name)
if(purrr::is_null(indices)){
if(self$verbose) message(" -> Deleting no index... well, ok")
res <- NULL
} else {
#
res <- list()
complete <- list()
if(self$verbose) message(" -> Deleting")
for(i in indices){
# if(self$verbose) message(" -> Deleting index '", i, "'... ", appendLF = FALSE)
if(self$verbose) message(" - Index '", i, "'... ", appendLF = FALSE)
# delete
complete[[i]] <- tryCatch(
expr = {
r <- elastic::index_delete(self$connection, index = i, verbose = FALSE)
if(self$verbose) message("ok")
r
},
error = function(e){
if(self$verbose) message("nok")
if(grepl("no such index", e$message, ignore.case = TRUE)){
if(self$verbose) message(" -> Index '", i, "' does not exists")
r <- list(acknowledged = FALSE)
} else {
stop(e$message)
}
r
}
)
#
if(complete[[i]]$acknowledged){
private$force_merge("*")
private$optimize(i)
res[[i]] <- TRUE
} else {
res[[i]] <- FALSE
}
}
# wait if at least one deleted
if(any(unlist(res, use.names = FALSE))){
if(self$verbose) message(" -> Waiting a bit for Elasticsearch")
Sys.sleep(self$elastic_wait)
}
}
#
if(self$quiet_results) invisible(res) else res
},
# --------------------------------------------------------------------------
# methods - indices metadata CRUD
# --------------------------------------------------------------------------
#' @details
#' Add a description of a pushed dataset.
#'
#' @family kibior-metadata
#'
#' @param index_name the index name to describe
#' @param dataset_name the full length dataset name
#' @param source_name the source/website/entity full length name
#' @param index_description the index description, should be explicit
#' @param version the version of the source dataset
#' @param change_log what have been done from the last version
#' @param website the website to the source dataset website
#' @param direct_download the direct download url of the dataset source
#' @param version_date the version or build date
#' @param license the license attached to this dataset, could be a url
#' @param contact a mailto/contact
#' @param references some paper and other references (e.g. doi, url)
#' @param columns a list of (column_name = column_description) to register (default: list())
#' @param force if FALSE, raise an error if the description already exists, else TRUE to
#' overwrite (default: FALSE)
#'
#' @return the index name if complete, else an error
#'
#' @examples
#' \dontrun{
#' kc$add_description(
#' index_name = "sw",
#' dataset_name = "starwars",
#' source_name = "Package dplyr",
#' index_description = "Description of starwars characters, the data comes from the Star
#' Wars API.",
#' version = "dplyr (1.0.0)",
#' link = "http://swapi.dev/",
#' direct_download_link = "http://swapi.dev/",
#' version_date = "2020-05-28",
#' license_link = "MIT",
#' columns = list(
#' "name" = "Name of the character",
#' "height" = "Height (cm)",
#' "mass" = "Weight (kg)",
#' "hair_color" = "Hair colors",
#' "skin_color" = "Skin colors",
#' "eye_color" = "Eye colors",
#' "birth_year" = "Year born (BBY = Before Battle of Yavin)",
#' "sex" = "The biological sex of the character, namely male, female,
#' hermaphroditic, or none (as in the case for Droids).",
#' "gender" = "The gender role or gender identity of the character as determined by
#' their personality or the way they were progammed (as in the case for Droids
#' ).",
#' "homeworld" = "Name of homeworld",
#' "species" = "Name of species",
#' "films" = "List of films the character appeared in",
#' "vehicles" = "List of vehicles the character has piloted",
#' "starships" = "List of starships the character has piloted"
#' )
#' )
#' }
#'
add_description = function(index_name, dataset_name, source_name, index_description, version, change_log, website, direct_download, version_date, license, contact, references, columns = list(), force = FALSE){
# check
if(!purrr::is_character(index_name)) stop(private$err_param_type_character("index_name"))
if(private$is_search_pattern(index_name)) stop(private$err_search_pattern_forbidden("index_name"))
if(length(index_name) > 1) stop(private$err_one_value("index_name"))
if(suppressMessages({ purrr::is_null(self$match(index_name)) })) stop("push an index before describing it please")
if(!purrr::is_character(dataset_name)) stop(private$err_param_type_character("dataset_name"))
if(length(dataset_name) > 1) stop(private$err_one_value("dataset_name"))
if(!purrr::is_character(source_name)) stop(private$err_param_type_character("source_name"))
if(length(source_name) > 1) stop(private$err_one_value("source_name"))
if(!purrr::is_character(index_description)) stop(private$err_param_type_character("index_description"))
if(length(index_description) > 1) stop(private$err_one_value("index_description"))
if(!purrr::is_character(version)) stop(private$err_param_type_character("version"))
if(length(version) > 1) stop(private$err_one_value("version"))
if(!purrr::is_character(change_log)) stop(private$err_param_type_character("change_log"))
if(length(change_log) > 1) stop(private$err_one_value("change_log"))
if(!purrr::is_character(website)) stop(private$err_param_type_character("website"))
if(length(website) > 1) stop(private$err_one_value("website"))
if(!purrr::is_character(direct_download)) stop(private$err_param_type_character("direct_download"))
if(length(direct_download) > 1) stop(private$err_one_value("direct_download"))
if(!purrr::is_character(version_date)) stop(private$err_param_type_character("version_date"))
if(length(version_date) > 1) stop(private$err_one_value("version_date"))
if(!purrr::is_character(license)) stop(private$err_param_type_character("license"))
if(length(license) > 1) stop(private$err_one_value("license"))
if(!purrr::is_character(contact)) stop(private$err_param_type_character("contact"))
if(!purrr::is_character(references)) stop(private$err_param_type_character("references"))
if(!purrr::is_logical(force)) stop(private$err_param_type_logical("force"))
if(is.na(force)) stop(private$err_logical_na("force"))
# check cols
if(!purrr::is_list(columns) || private$is_list_empty(columns)){
stop("argument 'columns' must be a list, with the column name as key and column description as value")
}
actual_cols <- suppressMessages({ self$columns(index_name)[[index_name]] }) %>%
(function(x) x[x != self$default_id_col])
if(length(actual_cols) != length(columns)){
stop("argument 'columns' must have the same number of columns as in its index. It does not take '", self$default_id_col, "' into account.")
}
sd <- private$symmetric_difference(actual_cols, names(columns))
if(!purrr::is_null(sd)){
sd %>%
private$vector_to_str() %>%
stop("argument 'columns' has some undocumented or unknown column names ", .)
}
if(length(unlist(columns, use.names = FALSE)) != length(columns)) stop("argument 'columns' must not accept list type in values")
has_non_str_type <- columns %>%
lapply(typeof) %>%
unlist(use.names = FALSE) %>%
{. != "character"} %>%
any()
if(has_non_str_type) stop("argument 'columns' must only have character type in values")
# get actual data
if(self$verbose) message(" -> Getting actual descriptions")
data <- suppressMessages({
self$pull(private$.KIBIOR_METADATA_INDICES)[[private$.KIBIOR_METADATA_INDICES]]
})
if(index_name %in% data$index_name){
if(!force){
stop("index '", index_name ,"' description is already registered. Use 'force' to overwrite.")
} else {
if(self$verbose) message(" -> Cleaning old description of '", index_name, "'")
data <- dplyr::filter(data, index_name != index_name)
}
}
#
if(self$verbose) message(" -> Formatting new description for '", index_name, "'")
meta_index <- list(
index_name = index_name,
dataset_name = dataset_name,
source_name = source_name,
index_description = index_description,
version = version,
change_log = change_log,
website = website,
direct_download = direct_download,
version_date = version_date,
license = license,
contact = contact,
references = references
) %>% as.data.frame(stringsAsFactors = FALSE)
#
new_data <- columns %>%
purrr::imap(function(col_desc, col_name){
list(
index_name = index_name,
column_name = col_name,
column_description = col_desc
)
}) %>%
data.table::rbindlist() %>%
as.data.frame(stringsAsFactors = FALSE) %>%
dplyr::right_join(meta_index, by = "index_name")
#
if(self$verbose) message(" -> Pushing new description for '", index_name, "'")
suppressMessages({
data %>%
(function(d){
if(purrr::is_null(d)){
new_data
} else {
d %>%
dplyr::select(-c(self$default_id_col)) %>%
rbind(new_data)
}
}) %>%
self$push(private$.KIBIOR_METADATA_INDICES, mode = "recreate")
})
#
if(self$quiet_results) invisible(index_name) else index_name
},
#' @details
#' Does the description exists?
#'
#' @family kibior-metadata
#'
#' @param index_name the index name to describe
#'
#' @return a list splitted by index, with TRUE if the description is
#' found, else FALSE. Removes unknown index names.
#'
#' @examples
#' \dontrun{
#' kc$has_description("s*")
#' kc$has_description(c("sw", "asdf"))
#' }
#'
has_description = function(index_name){
if(!purrr::is_character(index_name)) stop(private$err_param_type_character("index_name"))
if(private$is_search_pattern(index_name)) stop(private$err_search_pattern_forbidden("index_name"))
if(length(index_name) > 1) stop(private$err_one_value("index_name"))
#
i <- suppressMessages({ self$match(index_name) })
if(purrr::is_null(i)) stop("No index found with these names or pattern.")
n <- private$.KIBIOR_METADATA_INDICES
tmp <- suppressMessages({ self$keys(n, "index_name") })
res <- i %>% lapply(function(d){ d %in% tmp })
names(res) <- i
if(self$quiet_results) invisible(index_name) else index_name
},
#' @details
#' List indices that do no have descriptions.
#'
#' @family kibior-metadata
#'
#' @return a vector of indices not present in description.
#'
#' @examples
#' \dontrun{
#' kc$missing_descriptions()
#' }
#'
missing_descriptions = function(){
tmp <- suppressMessages({ self$list() })
indices <- suppressMessages({ self$keys(private$.KIBIOR_METADATA_INDICES, "index_name") })
res <- tmp[!tmp %in% indices]
if(self$quiet_results) invisible(res) else res
},
#' @details
#' Remove a description.
#'
#' @family kibior-metadata
#'
#' @param index_name the index name to describe
#'
#' @return a vector of indices not present in description.
#'
#' @examples
#' \dontrun{
#' # remove the description of 'test' index
#' kc$remove_description("test")
#' }
#'
remove_description = function(index_name){
if(!purrr::is_character(index_name)) stop(private$err_param_type_character("index_name"))
if(private$is_search_pattern(index_name)) stop(private$err_search_pattern_forbidden("index_name"))
if(length(index_name) > 1) stop(private$err_one_value("index_name"))
#
n <- private$.KIBIOR_METADATA_INDICES
is_describe <- suppressMessages({ self$keys(n, "index_name") }) %>% { index_name %in% . }
if(!is_describe){
stop("No description found for index '", index_name, "'.")
}
tmp <- suppressMessages({ self$pull(n)[[n]] }) %>%
dplyr::filter(index_name != !!index_name)
res <- tryCatch(
expr = {
tmp %>% dplyr::select(-c(self$default_id_col))
},
error = function(e){
message(e)
tmp
}
)
suppressMessages({ self$push(res, n, mode = "recreate") })
if(self$quiet_results) invisible(index_name) else index_name
},
#' @details
#' Remove all descriptions that do not have a index associated.
#'
#' @family kibior-metadata
#'
#' @return a list of index names which have been removed from descriptions.
#'
#' @examples
#' \dontrun{
#' # remove the description of 'test' index
#' kc$clean_descriptions()
#' }
#'
clean_descriptions = function(){
# get all description entries that do not have index associated
tmp <- suppressMessages({ self$keys(private$.KIBIOR_METADATA_INDICES, "index_name") })
description_names <- suppressMessages({ tmp[!tmp %in% self$list()] })
if(length(description_names) == 0){
if(self$verbose) message("No obsolete description found.")
res <- NULL
} else {
if(self$verbose) message("Removing obsolete index descriptions: ", private$vector_to_str(description_names))
res <- description_names %>%
lapply(self$remove_description) %>%
unlist(use.names = FALSE)
}
if(self$quiet_results) invisible(res) else res
},
#' @details
#' Get the description of indices and their columns.
#'
#' @family kibior-metadata
#'
#' @param index_name the index name to describe
#' @param columns a vector of column names to describe (default: NULL)
#' @param pretty pretty-print the result (default: FALSE)
#'
#' @return all description, grouped by indices
#'
#' @examples
#' \dontrun{
#' kc$describe("s*")
#' kc$describe("sw", columns = c("name", "height"))
#' }
#'
describe = function(index_name, columns = NULL, pretty = FALSE){
# check
if(!purrr::is_character(index_name)) stop(private$err_param_type_character("index_name"))
if(!purrr::is_null(columns) && !purrr::is_character(columns)){
stop(private$err_param_type_character("columns", can_be_null = TRUE))
}
if(!purrr::is_logical(pretty)) stop(private$err_param_type_logical("pretty"))
if(is.na(pretty)) stop(private$err_logical_na("pretty"))
# check if there is any description
m <- suppressMessages({ self$match(index_name) })
if(purrr::is_null(m)){
if(self$verbose) message(" -> No description found for '", index_name,"'")
res <- NULL
} else {
# query
n <- private$.KIBIOR_METADATA_INDICES
q <- paste0("index_name:", index_name)
if(!purrr::is_null(columns)){
q <- columns %>%
paste0(collapse = " || ") %>%
paste0(q, " && column_name:(", ., ")")
}
if(self$verbose) message(" -> Getting actual description for '", index_name, "'")
tmp <- suppressMessages({ self$pull(n, query = q)[[n]] })
if(purrr::is_null(tmp)){
res <- NULL
} else {
tmp <- tryCatch(
expr = {
tmp %>% dplyr::select(-c(self$default_id_col))
},
error = function(e){
if(grepl("doesn't handle lists", e$message, ignore.case = TRUE)){
if(self$verbose) message(" -> Skipping. No column name matching.")
NULL
} else {
stop(e$message)
}
}
)
if(purrr::is_null(tmp)){
res <- NULL
} else {
indices <- unique(tmp$index_name)
res <- indices %>%
lapply(function(n){
tmp %>%
dplyr::filter(index_name == n) %>%
dplyr::select(-c("index_name"))
})
names(res) <- indices
}
}
}
# pretty print
if(pretty && !purrr::is_null(res)){
pretty_format <- function(d,i){
res <- paste0("\nIndex '", i, "':\n") %>%
paste0(" - dataset name: ", unique(d$dataset_name), "\n") %>%
paste0(" - source name: ", unique(d$source_name), "\n") %>%
paste0(" - version: ", unique(d$version), "\n") %>%
paste0(" - website: ", unique(d$website), "\n") %>%
paste0(" - direct download: ", unique(d$direct_download), "\n") %>%
paste0(" - version date: ", unique(d$version_date), "\n") %>%
paste0(" - license: ", unique(d$license), "\n") %>%
paste0(" - contact: ", unique(d$contact), "\n") %>%
paste0(" - index description: ", unique(d$index_description), "\n") %>%
paste0(" - change log: ", unique(d$change_log), "\n")
# ref
references <- d$references %>%
unique() %>%
lapply(function(x) paste0(" - ", x)) %>%
paste0(collapse = "\n") %>%
paste0(" - references:\n", ., "\n")
# cols
columns <- d %>%
dplyr::select(c("column_name", "column_description")) %>%
unique() %>%
apply(1, function(x) paste0(" - ", x[1], ": ", x[2])) %>%
paste0(collapse = "\n") %>%
paste0(" - columns:\n", ., "\n")
# combine
res %>%
paste0(references) %>%
paste0(columns)
}
# print
res %>%
purrr::imap(pretty_format) %>%
paste0(collapse = "\n\n") %>%
cat()
res <- NULL
}
res
},
#' @details
#' Get the description text of indices.
#'
#' @family kibior-metadata
#'
#' @param index_name the index name to describe
#'
#' @return a list of description text, grouped by indices
#'
#' @examples
#' \dontrun{
#' kc$describe_index("s*")
#' }
#'
describe_index = function(index_name){
res <- self$describe(index_name)
if(!purrr::is_null(res)){
res <- res %>%
purrr::imap(function(d,i) d$index_description) %>%
purrr::imap(function(d,i) unique(d))
}
res
},
#' @details
#' Get the description text of index columns.
#'
#' @family kibior-metadata
#'
#' @param index_name the index name to describe
#' @param columns a vector of column names to describe
#'
#' @return a list of description text, grouped by indices
#'
#' @examples
#' \dontrun{
#' kc$describe_columns("s*", c("name", "height"))
#' }
#'
describe_columns = function(index_name, columns){
res <- self$describe(index_name, columns = columns)
if(!purrr::is_null(res)){
res <- res %>%
purrr::imap(function(d,i) dplyr::select(d, c("column_name", "column_description"))) %>%
purrr::imap(function(d,i) unique(d))
}
res
},
# --------------------------------------------------------------------------
# methods - cluster wealth
# --------------------------------------------------------------------------
#' @details
#' Get informations about Elasticsearch cluster
#'
#' @family cluster-wealth
#'
#' @examples
#' \dontrun{
#' kc$infos()
#' }
#'
#' @return a list of statistics about the cluster
#'
infos = function(){
suppressMessages({
r <- elastic::cluster_stats(self$connection)
})
if(self$quiet_results) invisible(r) else r
},
#' @details
#' Ping cluster connection
#'
#' @family cluster-wealth
#'
#' @examples
#' \dontrun{
#' kc$ping()
#' }
#'
#' @return the ping result with some basic infos
#'
ping = function(){
r <- self$connection$ping()
if(self$quiet_results) invisible(r) else r
},
# --------------------------------------------------------------------------
# methods - CRUD metadata
# --------------------------------------------------------------------------
#' @details
#' Get mappings of indices
#'
#' @family crud-metadata
#'
#' @examples
#' \dontrun{
#' kc$mappings()
#' kc$mappings("sw")
#' kc$mappings(c("sw", "sw_naboo"))
#' }
#'
#' @param index_name a vector of index names to get mappings.
#'
#' @return the list of indices, containing their mapping
#'
mappings = function(index_name){
if(missing(index_name)) index_name <- "*"
res <- private$metadata_type("mappings", index_name)
if(!purrr::is_null(res)){
# remove type call
if(self$version$major < 7){
res <- res %>% lapply(function(x){
if(purrr::is_null(x) || length(x) == 0) x else x[[1]]
})
}
}
if(self$quiet_results) invisible(res) else res
},
#' @details
#' Get settings of indices
#'
#' @family crud-metadata
#'
#' @examples
#' \dontrun{
#' kc$settings()
#' kc$settings("sw")
#' kc$settings(c("sw", "sw_tatooine"))
#' }
#'
#' @param index_name a vector of index names to get settings.
#'
#' @return the list of indices, containing their settings
#'
settings = function(index_name){
if(missing(index_name)) index_name <- NULL
res <- private$metadata_type("settings", index_name)
if(self$quiet_results) invisible(res) else res
},
#' @details
#' Get aliases of indices
#'
#' @family crud-metadata
#'
#' @examples
#' \dontrun{
#' kc$aliases()
#' kc$aliases("sw")
#' kc$aliases(c("sw", "sw_alderaan"))
#' }
#'
#' @param index_name a vector of index names to get aliases.
#'
#' @return the list of indices, containing their aliases
#'
aliases = function(index_name){
if(missing(index_name)) index_name <- NULL
res <- private$metadata_type("aliases", index_name)
if(self$quiet_results) invisible(res) else res
},
#' @details
#' Shortcut to `$count()` to match the classical `dim()` function pattern `[line col]`
#'
#' @family crud-metadata
#'
#' @examples
#' \dontrun{
#' # Couple [<nb obs> <nb var>] in "sw"
#' kc$dim("sw")
#' # Couple [<nb obs> <nb var>] in indices "sw_naboo" and "sw_alderaan"
#' kc$dim(c("sw_naboo", "sw_alderaan"))
#' }
#'
#' @param index_name a vector of index names to get aliases.
#'
#' @return the list of indices, containing their number of observations and variables.
#'
dim = function(index_name){
if(!purrr::is_character(index_name)) stop(private$err_param_type_character("index_name"))
#
involved_indices <- self$match(index_name)
#
res <- involved_indices %>%
lapply(function(x){
c(
suppressMessages(self$count(x, type = "observations")[[x]]),
suppressMessages(self$count(x, type = "variables")[[x]])
)
}) %>%
setNames(involved_indices)
#
if(length(res) == 0) res <- NULL
#
if(self$quiet_results) invisible(res) else res
},
#' @details
#' Get fields/columns of indices.
#'
#' @family crud-metadata
#'
#' @examples
#' \dontrun{
#' kc$columns("sw") # direct search
#' kc$columns("sw_*") # pattern search
#' }
#'
#' @param index_name a vector of index names, can be a pattern.
#'
#' @return a list of indices, each containing their fields/columns.
#'
columns = function(index_name){
if(!purrr::is_character(index_name)) stop(private$err_param_type_character("index_name"))
#
involved_indices <- self$match(index_name)
#
res <- NULL
if(!purrr::is_null(involved_indices)){
selected_mapping <- self$mappings(index_name)
#
get_property_names <- function(index_name){
# try index name
r <- selected_mapping[[index_name]]$properties
# if main selected_mapping is empty, maybe in "_doc"
if(purrr::is_null(r)){
r <- selected_mapping[["_doc"]]$properties
}
names(r)
}
#
res <- involved_indices %>%
lapply(get_property_names) %>%
setNames(involved_indices)
}
#
if(self$quiet_results) invisible(res) else res
},
# --------------------------------------------------------------------------
# methods - Stats
# --------------------------------------------------------------------------
#' @details
#' Count observations or variables in Elasticsearch data
#'
#' @family stats
#'
#' @examples
#' \dontrun{
#' # Number of observations (nb of records) in "sw"
#' kc$count("sw")
#' # Number of observations in indices "sw_naboo" and "sw_tatooine"
#' kc$count(c("sw_naboo", "sw_tatooine"))
#' # Number of variables (nb of columns) in index "sw_naboo"
#' kc$count("sw_naboo", type = "variables")
#' }
#'
#' @param index_name a vector of index names to get aliases.
#' @param type a string representing the type to count: "observations" (lines) or
#' "variables" (columns) (default: "observations").
#' @param query a string as a query string syntax (default: NULL).
#'
#' @return the list of indices, containing their number of observations or variables.
#' Use `$dim()` for both
#'
count = function(index_name, type = "observations", query = NULL){
if(!purrr::is_character(index_name)) stop(private$err_param_type_character("index_name"))
if(length(type) > 1) stop(private$err_one_value("type"))
if(!(type %in% private$.VALID_COUNT_TYPES)) stop(private$err_not_in_vector("Count type", private$.VALID_COUNT_TYPES))
if(!purrr::is_null(query)){
if(!purrr::is_character(query)) stop(private$err_param_type_character("query"))
if(length(query) > 1) stop(private$err_one_value("query"))
}
#
involved_indices <- self$match(index_name)
#
res <- list()
if(!purrr::is_null(involved_indices)){
for(i in involved_indices){
res[[i]] <- switch(type,
"observations" = {
elastic::count(self$connection, index = i, q = query)
},
"variables" = {
suppressMessages(self$columns(i)[[i]]) %>% length()
},
stop(private$ERR_WTF, " Found type: ", type)
)
}
}
if(length(res) == 0){
res <- NULL
}
if(self$quiet_results) invisible(res) else res
},
#' @details
#' Get the average of numeric columns.
#'
#' @family stats
#'
#' @examples
#' \dontrun{
#' # Avg of "sw" column "height"
#' kc$avg("sw", "height")
#' # if pattern
#' kc$avg("s*", "height")
#' # multiple indices, multiple columns
#' kc$avg(c("sw", "sw2"), c("height", "mass"), query = "homeworld:naboo")
#' }
#'
#' @param index_name a vector of index names.
#' @param columns a vector of column names.
#' @param query a string as a query string syntax (default: NULL).
#'
#' @return a tibble with avg, one line by matching index and column.
#'
avg = function(index_name, columns, query = NULL){
private$single_value_metric_aggregation("avg", index_name, columns, query = query)
},
#' @details
#' Get the mean of numeric columns.
#'
#' @family stats
#'
#' @examples
#' \dontrun{
#' # mean of "sw" column "height"
#' kc$mean("sw", "height")
#' # if pattern
#' kc$mean("s*", "height")
#' # multiple indices, multiple columns
#' kc$mean(c("sw", "sw2"), c("height", "mass"), query = "homeworld:naboo")
#' }
#'
#' @param index_name a vector of index names.
#' @param columns a vector of column names.
#' @param query a string as a query string syntax (default: NULL).
#'
#' @return a tibble with mean, one line by matching index and column.
#'
mean = function(index_name, columns, query = NULL){
args <- list(index_name, columns, query = query)
mini <- do.call(self$min, args)
maxi <- do.call(self$max, args)
# for each index
purrr::imap(mini, function(x,y){
# add maxi to mini
tmp <- dplyr::inner_join(x, maxi[[y]], by = c("column"))
# compute mean
tmp$mean <- ((tmp$min + tmp$max)/2)
tmp[c("column", "mean")]
})
},
#' @details
#' Get the minimum of numeric columns.
#'
#' @family stats
#'
#' @examples
#' \dontrun{
#' # min of "sw" column "height"
#' kc$min("sw", "height")
#' # if pattern
#' kc$min("s*", "height")
#' # multiple indices, multiple columns
#' kc$min(c("sw", "sw2"), c("height", "mass"), query = "homeworld:naboo")
#' }
#'
#' @param index_name a vector of index names.
#' @param columns a vector of column names.
#' @param query a string as a query string syntax (default: NULL).
#'
#' @return a tibble with min, one line by matching index and column.
#'
min = function(index_name, columns, query = NULL){
private$single_value_metric_aggregation("min", index_name, columns, query = query)
},
#' @details
#' Get the maximum of numeric columns.
#'
#' @family stats
#'
#' @examples
#' \dontrun{
#' # max of "sw" column "height"
#' kc$max("sw", "height")
#' # if pattern
#' kc$max("s*", "height")
#' # multiple indices, multiple columns
#' kc$max(c("sw", "sw2"), c("height", "mass"), query = "homeworld:naboo")
#' }
#'
#' @param index_name a vector of index names.
#' @param columns a vector of column names.
#' @param query a string as a query string syntax (default: NULL).
#'
#' @return a tibble with max, one line by matching index and column.
#'
max = function(index_name, columns, query = NULL){
private$single_value_metric_aggregation("max", index_name, columns, query = query)
},
#' @details
#' Get the sum of numeric columns.
#'
#' @family stats
#'
#' @examples
#' \dontrun{
#' # sum of "sw" column "height"
#' kc$sum("sw", "height")
#' # if pattern
#' kc$sum("s*", "height")
#' # multiple indices, multiple columns
#' kc$sum(c("sw", "sw2"), c("height", "mass"), query = "homeworld:naboo")
#' }
#'
#' @param index_name a vector of index names.
#' @param columns a vector of column names.
#' @param query a string as a query string syntax (default: NULL).
#'
#' @return a tibble with sum, one line by matching index and column.
#'
sum = function(index_name, columns, query = NULL){
private$single_value_metric_aggregation("sum", index_name, columns, query = query)
},
#' @details
#' Produces descriptive statistics of a column.
#' Returns a tibble composed of: count, min, max, avg, sum,
#' sum_of_squares, variance, std_deviation (+ upper and lower bounds).
#' Multiple warnings here. One for the count and one for the std_dev.
#' 1/ Counts: they are approximate, see vignette.
#' 2/ Std dev: as stated in ES documentation: "The standard deviation
#' and its bounds are displayed by default, but they are not always
#' applicable to all data-sets. Your data must be normally distributed
#' for the metrics to make sense. The statistics behind standard
#' deviations assumes normally distributed data, so if your data is
#' skewed heavily left or right, the value returned will be misleading."
#'
#' @family stats
#'
#' @examples
#' \dontrun{
#' # Stats of "sw" column "height"
#' kc$stats("sw", "height")
#' # if pattern
#' kc$stats("s*", "height")
#' # multiple indices and sigma definition
#' kc$stats(c("sw", "sw2"), "height", sigma = 2.5)
#' # multiple indices, multiple columns
#' kc$stats(c("sw", "sw2"), c("height", "mass"), query = "homeworld:naboo")
#' }
#'
#' @param index_name a vector of index names.
#' @param columns a vector of column names.
#' @param sigma (default: NULL).
#' @param query a string as a query string syntax (default: NULL).
#'
#' @return a tibble with descriptive stats, one line by matching index.
#'
#' @seealso you should use \code{\link{count}} for more accurate count.
#'
stats = function(index_name, columns, sigma = NULL, query = NULL){
if(!purrr::is_character(index_name)) stop(private$err_param_type_character("index_name"))
if(!purrr::is_character(columns)) stop(private$err_param_type_character("columns"))
if(!purrr::is_null(sigma)){
if(length(sigma) > 1) stop(private$err_one_value("sigma"))
if(!is.numeric(sigma)) stop(private$err_param_type_numeric("sigma"))
if(sigma <= 0) stop(private$err_param_positive("sigma", zero_valid = FALSE))
if(self$verbose) message(" -> Using sigma: ", sigma)
}
if(!purrr::is_null(query)){
if(!purrr::is_character(query)) stop(private$err_param_type_character("query"))
if(length(query) > 1) stop(private$err_one_value("query"))
}
#
test_function <- function(e){ e$count == 0 }
supp_args <- if(purrr::is_null(sigma)) "" else paste0(',"sigma":', sigma)
#
private$multi_value_metric_aggregation("extended_stats", index_name, columns,
function_test_null_result = test_function,
aggregation_supplementary_args = supp_args,
query = query)
},
#' @details
#' Get percentiles of numeric columns.
#'
#' @family stats
#'
#' @examples
#' \dontrun{
#' # percentiles of "sw" column "height", default is with q1, q2 and q3
#' kc$percentiles("sw", "height")
#' # if pattern
#' kc$percentiles("s*", "height")
#' # defining percents to get
#' kc$percentiles("s*", "height", percents = c(20, 25))
#' # multiple indices, multiple columns
#' kc$percentiles(c("sw", "sw2"), c("height", "mass"), query = "homeworld:naboo")
#' }
#'
#' @param index_name a vector of index names.
#' @param columns a vector of column names.
#' @param percents a numeric vector of pecents to use (default: NULL).
#' @param query a string as a query string syntax (default: NULL).
#'
#' @return a list of tibble, splitted by indices with percentiles inside tibble columns.
#'
# https://www.elastic.co/guide/en/elasticsearch/reference/6.8/search-aggregations-metrics-percentile-aggregation.html#search-aggregations-metrics-percentile-aggregation-approximation
percentiles = function(index_name, columns, percents = NULL, query = NULL){
if(!purrr::is_character(index_name)) stop(private$err_param_type_character("index_name"))
if(!purrr::is_character(columns)) stop(private$err_param_type_character("columns"))
if(!purrr::is_null(percents)){
if(!is.numeric(percents)) stop(private$err_param_type_numeric("percents"))
if(any(percents <= 0)) stop(private$err_param_positive("percents", zero_valid = FALSE))
if(any(percents > 100)) stop("percents cannot be > 100")
} else {
percents <- c(25, 50, 75)
}
if(self$verbose){
percents %>%
private$vector_to_str() %>%
message(" -> Using percentiles: ", .)
}
if(!purrr::is_null(query)){
if(!purrr::is_character(query)) stop(private$err_param_type_character("query"))
if(length(query) > 1) stop(private$err_one_value("query"))
}
#
test_function <- function(e){
e %>%
lapply(function(m) purrr::is_null(m)) %>%
unlist(use.names = FALSE) %>%
any()
}
supp_args <- percents %>% paste0(collapse = ",") %>% paste0(',"percents":[', ., ']')
#
private$multi_value_metric_aggregation("percentiles", index_name, columns,
function_test_null_result = test_function,
aggregation_supplementary_args = supp_args,
query = query)
},
#' @details
#' Get Q1 percentiles from numeric columns.
#'
#' @family stats
#'
#' @examples
#' \dontrun{
#' # Q1 of "sw" column "height"
#' kc$q1("sw", "height")
#' # if pattern
#' kc$q1("s*", "height")
#' # multiple indices, multiple columns
#' kc$q1(c("sw", "sw2"), c("height", "mass"), query = "homeworld:naboo")
#' }
#'
#' @param index_name a vector of index names.
#' @param columns a vector of column names.
#' @param query a string as a query string syntax (default: NULL).
#'
#' @return a list of tibble, splitted by indices with Q1 inside tibble columns.
#'
q1 = function(index_name, columns, query = NULL){
self$percentiles(index_name, columns, percents = 25, query = query)
},
#' @details
#' Get Q2 percentiles from numeric columns.
#'
#' @family stats
#'
#' @examples
#' \dontrun{
#' # Q2 of "sw" column "height"
#' kc$q2("sw", "height")
#' # if pattern
#' kc$q2("s*", "height")
#' # multiple indices, multiple columns
#' kc$q2(c("sw", "sw2"), c("height", "mass"), query = "homeworld:naboo")
#' }
#'
#' @param index_name a vector of index names.
#' @param columns a vector of column names.
#' @param query a string as a query string syntax (default: NULL).
#'
#' @return a list of tibble, splitted by indices with Q2 inside tibble columns.
#'
q2 = function(index_name, columns, query = NULL){
self$percentiles(index_name, columns, percents = 50, query = query)
},
#' @details
#' Get median from numeric columns.
#' Basically a wrapper around `$q2()`.
#'
#' @family stats
#'
#' @examples
#' \dontrun{
#' # median of "sw" column "height"
#' kc$median("sw", "height")
#' # if pattern
#' kc$median("s*", "height")
#' # multiple indices, multiple columns
#' kc$median(c("sw", "sw2"), c("height", "mass"), query = "homeworld:naboo")
#' }
#'
#' @param index_name a vector of index names.
#' @param columns a vector of column names.
#' @param query a string as a query string syntax (default: NULL).
#'
#' @return a list of tibble, splitted by indices with median inside tibble columns.
#'
median = function(index_name, columns, query = NULL){
self$q2(index_name, columns, query = query)
},
#' @details
#' Get Q3 percentiles from numeric columns.
#'
#' @family stats
#'
#' @examples
#' \dontrun{
#' # Q3 of "sw" column "height"
#' kc$q3("sw", "height")
#' # if pattern
#' kc$q3("s*", "height")
#' # multiple indices, multiple columns
#' kc$q3(c("sw", "sw2"), c("height", "mass"), query = "homeworld:naboo")
#' }
#'
#' @param index_name a vector of index names.
#' @param columns a vector of column names.
#' @param query a string as a query string syntax (default: NULL).
#'
#' @return a list of tibble, splitted by indices with Q3 inside tibble columns.
#'
q3 = function(index_name, columns, query = NULL){
self$percentiles(index_name, columns, percents = 75, query = query)
},
#' @details
#' Summary for numeric columns.
#' Cumulates `$min()`, `$max()`, `$q1()`, `$q2()`, `$q3()`.
#'
#' @family stats
#'
#' @examples
#' \dontrun{
#' # summary of "sw" column "height"
#' kc$summary("sw", "height")
#' # if pattern
#' kc$summary("s*", "height")
#' # multiple indices, multiple columns
#' kc$summary(c("sw", "sw2"), c("height", "mass"), query = "homeworld:naboo")
#' }
#'
#' @param index_name a vector of index names.
#' @param columns a vector of column names.
#' @param query a string as a query string syntax (default: NULL).
#'
#' @return a list of tibble, splitted by indices.
#'
summary = function(index_name, columns, query = NULL){
if(!purrr::is_null(query)){
if(!purrr::is_character(query)) stop(private$err_param_type_character("query"))
if(length(query) > 1) stop(private$err_one_value("query"))
}
if(all(self$version$major >= 7, self$version$minor >= 8)){
if(self$verbose) message(" -> ES >= 7.8.0, boxplot endpoint available")
# use boxplot endpoint
# https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-boxplot-aggregation.html#search-aggregations-metrics-boxplot-aggregation-approximation
#
test_function <- function(e){
e %>%
(function(m) purrr::is_null(m$min)) %>%
unlist(use.names = FALSE) %>%
any()
}
#
private$multi_value_metric_aggregation("boxplot", index_name, columns,
function_test_null_result = test_function,
query = query)
} else {
# classical way
if(self$verbose) message(" -> ES < 7.8.0, boxplot endpoint not available")
args <- list(index_name, columns, query = query)
chain <- function(ldf, func){
fres <- suppressMessages({ do.call(func, args) })
purrr::imap(ldf, function(x,y){
dplyr::inner_join(x, fres[[y]], by = c("column"))
})
}
do.call(self$min, args) %>%
chain(self$max) %>%
chain(self$q1) %>%
chain(self$q2) %>%
chain(self$q3)
}
},
# --------------------------------------------------------------------------
# methods - Data manipulation
# --------------------------------------------------------------------------
#' @details
#' Get distinct keys elements of a specific column.
#'
#' @family data-manipulation
#'
#' @examples
#' \dontrun{
#' kc$keys("sw", "name")
#' kc$keys("sw", "eye_color")
#' }
#'
#' @param index_name an index name.
#' @param column a field name of this index (default: NULL).
#' @param max_size the maximum result to return (default: 1000).
#'
#' @return a vector of keys values from this field/column
#'
keys = function(index_name, column, max_size = 1000){
if(!purrr::is_character(index_name)) stop(private$err_param_type_character("index_name"))
if(length(index_name) > 1) stop(private$err_one_value("index_name"))
if(private$is_search_pattern(index_name)) stop(private$err_search_pattern_forbidden("index_name"))
if(!purrr::is_character(column)) stop(private$err_param_type_character("column"))
if(length(column) > 1) stop(private$err_one_value("column"))
if(!(column %in% self$columns(index_name)[[index_name]])) stop(private$err_field_unknown(index_name, column))
if(!is.numeric(max_size)) stop(private$err_param_type_numeric("max_size"))
if(max_size < 0) stop(private$err_param_positive("max_size"))
#
get_body <- function(field){
paste0('{',
'"size": 0,',
'"aggs":{',
'"kaggs":{',
'"composite":{',
'"size": ', max_size, ',',
'"sources":[{',
'"kaggs":{',
'"terms":{',
'"field": "', field, '"',
'}',
'}',
'}]',
'}',
'}',
'}',
'}')
}
# warn
if(self$verbose) message(" -> Maximum size asked: ", max_size)
# get
buckets <- tryCatch(
expr = {
if(self$verbose) message(" -> Looking for '", column, "' column keys... ", appendLF = FALSE)
r <- elastic::Search(
self$connection,
index = index_name,
size = 0,
body = get_body(column)
)
if(self$verbose) message("ok")
r
},
error = function(e){
if(self$verbose) message("nok")
if(e$message == "400 - all shards failed"){
tryCatch(
expr = {
new_col <- paste0(column, ".keyword")
if(self$verbose) message(" -> Looking for '", new_col, "' column keys... ", appendLF = FALSE)
r <- elastic::Search(
self$connection,
index = index_name,
size = 0,
body = get_body(new_col)
)
if(self$verbose) message("ok")
r
}, error = function(ee){
stop("Elasticsearch - ", ee)
}
)
} else {
stop("Elasticsearch - ", e)
}
}
)
# get columns
res <- buckets$aggregations$kaggs$buckets %>%
lapply(function(x){ x$key }) %>%
unlist(use.names = FALSE)
#
if(self$quiet_results) invisible(res) else res
},
# --------------------------------------------------------------------------
# methods - Data transformation
# --------------------------------------------------------------------------
#' @details
#' Transformation function for collapsing the BAM list of lists format
#' into a single list as per the Rsamtools vignette
#'
#' @family data-manipulation
#'
#' @examples
#' \dontrun{
#' dd_bai <- system.file("extdata", "test.bam.bai", package = "kibior")
#' bam_param <- Rsamtools::ScanBamParam(what = c("pos", "qwidth"))
#' bam_data <- Rsamtools::scanBam(dd_bai, param = bam_param)
#' kc$bam_to_tibble(bam_data)
#' }
#'
#' @param bam_data data from a BAM file (default: NULL).
#'
#' @return a tibble of BAM data
#'
bam_to_tibble = function(bam_data = NULL){
.unlist <- function(x){
# do.call(c, ...) coerces factor to integer, which is undesired
x1 <- x[[1L]]
if (is.factor(x1)){
structure(unlist(x), class = "factor", levels = levels(x1))
} else {
do.call(c, x)
}
}
# store names of BAM fields
bam_field <- names(bam_data[[1]])
# go through each BAM field and unlist and store as data frame
bam_df <- lapply(bam_field, function(y){
.unlist(lapply(bam_data, "[[", y))
}) %>%
do.call(data.frame, .)
names(bam_df) <- bam_field
tibble::as_tibble(bam_df, .name_repair = "unique")
},
#' @details
#' Casting function that tries to cast a transformation closure. Uses
#' tibble::as_tibble() by default.
#'
#' @family data-manipulation
#'
#' @examples
#' \dontrun{
#' kc$soft_cast(datasets::iris)
#' }
#'
#' @param data data to cast.
#' @param caster the caster closure/function (default: tibble::as_tibble)
#' @param caster_args others caster args (default: list(.name_repair = "unique"))
#' @param warn do print warning if error? (default: TRUE)
#'
#' @return a cast or the unchanged data.
#'
soft_cast = function(data, caster = getFromNamespace("as_tibble", "tibble"), caster_args = list(.name_repair = "unique"), warn = TRUE){
if(!("closure" %in% typeof(caster))){
stop("Transformation function need to be a closure/function")
}
#
tryCatch(
expr = {
do.call(caster, list(x = data, caster_args))
},
error = function(e){
if(warn) message("Cannot cast, no changes applied")
if(self$verbose) message(e, "\n")
data
}
)
},
#' @details
#' Get a local filepath or an URL data through a tempfile. If the file
#' exists locally, the filepath will be returned, if not, it will tries
#' to download the data and return the temp filepath.
#'
#' @family data-manipulation
#'
#' @examples
#' \dontrun{
#' kc$get_resource(system.file("R", "kibior.R", package = "kibior"))
#' kc$get_resource("https://ftp.ncbi.nlm.nih.gov/entrez/README")
#' }
#'
#' @param url_or_filepath a filepath or an URL.
#' @param fileext the file extension (default: NULL).
#'
#' @return a filepath.
#'
get_resource = function(url_or_filepath, fileext = NULL){
if(!purrr::is_null(fileext)){
if(!purrr::is_character(fileext)) stop(private$err_param_type_character("fileext"))
if(length(fileext) > 1) stop(private$err_one_value("fileext"))
}
if(file.exists(url_or_filepath)){
# file exists
url_or_filepath
} else {
# not local
if(self$verbose) message("Filepath does not exist locally, trying download...")
tryCatch(
expr = {
f <- tempfile(fileext = fileext)
download.file(url_or_filepath, f)
f
},
error = function(e){
message(e)
},
warning = function(w){
message(w)
}
)
}
},
# --------------------------------------------------------------------------
# methods - Move data
# --------------------------------------------------------------------------
#' @details
#' Export data to a file.
#' Needs 'rio' package from CRAN.
#' Some data formats are not installed by default.
#' Use `rio::install_formats()` to be able to parse them.
#'
#' @family move-data
#'
#' @examples
#' \dontrun{
#' f <- tempfile(fileext=".csv")
#' # export and overwrite last file with the same data from Elasticsearch
#' kc$export(data = "sw", filepath = f)
#' # export from in-memory data to a file
#' kc$export(data = dplyr::starwars, filepath = f, force = TRUE)
#' }
#'
#' @param data an index name or in-memory data to be extracted to a file.
#' @param filepath the filepath to use as export, must contain the file extention.
#' @param format the file format to use (default: "csv").
#' @param force overwrite the file? (default: FALSE).
#'
#' @return the filepath if correctly exported, else an error
#'
export = function(data, filepath, format = "csv", force = FALSE){
if(purrr::is_null(data)) stop(private$err_empty_data("data"))
if(purrr::is_null(filepath)) stop(private$err_null_forbidden("filepath"))
if(!purrr::is_logical(force)) stop(private$err_param_type_logical("force"))
if(is.na(force)) stop(private$err_logical_na("force"))
if(purrr::is_character(data)){
if(private$is_search_pattern(data)) stop(private$err_search_pattern_forbidden("data"))
has_indices <- self$has(data)
absent_indices <- names(has_indices[has_indices == FALSE])
if(length(absent_indices) != 0){
stop(private$err_index_unknown(absent_indices))
}
}
# check package
if(!Kibior$.is_installed("rio")[[1]]){
stop(err_pkg_required("export", "rio", "CRAN"))
}
#
if(!force && file.exists(filepath)) stop("File already exists. Use `force = TRUE` to overwrite")
#
# data can be a in-memory dataset, or a name of an index in ES
dataset <- if(purrr::is_character(data)) self$pull(index_name = data)[[data]] else data
# write to file
res <- rio::export(x = dataset, file = filepath, format = format)
if(self$quiet_results) invisible(res) else res
},
#' @details
#' Import method for tabular data.
#' Needs 'rio' package from CRAN.
#' Works mainly with CSV, TSV, TAB, TXT and ZIPped formats.
#'
#' @family move-data
#'
#' @examples
#' \dontrun{
#' f <- tempfile(fileext = ".csv")
#' rio::export(ggplot2::diamonds, f)
#' # import to in-memory variable
#' kc$import_tabular(filepath = f)
#' # import raw data
#' kc$import_tabular(filepath = f, to_tibble = FALSE)
#' }
#'
#' @param filepath the filepath to use as import, must contain the file extention.
#' @param to_tibble returns the result as tibble? If FALSE, the raw default rio::import()
#' format will be used (default: TRUE).
#' @param fileext the file extension (default: ".csv").
#'
#' @return data contained in the file as a tibble, or NULL.
#'
import_tabular = function(filepath, to_tibble = TRUE, fileext = ".csv"){
# check package
if(!Kibior$.is_installed("rio")[[1]]){
stop(err_pkg_required("import_tabular", "rio", "CRAN"))
}
# do import
filepath %>%
self$get_resource(fileext = fileext) %>%
rio::import() %>%
(function(x){
if(to_tibble) self$soft_cast(x) else x
})
},
#' @details
#' Import method for features data.
#' Needs 'rtracklayer' package from Bioconductor.
#' Works with BED, GTF, GFFx, and GZIPped formats.
#'
#' @family move-data
#'
#' @examples
#' \dontrun{
#' # get sample files
#' f_gff <- system.file("extdata", "chr_y.gff3.gz", package = "kibior")
#' f_bed <- system.file("extdata", "cpg.bed", package = "kibior")
#' # import to in-memory variable
#' kc$import_features(filepath = f_bed)
#' kc$import_features(filepath = f_gff)
#' # import raw data
#' kc$import_features(filepath = f_bed, to_tibble = FALSE)
#' kc$import_features(filepath = f_gff, to_tibble = FALSE)
#' }
#'
#' @param filepath the filepath to use as import, must contain the file extention.
#' @param to_tibble returns the result as tibble? If FALSE, the raw default
#' rtracklayer::import() format will be used (default: TRUE).
#' @param fileext the file extension (default: ".gtf").
#'
#' @return data contained in the file as a tibble, or NULL.
#'
import_features = function(filepath, to_tibble = TRUE, fileext = ".gtf"){
# check package
if(!Kibior$.is_installed("rtracklayer")[[1]]){
stop(err_pkg_required("import_features", "rtracklayer", "Bioconductor"))
}
#
filepath %>%
self$get_resource(fileext = fileext) %>%
rtracklayer::import() %>%
(function(x){
if(to_tibble) self$soft_cast(x) else x
})
},
#' @details
#' Import method for alignments data.
#' Needs 'Rsamtools' packages from Bioconductor.
#' Works with BAM format.
#'
#' @family move-data
#'
#' @examples
#' \dontrun{
#' # get sample file
#' f_bai <- system.file("extdata", "test.bam.bai", package = "kibior")
#' # import to in-memory variable
#' kc$import_alignments(filepath = f_bai)
#' # import raw data
#' kc$import_alignments(filepath = f_bai, to_tibble = FALSE)
#' }
#'
#' @param filepath the filepath to use as import, should contain the file extention.
#' @param to_tibble returns the result as tibble? If FALSE, the raw default
#' Rsamtools::scanBam() format will be used (default: TRUE).
#' @param fileext the file extension (default: ".bam").
#'
#' @return data contained in the file as a tibble, or NULL.
#'
import_alignments = function(filepath, to_tibble = TRUE, fileext = ".bam"){
# check package
if(!Kibior$.is_installed("Rsamtools")[[1]]){
stop(err_pkg_required("import_alignments", "Rsamtools", "Bioconductor"))
}
#
filepath %>%
self$get_resource(fileext = fileext) %>%
Rsamtools::scanBam() %>%
(function(x){
if(to_tibble) self$soft_cast(x, caster = self$bam_to_tibble) else x
})
},
#' @details
#' Import method for JSON format.
#' Needs 'jsonlite' packages from CRAN.
#'
#' @family move-data
#'
#' @examples
#' \dontrun{
#' # get sample file
#' f_json <- system.file("extdata", "storms100.json", package = "kibior")
#' # import to in-memory variable
#' kc$import_json(f_json)
#' # import raw data
#' kc$import_json(f_json, to_tibble = FALSE)
#' }
#'
#' @param filepath the filepath to use as import, should contain the file extention.
#' @param to_tibble returns the result as tibble? If FALSE, the raw dataframe format
#' will be used (default: TRUE).
#' @param fileext the file extension (default: ".json").
#'
#' @return data contained in the file as a tibble, dataframe or NULL.
#'
import_json = function(filepath, to_tibble = TRUE, fileext = ".json"){
# check package
if(!Kibior$.is_installed("jsonlite")[[1]]){
stop(err_pkg_required("import_json", "jsonlite", "CRAN"))
}
#
filepath %>%
self$get_resource(fileext = fileext) %>%
jsonlite::fromJSON() %>%
(function(x){
if(to_tibble) self$soft_cast(x) else x
})
},
#' @details
#' Import method for sequences data.
#' Needs 'Biostrings' package from Bioconductor.
#' Works with FASTA formats.
#'
#' @family move-data
#'
#' @examples
#' \dontrun{
#' # get sample file
#' f_dna <- system.file("extdata", "dna_human_y.fa.gz", package = "kibior")
#' f_rna <- system.file("extdata", "ncrna_mus_musculus.fa.gz", package = "kibior")
#' f_aa <- system.file("extdata", "pep_mus_spretus.fa.gz", package = "kibior")
#' # import to in-memory variable
#' kc$import_sequences(filepath = f_dna, fasta_type = "dna")
#' # import raw data
#' kc$import_sequences(filepath = f_rna, to_tibble = FALSE, fasta_type = "rna")
#' # import auto
#' kc$import_sequences(filepath = f_aa)
#' }
#'
#' @param filepath the filepath to use as import, should contain the file extention.
#' @param to_tibble returns the result as tibble? If FALSE, the raw default
#' Rsamtools::scanBam() format will be used (default: TRUE).
#' @param fasta_type type of parsing. It can be "dna", "rna", "aa" ou "auto" (default:
#' "auto")
#'
#' @return data contained in the file as a tibble, or NULL.
#'
import_sequences = function(filepath, to_tibble = TRUE, fasta_type = "auto"){
if(!(fasta_type %in% c("auto", "dna", "rna", "aa"))){
stop("Needed fasta_type to be one of 'dna', 'rna','aa' ou 'auto'")
}
# check package
if(!Kibior$.is_installed("Biostrings")[[1]]){
stop(err_pkg_required("import_sequence", "Biostrings", "Bioconductor"))
}
# add parameter (, with.qualities=FALSE) for readXStringSet function
filepath %>%
# choose type
(function(x){
switch(fasta_type,
"dna" = { Biostrings::readDNAStringSet(x) },
"rna" = { Biostrings::readRNAStringSet(x) },
"aa" = { Biostrings::readAAStringSet(x) },
"auto" = { Biostrings::readBStringSet(x) },
stop("Unknown fasta option '", fasta_type, "'.")
)
}) %>%
# cast if needed
(function(stringset){
if(to_tibble){
data.frame(
width = Biostrings::width(stringset),
seq = as.character(stringset),
names = names(stringset)
) %>%
self$soft_cast()
} else {
stringset
}
})
},
#' @details
#' Import method that will try to guess importation method.
#' Will also try to read from compressed data if they are.
#' This method will call other import_* methods when trying.
#' Some data formats are not installed by default.
#' Use `rio::install_formats()` to be able to parse them.
#'
#' @family move-data
#'
#' @examples
#' \dontrun{
#' # get sample file
#' f_dna <- system.file("extdata", "dna_human_y.fa.gz", package = "kibior")
#' f_rna <- system.file("extdata", "ncrna_mus_musculus.fa.gz", package = "kibior")
#' f_aa <- system.file("extdata", "pep_mus_spretus.fa.gz", package = "kibior")
#' f_bai <- system.file("extdata", "test.bam.bai", package = "kibior")
#' f_gff <- system.file("extdata", "chr_y.gff3.gz", package = "kibior")
#' f_bed <- system.file("extdata", "cpg.bed", package = "kibior")
#' # import
#' kc$guess_import(f_dna)
#' kc$guess_import(f_rna)
#' kc$guess_import(f_aa)
#' kc$guess_import(f_bai)
#' kc$guess_import(f_gff)
#' kc$guess_import(f_bed)
#' }
#'
#' @param filepath the filepath to use as import, must contain the file extention.
#' @param to_tibble returns the result as tibble? (default: TRUE).
#'
#' @return data contained in the file, or NULL.
#'
guess_import = function(filepath, to_tibble = TRUE){
#
guess_import_method <- function(f, rm_compression_extension = FALSE){
"f can be an url or a fs path."
"rm_compression_extension = TRUE will try to remove the first layer"
"of extension (e.g. whatever.csv.gz -> whatever.csv) before trying"
"to open the original file to identify the non-compressed file extension."
# try to resolve relative path
f <- tryCatch(
expr = { tools::file_path_as_absolute(f) },
error = function(e){ f }
)
f %>%
(function(x){
if(rm_compression_extension) tools::file_path_sans_ext(x) else x
}) %>%
tools::file_ext() %>%
(function(x){
if(!(x %in% names(private$.AUTO_IMPORT_EXTENSION_MAPPING))) stop("Unknown extension: '", x, "'.")
x
}) %>%
private$.AUTO_IMPORT_EXTENSION_MAPPING[[.]] %>%
(function(x){
if(self$verbose){
message("Try loading [", x, "] data...")
message(" file: ", f)
}
x
}) %>%
(function(type){
args <- list(
filepath = f,
to_tibble = to_tibble
)
m <- switch(type,
"tabular" = { self$import_tabular },
"features" = { self$import_features },
"alignments"= { self$import_alignments },
"sequences" = { self$import_sequences },
"json" = { self$import_json },
stop(paste0(private$ERR_WTF, " Uknown type: ", type))
)
do.call(what = m, args = args)
})
}
# try calling the right method
tryCatch(
expr = {
guess_import_method(filepath)
},
error = function(e){
# try removing potential compression extension else raise an error
tryCatch(
expr = {
guess_import_method(filepath, rm_compression_extension = TRUE)
},
error = function(ee){
if(self$verbose) message(ee$message)
stop("Cannot auto-import. Try with specific methods.")
}
)
}
)
},
#' @details
#' Generic import method.
#' This method will call other import_* methods when trying.
#' Some data formats are not installed by default.
#'
#' @family move-data
#'
#' @examples
#' \dontrun{
#' # get sample file
#' f_aa <- system.file("extdata", "pep_mus_spretus.fa.gz", package = "kibior")
#' f_gff <- system.file("extdata", "chr_y.gff3.gz", package = "kibior")
#' f_bai <- system.file("extdata", "test.bam.bai", package = "kibior")
#' # import
#' kc$import(filepath = f_aa)
#' # import to Elasticsearch index ("sw_from_file") if not exists
#' kc$import(filepath = f_bai, push_index = "sw_from_file")
#' # import to index by recreating it, then pull indexed data
#' kc$import(filepath = f_gff, push_index = "sw_from_file",
#' push_mode = "recreate")
#' }
#'
#' @param filepath the filepath to use as import, must contain the file extention.
#' @param import_type can be one of "auto", "tabular", "features", "alignments", "sequences"
#' (default: "auto").
#' @param push_index the name of the index where to push data (default: NULL).
#' @param push_mode the push mode (default: "check").
#' @param id_col the column name of unique IDs (default: NULL).
#' @param to_tibble returns the result as tibble? (default: TRUE).
#'
#' @return data contained in the file, or NULL.
#'
import = function(filepath, import_type = "auto", push_index = NULL, push_mode = "check", id_col = NULL, to_tibble = TRUE){
if(!purrr::is_character(filepath)) stop(private$err_param_type_character("filepath"))
if(length(filepath) > 1) stop(private$err_one_value("filepath"))
if(!purrr::is_null(push_index) && !purrr::is_character(push_index)) stop(private$err_param_type_character("push_index", can_be_null = TRUE))
if(!(push_mode %in% private$.VALID_PUSH_MODES)) stop(private$err_not_in_vector("push_mode", private$.VALID_PUSH_MODES))
if(!purrr::is_null(id_col)){
if(!purrr::is_character(id_col)) stop(private$err_param_type_character("id_col"))
if(length(id_col) > 1) stop(private$err_one_value("id_col"))
}
if(!purrr::is_logical(to_tibble)) stop(private$err_param_type_logical("to_tibble"))
if(is.na(to_tibble)) stop(private$err_logical_na("to_tibble"))
# perform import function
m <- switch(import_type,
"auto" = { self$guess_import },
"tabular" = { self$import_tabular },
"features" = { self$import_features },
"alignments"= { self$import_alignments },
"sequences" = { self$import_sequences },
"json" = { self$import_json },
stop(private$ERR_WTF, " Unknown import type '", import_type, "'")
)
data <- m(filepath = filepath, to_tibble = to_tibble)
# check/push
no_data <- { dim(data) == c(0, 0) } %>% all()
res <- NULL
#
if(no_data) {
if(self$verbose) message("No data found in '", filepath, "'")
} else {
res <- data %>% tibble::as_tibble()
if(!purrr::is_null(push_index)){
res %>%
self$push(push_index, mode = push_mode, id_col = id_col)
}
}
if(self$quiet_results) invisible(res) else res
},
#' @details
#' Push data from in-memory to Elasticsearch.
#' Everything is done by bulk.
#'
#' @family move-data
#'
#' @examples
#' \dontrun{
#' # erase the last push data by recreating the index and re-pushing data
#' kc$push(dplyr::starwars, index_name = "sw", mode = "recreate")
#' # characters names are unique, can be used as ID
#' kc$push(dplyr::starwars, index_name = "sw", mode = "recreate", id_col = "name")
#' # a bit more complicated: update some data of the dataset "starwars"
#' # 38 records on 87 filtered
#' some_new_data <- dplyr::filter(dplyr::starwars, height > 180)
#' # make them all "gender <- female"
#' some_new_data["gender"] <- "female"
#' # update that apply, based on cahracter names to match the right record
#' kc$push(some_new_data, "sw", mode = "update", id_col = "name")
#' # view result by querying
#' kc$pull("sw", query = "height:>180", columns = c("name", "gender"))
#' }
#'
#' @param data the data to push.
#' @param index_name the index name to use in Elasticsearch.
#' @param bulk_size the number of record to send to Elasticsearch in a row (default: 1000).
#' @param mode the push mode, could be "check", "recreate" or "update" (default: "check").
#' @param id_col an column anme to use as ID, must be composed of unique elements (default:
#' NULL).
#'
#' @return the index_name given if the push ended well, else an error.
#'
push = function(data, index_name, bulk_size = 1000, mode = "check", id_col = NULL){
if(missing(data) || purrr::is_null(data)) stop(private$err_null_forbidden("data"))
if(nrow(data) == 0) stop(private$err_null_forbidden("data"))
if(!purrr::is_character(index_name)) stop(private$err_param_type_character("index_name"))
if(length(index_name) > 1) stop(private$err_one_value("index_name"))
if(!is.numeric(bulk_size)) stop(private$err_param_type_numeric("bulk_size"))
if(length(bulk_size) > 1) stop(private$err_one_value("bulk_size"))
if(bulk_size < 1) stop(private$err_param_positive("bulk_size",))
if(!(mode %in% private$.VALID_PUSH_MODES)) stop(private$err_not_in_vector("mode", private$.VALID_PUSH_MODES))
if(length(mode) > 1) stop(private$err_one_value("mode"))
# check index absence/presence
has_index <- self$match(index_name)
if(mode == "check"){
if(index_name %in% has_index) stop(private$err_index_already_exists(index_name))
}
if(mode == "update"){
if(purrr::is_null(id_col)) stop("Update mode needs a unique IDs column name.")
if(!(index_name %in% has_index)) stop(private$err_index_unknown(absent_indices))
}
# -----------------------------------------
# PREPARE DATA
# ------------------
# data names to lowercase
names(data) <- tolower(names(data))
if(self$verbose) message(" -> Forcing all columns names to lowercase")
# ------------------
# handle id_col
ids <- NULL
if(!purrr::is_null(id_col)){
if(!purrr::is_character(id_col)) stop(private$err_param_type_character("id_col"))
if(length(id_col) > 1) stop(private$err_one_value("id_col"))
id_col <- tolower(id_col)
if(!(id_col %in% names(data))) stop("IDs column name not found.")
# extracting ids from data
ids <- dplyr::pull(data, id_col)
if(length(unique(ids)) != length(ids)) stop("Column used as ID does not have unique elements.")
} else {
private$err_msg <- paste0("Kibior tried to add a unique column IDs '",
self$default_id_col, "', but the column name already exists. ",
"Try to define a column with unique IDs, change the column name or define another default name with '$default_id_col'.")
if(self$default_id_col %in% names(data)) stop(private$err_msg)
# force add a column with unique id (single sequence 1:nrow)
ids <- seq_len(nrow(data))
data <- within(data, assign(self$default_id_col, ids))
if(self$verbose) message(" -> Adding unique '", self$default_id_col, "' column to enforce uniqueness")
}
# ------------------
# try to flatten if data.frame class is found in one of the column types
has_df_type <- data %>%
lapply(class) %>%
unlist(use.names = FALSE) %>%
{"data.frame" %in% .} %>%
any()
if(has_df_type){
if(self$verbose) message(" -> Flattening columns having sub-dataframes")
data <- jsonlite::flatten(data)
}
# ------------------
# transform col: field names cannot contain dots, transform and warn user
has_dot <- grepl(".", names(data), fixed = TRUE)
if(any(has_dot)){
# replace names
old_names <- names(data)[has_dot == TRUE]
new_names <- old_names %>% gsub("\\.", "_", .)
changed_names <- gsub("\\.", "_", names(data))
# check
if(length(changed_names) != length(unique(changed_names))){
zip <- changed_names
names(zip) <- names(data)
zip <- as.list(zip)
for(i in names(zip)){
if(i != zip[[i]] && zip[[i]] %in% names(zip)){
stop("Column name [", i, "] should be changed to [", zip[[i]], "], but it already exists")
}
}
}
# apply
names(data) <- changed_names
# warn
if(self$verbose){
message(" -> Changing column names: ")
old_names %>%
paste0(collapse = ", ") %>%
message(" from: ", .)
new_names %>%
paste0(collapse = ", ") %>%
message(" to: ", .)
}
}
# ------------------
# remove empty columns
is_column_empty_char <- function(x){
# test if a list contains only empty strings "" values
x %>%
unlist(use.names = FALSE) %>%
{. == ""} %>%
all()
}
is_column_empty_null <- function(x){
# test if a list contains only NULL values
x %>%
unlist(use.names = FALSE) %>%
purrr::is_null()
}
is_column_empty_na <- function(x){
# test if a list contains only NA values
x %>%
unlist(use.names = FALSE) %>%
is.na() %>%
all()
}
is_column_empty <- function(x){
# test if a list contains only NA, NULL or empty strings values
is_column_empty_char(x) ||
is_column_empty_null(x) ||
is_column_empty_na(x)
}
# get empty columns
cols_empty <- sapply(data, is_column_empty)
if(any(cols_empty)){
if(self$verbose){
cols_empty %>%
which(.) %>%
names() %>%
paste0(collapse = ", ") %>%
message(" -> Removing empty columns: ", .)
}
# remove empty columns from data
data <- data[-which(cols_empty)]
}
# ------------------
# Listed columns containing NA/NULL changed to "" (empty string)
list_col_names <- data %>%
sapply(purrr::is_list) %>%
data[.] %>%
names
if(length(list_col_names) > 0){
if(self$verbose){
list_col_names %>%
paste0(collapse = ", ") %>%
message(" -> Adapting NULL/NA to empty strings values from columns: ", .)
}
data <- data %>%
dplyr::mutate_at(dplyr::all_of(dplyr::vars(list_col_names)), tidyr::replace_na, "") %>%
tibble::as_tibble()
}
# -----------------------------------------
# DEFINE AND APPLY MAPPING
# process: create index and mapping
if(mode != "update"){
res <- self$create(index_name = index_name, force = (mode == "recreate"))
if(!res[[index_name]]) stop("Index '", index_name,"' not created")
# define mapping based on data type
if(self$verbose) message(" -> Defining mapping of '", index_name, "'")
mapping <- private$define_mappings(data)
#
err_msg <- paste0("Cannot apply mapping to '", index_name, "'")
mapping_res <- tryCatch(
expr = {
suppressWarnings({
private$create_mappings(index_name, mapping)
})
},
error = function(e){
if(self$verbose){
err_msg <- paste0(err_msg, "\n ", e$message)
}
stop(err_msg)
}
)
# control result
if(purrr::is_null(mapping_res) || !mapping_res$acknowledged) stop(err_msg)
}
# -----------------------------------------
# SEND DATA
# prepare
if(self$verbose) message(" -> Sending data to '", index_name, "'")
args <- list(conn = self$connection,
x = data,
index = index_name,
chunk_size = bulk_size,
raw = FALSE,
doc_ids = ids,
quiet = self$quiet_progress,
query = list(refresh = "wait_for"))
# before v7, need arg type
if(self$version$major < 7){
args[["type"]] = "_doc"
}
# choose bulk method
bulk_method <- NULL
if(mode != "update"){
# if id_col not defined by the user, Elasticsearch will give ids automatically
args[["es_ids"]] = FALSE
bulk_method <- elastic::docs_bulk_index
} else {
bulk_method <- elastic::docs_bulk_update
}
# Time the bulk send
clock_start <- proc.time()
bulks_res <- suppressWarnings({
# suppress warnings caused by type removal deprecation in ES 6 and 7
rr <- do.call(bulk_method, args)
rr[[1]]
})
elapsed <- proc.time() - clock_start
# some errors during data transfer
if(bulks_res[["errors"]]){
msg_errors <- bulks_res$items %>%
lapply(function(x){ x[["index"]][["error"]][["reason"]] }) %>%
unlist(use.names = FALSE) %>%
paste0(collapse = ", ") %>%
warning("Some errors happenned during data transfer: ", .)
}
# verbose
if(self$verbose){
es_took <- bulks_res[["took"]] %>%
unlist(use.names = FALSE) %>%
sum() %>%
as.double() %>%
{. / 1000} %>%
private$humanize_time()
# user info
user_took <- elapsed[["elapsed"]] %>%
private$humanize_time()
# execution
message(" -> Execution time: ")
message(" - data sent: ", es_took$time, es_took$unit)
message(" - user wait: ", user_took$time, user_took$unit)
}
# get some time so Elasticsearch can make them available
if(self$verbose) message(" -> Waiting a bit for Elasticsearch")
Sys.sleep(self$elastic_wait)
if(self$quiet_results) invisible(index_name) else index_name
},
#' @details
#' Pull data from Elasticsearch.
#' Everything is done by bulk.
#' This method is essentially a wrapper around `$search()` with parameter `head = FALSE`
#'
#' @family move-data
#'
#' @examples
#' \dontrun{
#' # push some data sample
#' kc$push(dplyr::storms, "storms")
#' # get the whole "sw" index
#' kc$pull("sw")
#' # get the whole "sw" index with all metadata
#' kc$pull("sw", keep_metadata = TRUE)
#' # get only "name" and "status" columns of indices starting with "s"
#' # columns not found will be ignored
#' kc$pull("s*", columns = c("name", "status"))
#' # limit the size of the result to 10
#' kc$pull("storms", max_size = 10, bulk_size = 10)
#' # use Elasticsearch query syntax to select and filter on all indices, for all data
#' # Here, we want to search for all records taht match the conditions:
#' # field "height" is strictly more than 180 AND field homeworld is "Tatooine" OR "Naboo"
#' r <- kc$pull("sw", query = "height:>180 && homeworld:(Tatooine || Naboo)")
#' # it can be used in conjunction with `columns` to select only columns that matter
#' r <- kc$pull("sw", query = "height:>180 && homeworld:(Tatooine || Naboo)", columns =
#' c("name", "hair_color", "homeworld"))
#' }
#'
#' @param index_name the index name to use in Elasticsearch.
#' @param bulk_size the number of record to send to Elasticsearch in a row (default: 500).
#' @param max_size the number of record Elasticsearch will send (default: NULL (all data)).
#' @param scroll_timer the time the scroll API will let the request alive to scroll on the
#' result (default: "3m" (3 minute)).
#' @param keep_metadata does Elasticsearch needs to sent metadata? Data columns will be
#' prefixed by "_source." (default: FALSE).
#' @param columns a vector of columns to select (default: NULL (all columns)).
#' @param query a string formatted to Elasticsearch query syntax, see links for the syntax
#' details (default: NULL)
#'
#' # Simple syntax details:
#'
#' @seealso \url{https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html#time-units}
#' for time-units and \url{https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-query-string-query.html#query-string-syntax}
#' for the Elasticsearch query string syntax.
#'
#' @return a list of datasets corresponding to the pull request, else an error. Keys of the
#' list are index names matching the request, value are the associated tibbles
#'
pull = function(index_name, bulk_size = 500, max_size = NULL, scroll_timer = "3m", keep_metadata = FALSE, columns = NULL, query = NULL) {
args <- list(
index_name = index_name,
keep_metadata = keep_metadata,
columns = columns,
query = query,
bulk_size = bulk_size,
max_size = max_size,
scroll_timer = scroll_timer,
head = FALSE
)
if(self$quiet_results) invisible(do.call(self$search, args)) else do.call(self$search, args)
},
#'
#' @details
#' Move data from one index to another.
#' It needs to be configured in the `config/elasticsearch.yml` file to actually work.
#'
#' @family move-data
#'
#' @examples
#' \dontrun{
#' kc$push(dplyr::starwars, "sw", mode = "recreate")
#' # move data from an index to another (change name, same instance)
#' r <- kc$move(from_index = "sw", to_index = "sw_new")
#' kc$pull("sw_new")
#' kc$list()
#' }
#'
#' @param from_instance If not NULL, the Kibior object of another instance. if NULL
#' (default), this instance will be used. (default: NULL).
#' @param from_index The source index name (default: NULL).
#' @param to_index The destination index name (default: NULL).
#' @param force Does the destination index need to be erase? (default: FALSE)
#' @param copy Does the destination have to be a copy of the source? FALSE (default) will
#' delete source index, TRUE will keep it. (default: FALSE).
#'
#' @seealso: \url{https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html}
#' Elasticsearch reindex feature for more information.
#'
#' @return the reindex result
#'
move = function(from_index, to_index, from_instance = NULL, force = FALSE, copy = FALSE){
if(!purrr::is_null(from_instance) && !Kibior$is_instance(from_instance)) stop("Need a Kibior instance type or NULL.")
is_local <- purrr::is_null(from_instance)
# select source instance
source_instance <- if(is_local) self else from_instance
if(!purrr::is_character(from_index)) stop(private$err_param_type_character("from_index"))
if(private$is_search_pattern(from_index)) stop(private$err_search_pattern_forbidden("from_index"))
if(length(from_index) > 1) stop(private$err_one_value("from_index"))
if(!purrr::is_character(to_index)) stop(private$err_param_type_character("to_index"))
if(private$is_search_pattern(to_index)) stop(private$err_search_pattern_forbidden("to_index"))
if(length(to_index) > 1) stop(private$err_one_value("to_index"))
if(!purrr::is_logical(force)) stop(private$err_param_type_logical("force"))
if(is.na(force)) stop(private$err_logical_na("force"))
if(is_local && from_index == to_index) stop("Source and destination indices are the same.")
has_indices <- source_instance$has(from_index)
absent_indices <- names(has_indices[has_indices == FALSE])
if(length(absent_indices) != 0) stop(private$err_index_unknown(absent_indices))
if(!force && !purrr::is_null(self$match(to_index))){
stop(private$err_index_already_exists(already_there_indices))
}
if(!purrr::is_logical(copy)) stop(private$err_param_type_logical("copy"))
if(is.na(copy)) stop(private$err_logical_na("copy"))
# msg
if(self$verbose){
action <- if(copy) "Copying" else "Moving"
source_str <- paste0("'" , source_instance$host, ":", source_instance$port, "/", from_index, "'")
dest_str <- paste0("'", self$host, ":", self$port, "/", to_index, "'")
message(" -> ", action, " " , source_str, " to ", dest_str)
}
# TODO: add "_source" = columns to select only some columns
# TODO: add "query" = query to execute a query before moving, need to parse query string to struct
# building args - source subpart
source_param <- list(index = from_index)
if(!is_local){
source_param[["remote"]] <- list(host = from_instance$endpoint)
if(!purrr::is_null(from_instance$username)){
source_param[["remote"]][["username"]] <- from_instance$username
}
if(!purrr::is_null(from_instance$password)){
source_param[["remote"]][["password"]] <- from_instance$password
}
}
# compose args - source + dest
reindex_args <- list(
conn = self$connection,
body = list(
source = source_param,
dest = list(index = to_index)
),
wait_for_completion = "false"
)
# function to manage ES errors
handle_error <- function(e){
message(" -> Moving: errors during data transfer")
self$delete(to_index)
stop(e)
}
# reindex
self$create(to_index, force = force)
res <- tryCatch(
expr = {
do.call(elastic::reindex, reindex_args)
},
error = function(e){
handle_error(e)
}
)
# if there are some errors during reindex
if(length(res$failure) != 0) handle_error(res$failure)
# removing source index if action is "moving"
if(!copy) source_instance$delete(from_index)
# msg
if(self$verbose){
t <- {res$took / 1000} %>% private$humanize_time()
message(" -> Documents transfered: ", res$total, ", took: ", t$time, t$unit)
}
#
if(self$verbose) message(" -> Waiting a bit for Elasticsearch")
Sys.sleep(self$elastic_wait)
if(self$quiet_results) invisible(to_index) else to_index
},
#'
#' @details
#' Copy data from one index to another.
#' It needs to be configured in the `config/elasticsearch.yml` file to actually work.
#' This method is a wrapper around `$move(copy = TRUE)`.
#'
#' @family move-data
#'
#' @examples
#' \dontrun{
#' # copy data from one index to another (same instance)
#' r <- kc$copy(from_index = "sw_new", to_index = "sw")
#' kc$pull(c("sw", "sw_new"))
#' kc$list()
#' }
#'
#' @param from_instance If not NULL, the Kibior object of another instance. if NULL
#' (default), this instance will be used. (default: NULL).
#' @param from_index The source index name (default: NULL).
#' @param to_index The destination index name (default: NULL).
#' @param force Does the destination index need to be erase? (default: FALSE)
#'
#' @seealso: \url{https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html}
#' Elasticsearch reindex feature for more information.
#'
#' @return the reindex result
#'
copy = function(from_index, to_index, from_instance = NULL, force = FALSE){
args <- list(
from_instance = from_instance,
from_index = from_index,
to_index = to_index,
force = force,
copy = TRUE
)
if(self$quiet_results) invisible(do.call(self$move, args)) else do.call(self$move, args)
},
# --------------------------------------------------------------------------
# methods - Search
# --------------------------------------------------------------------------
#' @details
#' Match requested index names against Elasticsearch indices list.
#'
#' @family search-data
#'
#' @examples
#' \dontrun{
#' # search "sw" index name
#' kc$match("sw")
#' # search all starting with an "s"
#' kc$match("s*")
#' # get all index name, identical to `$list()`
#' kc$match("*")
#' # search multiple names
#' kc$match(c("sw", "sw_new", "nope"))
#' # search multiple names with pattern
#' kc$match(c("s*", "nope"))
#' }
#'
#' @param index_name the index name to use in Elasticsearch, can be a pattern with '*'.
#'
#' @return a vector of matching index names, NULL if nothing matches.
#'
match = function(index_name){
if(!purrr::is_character(index_name)) stop(private$err_param_type_character("index_name"))
# first requests can be long, warn user
if(self$verbose){
index_name %>%
private$vector_to_str() %>%
message(" -> Requesting index names: ", .)
}
# get indices
res <- elastic::search_shards(
conn = self$connection,
index = index_name
)[["indices"]] %>% names()
#
if(typeof(res) != "character") stop(private$ERR_WTF, " expected string")
if(length(res) == 0) res <- NULL
#
if(self$verbose){
to_msg <- if(purrr::is_null(res)) "none" else private$vector_to_str(res)
message(" -> Matching index names: ", to_msg)
}
#
res
},
#' @details
#' Search data from Elasticsearch.
#' The goal of this method is to discover quickly what data are interesting, thus
#' `head = TRUE` by default.
#' If you want to get all data, use `head = FALSE` or `$pull()`.
#' Everything is done by bulk.
#'
#' @family search-data
#'
#' @examples
#' \dontrun{
#' # search "sw" index, head mode on
#' kc$search("sw")
#' # search "sw" index with all metadata, head mode on
#' kc$search("sw", keep_metadata = TRUE)
#' # get only "name" field of the head of indices starting with "s"
#' # if an index does not have the "name" field, it will be empty
#' kc$search("s*", columns = "name")
#' # limit the size of the result to 50 to the whole index
#' kc$search("storms", max_size = 50, bulk_size = 50, head = FALSE)
#' # use Elasticsearch query syntax to select and filter on all indices, for all data
#' # Here, we want to search for all records taht match the conditions:
#' # field "height" is strictly more than 180 AND field homeworld is "Tatooine" OR "Naboo"
#' kc$search("*", query = "height:>180 && homeworld:(Tatooine || Naboo)")
#' # it can be used in conjunction with `columns` to select only columns that matter
#' kc$search("*", query = "height:>180 && homeworld:(Tatooine || Naboo)", columns =
#' c("name", "hair_color", "homeworld"))
#' }
#'
#' @param index_name the index name to use in Elasticsearch (default: NULL).
#' @param bulk_size the number of record to send to Elasticsearch in a row (default: 500).
#' @param max_size the number of record Elasticsearch will send (default: NULL (all data)).
#' @param scroll_timer the time the scroll API will let the request alive to scroll on the
#' result (default: "3m" (3 minutes)).
#' @param keep_metadata does Elasticsearch needs to sent metadata? Data columns will be
#' prefixed by "_source." (default: FALSE).
#' @param columns a vector of columns to select (default: NULL (all columns)).
#' @param head a boolean limiting the search result and time (default: TRUE)
#' @param query a string formatted to Elasticsearch query syntax, see links for the syntax
#' details (default: NULL)
#'
#' @seealso \url{https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html#time-units}
#' for time-units and \url{https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-query-string-query.html#query-string-syntax}
#' for the Elasticsearch query string syntax.
#'
#' @return a list of datasets corresponding to the pull request, else an error. Keys of the
#' list are index names matching the request, value are the associated tibbles
#'
search = function(index_name = "_all", keep_metadata = FALSE, columns = NULL, bulk_size = 500, max_size = NULL, scroll_timer = "3m", head = TRUE, query = NULL){
if(purrr::is_null(index_name)) index_name <- "_all"
if(!purrr::is_character(index_name)) stop(private$err_param_type_character("index_name"))
if(!purrr::is_logical(head)) stop(private$err_param_type_logical("head"))
if(is.na(head)) stop(private$err_logical_na("head"))
if(("_all" %in% index_name) && !head) message("Retriving ALL data... you should target some indices instead.")
# define when to stop the search with head mode
if(self$verbose) {
message(" -> Using head mode: ", if(head) "yes" else "no")
if(head) message(" -> Head size: ", self$head_search_size)
}
if(!is.numeric(bulk_size)) stop(private$err_param_type_numeric("bulk_size"))
if(length(bulk_size) > 1) stop(private$err_one_value("bulk_size"))
if(bulk_size < 1) stop(private$err_param_positive("bulk_size", can_be_null = FALSE))
bulk_size <- as.integer(bulk_size)
if(bulk_size > private$.MAX_BUCKET_SIZE){
bulk_size <- private$.MAX_BUCKET_SIZE
if(self$verbose) message(" -> Restricting 'bulk_size' to 10.000")
}
if(self$verbose) message(" -> Using 'bulk_size': ", bulk_size)
if(!purrr::is_null(max_size)){
if(!is.numeric(max_size)) stop(private$err_param_type_numeric("max_size"))
if(length(max_size) > 1) stop(private$err_one_value("max_size"))
if(max_size < 1) stop(private$err_param_positive("max_size", can_be_null = FALSE))
max_size <- as.integer(max_size)
if(self$verbose) message(" -> Using 'max_size': ", max_size)
if(bulk_size > max_size){
if(self$verbose) message(" -> Reducing 'bulk_size' to match 'max_size': ", max_size)
bulk_size <- max_size
}
}
if(!purrr::is_character(scroll_timer)) stop(private$err_param_type_character("scroll_timer"))
if(length(scroll_timer) > 1) stop(private$err_one_value("scroll_timer"))
if(!purrr::is_logical(keep_metadata)) stop(private$err_param_type_logical("keep_metadata"))
if(is.na(keep_metadata)) stop(private$err_logical_na("keep_metadata"))
if(!purrr::is_null(columns) && !purrr::is_character(columns)) stop(private$err_param_type_character("columns"))
if(!purrr::is_null(query)){
if(!purrr::is_character(query)) stop(private$err_param_type_character("query"))
if(length(query) > 1) stop(private$err_one_value("query"))
}
# return result
final_df <- NULL
# init
selected_fields <- if(purrr::is_null(columns)) NULL else paste0(columns, collapse = ",")
end_search <- FALSE
# total according to ES version
get_total_records <- function(hits_total){
if(self$version$major > 6) hits_total$value else hits_total
}
# function: get clean bulk
get_clean_bulk <- function(bulk){
"Clean bulks when they arrive"
# stable
workable_hits <- list()
for(i in seq(nrow(bulk))) {
# get each line (df) as list
t <- bulk[i,] %>% as.list()
# extract _source if needed, else flatten the results
# Must be only one level, error else
# TODO: handle x levels: recusive = T -> prb with rbindlist
if(keep_metadata){
workable_hits[[i]] <- unlist(t, recursive = FALSE)
} else {
workable_hits[[i]] <- t[["_source"]]
}
}
# combine to raw_hits
workable_hits %>%
data.table::rbindlist(use.names = TRUE, fill = FALSE) %>%
tibble::as_tibble(.name_repair = "unique")
}
# function: change column type
change_column_type <- function(raw){
"Take a tibble with listed columns and tries to
take the inner type of the listed type
"
data <- raw
for(i in names(raw)){
n <- data[[i]][[1]]
if(length(n) == 1){
f <- switch(typeof(n),
"character" = { as.character },
"integer" = { as.integer },
"double" = { as.double },
"logical" = { as.logical },
stop(private$ERR_WTF, "Unknown type '", n, "'.")
)
data[[i]] <- f(data[[i]])
}
}
data
}
# extract ids
extract_ids <- function(hits){
hits %>%
lapply(function(h){
h[["_id"]]
}) %>%
unlist(use.names = FALSE)
}
# involved_indices
involved_indices <- self$match(index_name)
if(!purrr::is_null(involved_indices)){
final_df <- involved_indices %>%
lapply(function(x){ list() }) %>%
setNames(involved_indices)
# various infos per index
run_infos <- final_df
for(i in names(run_infos)){
run_infos[[i]][["end_reached"]] <- FALSE
run_infos[[i]][["total_hits"]] <- NA
run_infos[[i]][["threshold"]] <- NA
}
# per index, first search config
if(self$verbose) message(" -> Getting search results:")
for(current_index in names(run_infos)){
# first search, init scroll and manage error
search_res <- tryCatch({
elastic::Search(
self$connection,
index = current_index,
size = bulk_size,
source = "__", # nullify _source
time_scroll = scroll_timer,
q = query
)
},
error = function(e){
# pattern when limit of request size is reached
r <- e$message %>%
trimws() %>%
stringr::str_match("400 - An HTTP line is larger than ([0-9]+) bytes.") %>%
.[1,2]
if(!is.na(r)){
msg <- paste0("Server '", self$cluster_name, "' (", self$host, ")")
msg <- paste0(msg, " cannot take requests that big (max ", r," bytes).")
msg <- paste0(msg, " Try cutting down your query into several pieces.")
} else {
msg <- e$message
}
stop(msg)
}
)
# check total hits
run_infos[[current_index]][["total_hits"]] <- get_total_records(search_res$hits$total)
# no results
if(length(search_res$hits$hits) == 0){
run_infos[[current_index]][["end_reached"]] <- FALSE
}
# max threshold
tmp_threshold <- if(purrr::is_null(max_size)) run_infos[[current_index]][["total_hits"]] else max_size
if(head && tmp_threshold > self$head_search_size){
tmp_threshold <- self$head_search_size
}
run_infos[[current_index]][["threshold"]] <- tmp_threshold
# scroll id
run_infos[[current_index]][["scroll_id"]] <- search_res[["_scroll_id"]]
# timer
run_infos[[current_index]][["timer"]] <- 0
# last hits
run_infos[[current_index]][["hits"]] <- extract_ids(search_res$hits$hits)
# msg
if(self$verbose){
index_hits_asked <- run_infos[[current_index]][["threshold"]]
index_hits_total <- run_infos[[current_index]][["total_hits"]]
paste0(" - ", current_index, ": ") %>%
paste0(if(index_hits_asked > index_hits_total) index_hits_total else index_hits_asked) %>%
paste0("/", index_hits_total) %>%
message()
}
}
# base args for inc loops
base_args = list(
conn = self$connection,
source = TRUE,
raw = TRUE,
verbose = FALSE,
callopts=list(verbose=FALSE)
)
# progress bar init
pb <- NULL
if(!head && !self$quiet_progress){
cumul_threshold <- run_infos %>%
lapply(function(x){
if(x[["threshold"]] < x[["total_hits"]]) x[["threshold"]] else x[["total_hits"]]
}) %>%
unlist(use.names = FALSE) %>%
sum()
if(cumul_threshold > 0){
# init pbar
pb_sum <- 0
pb <- txtProgressBar(min = 0, max = cumul_threshold, initial = 0, style = 3)
}
}
# start timers
for(current_index in names(final_df)){
run_infos[[current_index]][["timer"]] <- proc.time()
}
# loop until nothing is returned or threshold is reached
all_end_search <- FALSE
while(!all_end_search){
# each index
for(current_index in names(final_df)){
# first search check
if(length(run_infos[[current_index]][["hits"]]) == 0) {
run_infos[[current_index]][["end_reached"]] <- TRUE
# Stop the clock
run_infos[[current_index]][["timer"]] <- proc.time() - run_infos[[current_index]][["timer"]]
}
# if end not reached for this index, explore
if(!run_infos[[current_index]][["end_reached"]]){
# search list of ids
if(length(run_infos[[current_index]][["hits"]]) == 1) {
# complete args
args <- c(base_args, list(
index = current_index,
id = run_infos[[current_index]][["hits"]],
source_includes = selected_fields
))
# get data from ES (simple get)
raw <- do.call(elastic::docs_get, args) %>%
jsonlite::fromJSON(simplifyDataFrame = TRUE) %>%
(function(x){
if(keep_metadata) unlist(x, recursive = FALSE) else x[["_source"]]
}) %>%
rbind() %>%
as.data.frame(stringsAsFactors = FALSE) %>%
tibble::as_tibble() %>%
change_column_type()
# combne results in one df
final_df[[ current_index ]] <- raw
# end
run_infos[[current_index]][["end_reached"]] <- TRUE
# Stop the clock
run_infos[[current_index]][["timer"]] <- proc.time() - run_infos[[current_index]][["timer"]]
#
nb_hits <- 1
} else {
# ids to req
if(head && length(run_infos[[current_index]][["hits"]]) > self$head_search_size){
req_ids <- head(run_infos[[current_index]][["hits"]], self$head_search_size)
} else {
req_ids <- run_infos[[current_index]][["hits"]]
}
# complete args
args <- c(base_args, list(
index = current_index,
ids = req_ids,
"_source_includes" = selected_fields
))
# get data from ES
res <- do.call(elastic::docs_mget, args) %>%
jsonlite::fromJSON(simplifyDataFrame = TRUE)
# if the result is not empty
nb_hits <- nrow(res$docs)
if(nb_hits > 0){
# tidy data
raw <- get_clean_bulk(res$docs)
# combne results in one df
if(length(final_df[[ current_index ]]) == 0){
final_df[[ current_index ]] <- raw
} else {
final_df[[ current_index ]] <- tryCatch(
expr = {
# error raised with listed columns
dplyr::bind_rows(raw, final_df[[ current_index ]])
}, error = function(e){
# better management of listed columns
rbind(raw, final_df[[ current_index ]])
}
)
}
}
}
# update pbar
if(!purrr::is_null(pb)){
pb_sum <- pb_sum + nb_hits
setTxtProgressBar(pb, pb_sum)
}
# test end
if(nrow(final_df[[current_index]]) >= run_infos[[current_index]][["threshold"]]){
run_infos[[current_index]][["end_reached"]] <- TRUE
# Stop the clock
run_infos[[current_index]][["timer"]] <- proc.time() - run_infos[[current_index]][["timer"]]
} else {
# continue search scroll
search_res <- elastic::scroll(self$connection,
run_infos[[current_index]][["scroll_id"]],
time_scroll = scroll_timer)
tmp_ids <- extract_ids(search_res$hits$hits)
run_infos[[current_index]][["hits"]] <- tmp_ids
run_infos[[current_index]][["end_reached"]] <- (length(tmp_ids) == 0)
}
}
}
# test if all ends are set
all_end_search <- run_infos %>%
lapply(function(x){ x[["end_reached"]] }) %>%
unlist(use.names = FALSE) %>%
all()
}
# verbose time
if(self$verbose){
# CR after progress bar
message()
# has scroll pending?
message(" -> Closing scrolls: ")
for(current_index in names(final_df)){
# close scroll
run_infos[[current_index]][["scroll_id"]] %>%
elastic::scroll_clear(self$connection, x = ., all = FALSE) %>%
(function(x){ if(x) "closed" else "implicit closing pending" }) %>%
message(" - ", current_index, ": ", .)
}
# execution
message(" -> Execution time: ")
execution_total <- 0
for(current_index in names(final_df)){
# calculate
user_took <- run_infos[[current_index]][["timer"]][["elapsed"]] %>%
private$humanize_time()
message(" - ", current_index, ": ", user_took$time, user_took$unit)
# total
execution_total <- execution_total + run_infos[[current_index]][["timer"]][["elapsed"]]
}
total_took <- private$humanize_time(execution_total)
message(" - total: ", total_took$time, total_took$unit)
}
# close progress bar
if(!purrr::is_null(pb)) close(pb)
}
# return
if(self$quiet_results) invisible(final_df) else final_df
},
# --------------------------------------------------------------------------
# methods - Joins
# --------------------------------------------------------------------------
#' @details
#' Execute a inner join between two datasets using `dplyr` joins.
#' The datasets can be in-memory (variable name) or the name of an currently stored
#' Elasticsearch index. Joins cannot be done on column of type "list" ("by"
#' argument).
#'
#' @param ... see `join()` params.
#'
#' @family joins
#'
#' @examples
#' \dontrun{
#' # some data for joins examples
#' kc$push(ggplot2::diamonds, "diamonds")
#' # prepare join datasets, only big the biggest diamonds are selected (9)
#' sup_carat <- dplyr::filter(ggplot2::diamonds, carat > 3.5)
#' r <- kc$push(sup_carat, "diamonds_superior")
#' # execute a inner_join with one index and one in-memory dataset
#' kc$inner_join(ggplot2::diamonds, "diamonds_superior")
#' # execute a inner_join with one index queried, and one in-memory dataset
#' kc$inner_join(ggplot2::diamonds, "diamonds", right_query
#' = "carat:>3.5")
#' }
#'
#' @return a tibble
#'
inner_join = function(...) {
res <- private$join(join_type = "inner", ...)
if(self$quiet_results) invisible(res) else res
},
#' @details
#' Execute a full join between two datasets using `dplyr` joins.
#' The datasets can be in-memory (variable name) or the name of an currently stored
#' Elasticsearch index. Joins cannot be done on column of type "list" ("by"
#' argument).
#'
#' @param ... see `join()` params.
#'
#' @family joins
#'
#' @examples
#' \dontrun{
#' # prepare join datasets, fair cuts
#' fair_cut <- dplyr::filter(ggplot2::diamonds, cut == "Fair") # 1605 lines
#' sup_carat <- kc$pull("diamonds_superior")$diamonds_superior
#' # execute a full_join with one index and one in-memory dataset
#' kc$full_join(fair_cut, "diamonds_superior")
#' # execute a full_join with one index queried, and one in-memory dataset
#' kc$full_join(sup_carat, "diamonds", right_query = "cut:fair")
#' }
#'
#' @return a tibble
#'
full_join = function(...) {
res <- private$join(join_type = "full", ...)
if(self$quiet_results) invisible(res) else res
},
#' @details
#' Execute a left join between two datasets using `dplyr` joins.
#' The datasets can be in-memory (variable name) or the name of an currently stored
#' Elasticsearch index. Joins cannot be done on column of type "list" ("by"
#' argument).
#'
#' @param ... see `join()` params.
#'
#' @family joins
#'
#' @examples
#' \dontrun{
#' # prepare join datasets, fair cuts
#' fair_cut <- dplyr::filter(ggplot2::diamonds, cut == "Fair") # 1605 lines
#' sup_carat <- kc$pull("diamonds_superior")$diamonds_superior
#' # execute a left_join with one index and one in-memory dataset
#' kc$left_join(fair_cut, "diamonds_superior")
#' # execute a left_join with one index queried, and one in-memory dataset
#' kc$left_join(sup_carat, "diamonds", right_query
#' = "cut:fair")
#' }
#'
#' @return a tibble
#'
left_join = function(...) {
res <- private$join(join_type = "left", ...)
if(self$quiet_results) invisible(res) else res
},
#' @details
#' Execute a right join between two datasets using `dplyr` joins.
#' The datasets can be in-memory (variable name) or the name of an currently stored
#' Elasticsearch index. Joins cannot be done on column of type "list" ("by"
#' argument).
#'
#' @param ... see `join()` params.
#'
#' @family joins
#'
#' @examples
#' \dontrun{
#' # prepare join datasets, fair cuts
#' fair_cut <- dplyr::filter(ggplot2::diamonds, cut == "Fair") # 1605 lines
#' sup_carat <- kc$pull("diamonds_superior")$diamonds_superior
#' # execute a right_join with one index and one in-memory dataset
#' kc$right_join(fair_cut, "diamonds_superior")
#' # execute a right_join with one index queried, and one in-memory dataset
#' kc$right_join(sup_carat, "diamonds", right_query
#' = "cut:fair")
#' }
#'
#' @return a tibble
#'
right_join = function(...) {
res <- private$join(join_type = "right", ...)
if(self$quiet_results) invisible(res) else res
},
#' @details
#' Execute a semi join between two datasets using `dplyr` joins.
#' The datasets can be in-memory (variable name) or the name of an currently stored
#' Elasticsearch index. Joins cannot be done on column of type "list" ("by"
#' argument).
#'
#' @param ... see `join()` params.
#'
#' @family joins
#'
#' @examples
#' \dontrun{
#' # prepare join datasets, fair cuts
#' fair_cut <- dplyr::filter(ggplot2::diamonds, cut == "Fair") # 1605 lines
#' sup_carat <- kc$pull("diamonds_superior")$diamonds_superior
#' # execute a semi_join with one index and one in-memory dataset
#' kc$semi_join(fair_cut, "diamonds_superior")
#' # execute a semi_join with one index queried, and one in-memory dataset
#' kc$semi_join(sup_carat, "diamonds", right_query
#' = "cut:fair")
#' }
#'
#' @return a tibble
#'
semi_join = function(...) {
res <- private$join(join_type = "semi", ...)
if(self$quiet_results) invisible(res) else res
},
#' @details
#' Execute a anti join between two datasets using `dplyr` joins.
#' The datasets can be in-memory (variable name) or the name of an currently stored
#' Elasticsearch index. Joins cannot be done on column of type "list" ("by"
#' argument).
#'
#' @param ... see `join()` params.
#'
#' @family joins
#'
#' @examples
#' \dontrun{
#' # prepare join datasets, fair cuts
#' fair_cut <- dplyr::filter(ggplot2::diamonds, cut == "Fair") # 1605 lines
#' sup_carat <- kc$pull("diamonds_superior")$diamonds_superior
#' # execute a anti_join with one index and one in-memory dataset
#' kc$anti_join(fair_cut, "diamonds_superior")
#' # execute a anti_join with one index queried, and one in-memory dataset
#' kc$anti_join(sup_carat, "diamonds", right_query
#' = "cut:fair")
#' #
#' # Do not mind this, removing example indices
#' elastic::index_delete(kc$connection, "*")
#' kc <- NULL
#' }
#'
#' @return a tibble
#'
anti_join = function(...) {
res <- private$join(join_type = "anti", ...)
if(self$quiet_results) invisible(res) else res
}
)
)
# -------------------------------
# Kibior static methods
#
#'
#' @title Static - initiate a direct instance to Kibio public repository
#' @name Static - initiate a direct instance to Kibio public repository
#'
#' @details
#' Initiate a instance of Kibior connected to the Kibio public repository.
#'
#' @param verbose verbosity activation (default: FALSE)
#'
#' @family initiate
#'
#' @return a new instance of Kibior conencted to Kibio service
#'
Kibior$get_kibio_instance <- function(verbose = FALSE){
kibio_endpoint <- "kibio.compbio.ulaval.ca"
kibio_port <- 80L
message("Trying to connect to Kibio servers on '", kibio_endpoint, ":", kibio_port, "'")
tryCatch(expr = {
k <- Kibior$new(kibio_endpoint, kibio_port, verbose = verbose)
message("This instance grants you anonymous connection with read-only priviledges")
k
},
error = function(e){
e$message %>%
paste0("\nKibio servers seems not accessible.") %>%
paste0("\nPlease, try updating the KibioR package.") %>%
stop()
}
)
}
#'
#' @title Static - Kibior is instance
#' @name Static - Kibior is instance
#'
#' @details
#' Tests if a given object is a Kibior instance.
#' Basically compute symmetric difference between two sets of class.
#'
#' @param obj an object
#'
#' @family comparison
#'
#' @return TRUE if the given object is an instance of Kibior, else FALSE
#'
Kibior$is_instance <- function(obj){
is(obj, Kibior$classname)
}
#'
#' @title Static - Tests if packages are installed
#' @name Static - Tests if packages are installed
#'
#' @details
#' Get the installation status of some package names (installed TRUE/FALSE).
#'
#' @param pkg_names a vector of some package names
#'
#' @family install
#'
#' @return a named vector of packages with installation status
#'
Kibior$.is_installed <- function(pkg_names){
if(!purrr::is_character(pkg_names)) stop("Need package names.")
is_installed <- pkg_names %in% installed.packages()[,"Package"]
names(is_installed) <- pkg_names
is_installed
}
# -------------------------------
# Kibior operators
#
# allow to attach in the current env the "==" and "!=" operators for "KibiorOperators" derived
# classes
#'
#' @title Kibior equals operator
#' @name Kibior equals operator
#'
#' @details
#' Call kibioR `$eq()` for a comparison of the two instances.
#'
#' @param x the first Kibior instance
#' @param y the second Kibior instance
#'
#' @family comparison
#'
#' @return TRUE if the two instances are equals, else FALSE
#'
`==.KibiorOperators` = function(x, y){ x$eq(y) }
#'
#' @title Kibior not-equals operator
#' @name Kibior not-equals operator
#'
#' @details
#' Call kibioR `$ne()` for a comparison of the two instances.
#'
#' @param x the first Kibior instance
#' @param y the second Kibior instance
#'
#' @family comparison
#'
#' @return TRUE if the two instances are differents, else FALSE
#'
`!=.KibiorOperators` = function(x, y){ x$ne(y) }
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.