Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Package: SDModels
Title: Spectrally Deconfounded Models
Version: 2.0.1
Version: 2.0.2
Authors@R: c(
person("Markus", "Ulmer", email = "markus.ulmer@stat.math.ethz.ch",
role = c("aut", "cre", "cph"), comment = c(ORCID = "0000-0001-7783-8475")),
Expand All @@ -16,12 +16,13 @@ Imports:
ggraph,
gridExtra,
parallel,
pbapply,
Rdpack,
tidyr,
fda,
grplasso,
rlang
rlang,
progressr,
parallelly
Suggests:
plotly,
datasets,
Expand All @@ -31,6 +32,7 @@ Suggests:
ranger,
HDclassif,
qpdf,
cli,
testthat (>= 3.0.0)
RdMacros: Rdpack
Encoding: UTF-8
Expand Down
6 changes: 6 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
# SDModels 2.0.2

* Switch all the parallelization to futures. See `vignette("Runtime")`
* Switch all the progress updates to progressr. Progress updates are now also available for parallel processing and are customizable.
* Process are much more RAM efficient now.

# SDModels 2.0.1

* Fix bug in SDTree and SDForest where an error occurred, if X had columns with only one unique value.
Expand Down
85 changes: 62 additions & 23 deletions R/SDAM.R
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@
#' Default is \code{TRUE} to not reduce the signal of high variance covariates.
#' @param ind_lin A vector of indices specifying which covariates to model linearly (i.e. not expanded into basis function).
#' Default is `NULL`.
#' @param mc.cores Number of cores to use for parallel processing, if \code{mc.cores > 1}
#' the cross validation is parallelized. Default is `1`. (only supported for unix)
#' @param verbose If \code{TRUE} fitting information is shown.
#' @param mc.cores Number of cores to use for parallel computation `vignette("Runtime")`.
#' The `future` package is used for parallel processing.
#' To use custom processing plans mc.cores has to be <= 1, see [`future` package](https://future.futureverse.org/).
#' @param verbose If \code{TRUE} progress updates are shown using the `progressr` package.
#' To customize the progress bar, see [`progressr` package](https://progressr.futureverse.org/articles/progressr-intro.html)
#' @param notRegularized A vector of indices specifying which covariates not to regularize.
#' Default is `NULL`.
#' @return An object of class `SDAM` containing the following elements:
Expand Down Expand Up @@ -98,7 +100,8 @@
#' # predict
#' predict(model, newdata = wine[42, ])
#'
#' ## alternative function call
#' ## alternative function call with customized progress bar
#' progressr::handlers(progressr::handler_txtprogressbar(char = cli::col_red(cli::symbol$heart)))
#' mod_none <- SDAM(x = as.matrix(wine[1:10, -c(1, 2)]), y = wine$alcohol[1:10],
#' Q_type = "no_deconfounding", nfolds = 2, n_K = 4,
#' n_lambda1 = 4, n_lambda2 = 8)
Expand Down Expand Up @@ -156,8 +159,15 @@ SDAM <- function(formula = NULL, data = NULL, x = NULL, y = NULL,
n_unique_X <- apply(X, 2, function(x){length(unique(x))})

# Generate the design and model parameters for every K in vK
lmodK <- list()
for (i in 1:length(vK)){
progressr::with_progress({
pr <- progressr::progressor(along = 1:(n_K), enable = verbose)
pr(sprintf("Design generation"), amount = 0, class = "sticky")
if(mc.cores > 1){
plan <- if (parallelly::supportsMulticore()) "multicore" else "multisession"
with(future::plan(plan, workers = min(mc.cores, n_K)), local = TRUE)
}

lmodK <- future.apply::future_lapply(future.seed = TRUE, 1:length(vK), function(i){
K <- vK[i]
# effective number of basis functions for each Xj, j = 1,..., p
# K_eff[j] can be at most equal to the number of unique values of Xj
Expand Down Expand Up @@ -213,9 +223,11 @@ SDAM <- function(formula = NULL, data = NULL, x = NULL, y = NULL,
lambda <- rep(0, n_lambda1)
index <- rep(1, length(index))
}
lmodK[[i]] <- list(Rlist = Rlist, lbreaks = lbreaks, index = index, B = B,
QB = QB, lambda = lambda, K = K, K_eff = K_eff)
}
pr()
list(Rlist = Rlist, lbreaks = lbreaks, index = index,
QB = QB, lambda = lambda, K = K, K_eff = K_eff)
})
})

# generate folds for CV
ind <- sample(rep(1:nfolds, length.out = n), replace = FALSE)
Expand All @@ -236,20 +248,34 @@ SDAM <- function(formula = NULL, data = NULL, x = NULL, y = NULL,

QYpred <- predict(mod, newdata = listK$QB[test, ])
mse <- apply(QYpred, 2, function(y){mean((y - QY[test])^2)})
pr()
return(mse)
}

mse_fold <- function(l){
MSEl <- lapply(lmodK, function(listK){mse_fold_K(l, listK)})
MSEl <- future.apply::future_lapply(future.seed = TRUE, lmodK,
mse_fold_K,
l = l)
return(unname(do.call(rbind, MSEl)))
}

if(verbose) print("Initial cross-validation")
if(mc.cores == 1){
MSES <- pbapply::pblapply(1:nfolds, mse_fold)
} else {
MSES <- parallel::mclapply(1:nfolds, mse_fold, mc.cores = mc.cores)
}
#use random generator that works with multiprocessing
ok <- RNGkind("L'Ecuyer-CMRG")
progressr::with_progress({
pr <- progressr::progressor(along = 1:(nfolds * n_K), enable = verbose)
pr(sprintf("Initial cross-validation"), amount = 0, class = "sticky")
if(mc.cores > 1){
plan <- if (parallelly::supportsMulticore()) "multicore" else "multisession"
with(future::plan(plan, workers = min(mc.cores, nfolds)), local = TRUE)
}
MSES <- lapply(X = 1:nfolds, mse_fold)
})

#if(mc.cores == 1){
# MSES <- pbapply::pblapply(1:nfolds, mse_fold)
#} else {
# MSES <- parallel::mclapply(1:nfolds, mse_fold, mc.cores = mc.cores)
#}

# aggregate MSEs over folds
MSES.agg <- Reduce("+", MSES) / nfolds
Expand All @@ -267,13 +293,25 @@ SDAM <- function(formula = NULL, data = NULL, x = NULL, y = NULL,
length.out = n_lambda2))
}

if(verbose) print("Second stage cross-validation")
if(mc.cores == 1){
MSES1 <- pbapply::pblapply(1:nfolds, mse_fold_K, listK = modK.min)
} else {
MSES1 <- parallel::mclapply(1:nfolds, mse_fold_K, listK = modK.min,
mc.cores = mc.cores)
}
progressr::with_progress({
pr <- progressr::progressor(along = 1:nfolds, enable = verbose)
pr(sprintf("Second stage cross-validation"), amount = 0, class = "sticky")
if(mc.cores > 1){
plan <- if (parallelly::supportsMulticore()) "multicore" else "multisession"
with(future::plan(plan, workers = min(mc.cores, nfolds)), local = TRUE)
}
MSES1 <- future.apply::future_lapply(future.seed = TRUE,
X = 1:nfolds,
mse_fold_K,
listK = modK.min)
})
#if(verbose) print("Second stage cross-validation")
#if(mc.cores == 1){
# MSES1 <- pbapply::pblapply(1:nfolds, mse_fold_K, listK = modK.min)
#} else {
# MSES1 <- parallel::mclapply(1:nfolds, mse_fold_K, listK = modK.min,
# mc.cores = mc.cores)
#}

MSES1 <- do.call(rbind, MSES1)
MSE1.agg <- apply(MSES1, 2, mean)
Expand Down Expand Up @@ -339,6 +377,7 @@ SDAM <- function(formula = NULL, data = NULL, x = NULL, y = NULL,
# estimated active set
lreturn$active <- active
class(lreturn) <- "SDAM"
RNGkind(ok[1])
return(lreturn)
}

Expand Down
57 changes: 27 additions & 30 deletions R/SDForest.R
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@
#' @param mtry Number of randomly selected covariates to consider for a split,
#' if \code{NULL} half of the covariates are available for each split.
#' \eqn{\text{mtry} = \lfloor \frac{p}{2} \rfloor}
#' @param mc.cores Number of cores to use for parallel processing,
#' if \code{mc.cores > 1} the trees are estimated in parallel.
#' @param mc.cores Number of cores to use for parallel computation `vignette("Runtime")`.
#' The `future` package is used for parallel processing.
#' To use custom processing plans mc.cores has to be <= 1, see [`future` package](https://future.futureverse.org/).
#' @param Q_type Type of deconfounding, one of 'trim', 'pca', 'no_deconfounding'.
#' 'trim' corresponds to the Trim transform \insertCite{Cevid2020SpectralModels}{SDModels}
#' as implemented in the Doubly debiased lasso \insertCite{Guo2022DoublyConfounding}{SDModels},
Expand Down Expand Up @@ -67,7 +68,8 @@
#' @param Q_scale Should data be scaled to estimate the spectral transformation?
#' Default is \code{TRUE} to not reduce the signal of high variance covariates,
#' and we do not know of a scenario where this hurts.
#' @param verbose If \code{TRUE} fitting information is shown.
#' @param verbose If \code{TRUE} progress updates are shown using the `progressr` package.
#' To customize the progress bar, see [`progressr` package](https://progressr.futureverse.org/articles/progressr-intro.html)
#' @param predictors Subset of colnames(X) or numerical indices of the covariates
#' for which an effect on y should be estimated. All the other covariates are only
#' used for deconfounding.
Expand Down Expand Up @@ -127,6 +129,8 @@
#' # comparison to classical random forest
#' fit_ranger <- ranger::ranger(Y ~ ., train_data, importance = 'impurity')
#'
#' # you can customize the progress bar see parameter verbose
#' progressr::handlers("cli")
#' fit <- SDForest(x = X, y = Y, nTree = 100, Q_type = 'pca', q_hat = 2)
#' fit <- SDForest(Y ~ ., nTree = 100, train_data)
#' fit
Expand All @@ -137,6 +141,7 @@
#' plot(fit)
#'
#' # a few more might be helpfull
#' progressr::handlers(progressr::handler_txtprogressbar(char = cli::col_red(cli::symbol$heart)))
#' fit2 <- SDForest(Y ~ ., nTree = 50, train_data)
#' fit <- mergeForest(fit, fit2)
#'
Expand Down Expand Up @@ -276,52 +281,43 @@ SDForest <- function(formula = NULL, data = NULL, x = NULL, y = NULL, nTree = 10
ind <- do.call(c, ind)
}

#use random generater that works with multiprocessing
#use random generator that works with multiprocessing
ok <- RNGkind("L'Ecuyer-CMRG")

# Worker wrapper for bagged trees
worker_fun <- function(i) {
Xi <- matrix(X[i, ], ncol = ncol(X))
colnames(Xi) <- colnames(X)
if(!is.null(A)){
Ai <- matrix(A[i, ], ncol = ncol(A))
}else{
Ai <- NULL
}

# protect SDTree call
res_i <- tryCatch({
tree_obj <- SDTree(x = Xi, y = Y[i],
cp = cp, min_sample = min_sample,
tree_obj <- estimate_tree(X = X, Y = Y, Qf = NULL,
cp = cp, min_sample = min_sample, max_leaves = n,
Q_type = Q_type, trim_quantile = trim_quantile,
q_hat = q_hat, mtry = mtry, A = Ai, gamma = gamma,
max_candidates = max_candidates,
Q_scale = Q_scale, predictors = predictors)
q_hat = q_hat, mtry = mtry, A = A, gamma = gamma,
max_candidates = max_candidates, fast = TRUE,
Q_scale = Q_scale, predictors = predictors,
boot_index = i)

list(ok = TRUE, tree = tree_obj)
}, error = function(e) {
list(ok = FALSE, error = conditionMessage(e))
}, warning = function(w) {
# convert warnings to tagged results if needed
list(ok = TRUE, tree = NULL, warning = conditionMessage(w))
})
p()

res_i
}

progressr::with_progress({
p <- progressr::progressor(along = ind, enable = verbose)
if(mc.cores > 1){
if(Sys.info()[["sysname"]] == "Linux"){
if(verbose) print('mclapply')
res_list <- parallel::mclapply(ind, worker_fun, mc.cores = mc.cores)
}else{
if(verbose) print('future')
future::plan('multisession', workers = mc.cores)
res_list <- future.apply::future_lapply(future.seed = TRUE, X = ind, worker_fun)
}
}else{
res_list <- pbapply::pblapply(ind, worker_fun)
plan <- if (parallelly::supportsMulticore()) "multicore" else "multisession"
with(future::plan(plan, workers = mc.cores), local = TRUE)
}
RNGkind(ok[1])
res_list <- future.apply::future_lapply(future.seed = TRUE, X = ind, worker_fun)
})

#check worker statuses
# check worker statuses
failed_workers <- which(vapply(res_list, function(z) !isTRUE(z$ok), logical(1)))
if (length(failed_workers) > 0) {
stop(sprintf("SDForest: %d worker(s) failed, first error: %s",
Expand Down Expand Up @@ -437,6 +433,7 @@ SDForest <- function(formula = NULL, data = NULL, x = NULL, y = NULL, nTree = 10
output$ooEnv_predictions <- ooEnv_predictions
}

RNGkind(ok[1])
class(output) <- 'SDForest'
output
}
Loading
Loading