# Gist samuelsaari/ebad34fbcd6b9109192c84c1fde54272: "Parallel wrapper for stepFlexmix"
# Last active: September 7, 2022 08:33
# NOTE: GitHub warned that this file contains bidirectional Unicode text that
# may be interpreted or compiled differently than it appears; review it in an
# editor that reveals hidden Unicode characters.
# Parallel wrapper test for stepFlexmix
# Miika Maki
# R-code
Sys.time()
rm(list = ls())
# NB! Set INTERACTIVE to TRUE or FALSE by hand here. For the user "makimiik"
# it is recomputed below from the working directory.
INTERACTIVE <- FALSE
if (Sys.info()[["user"]] == "makimiik") {
  # Library paths: put the project package library in front of the defaults
  .libPaths(c("/projappl/project_2003758/project_rpackages", .libPaths()))
  print('----makimiik using this script-----')
  # A working directory whose path ends in digits is taken to mean a batch
  # (bash) run; anything else counts as interactive.
  INTERACTIVE <- !grepl("\\d+$", getwd())
  if (INTERACTIVE) print('this is an interactive session') else print('this is a bash session')
}
# Libraries ----
# Missing packages are installed into the first entry of .libPaths().
libpath <- .libPaths()[1]
libraries_cran <- c(
  "flexmix",        # GMM
  "lattice",        # gets loaded mid-run anyway (source unknown), so load it up front
  "tictoc",         # simple timing
  "furrr",          # parallel purrr
  "future",         # might be needed with furrr
  "parallel",       # might be needed with furrr
  "doSNOW",         # parallel loops
  "snow",           # for multinode analysis
  "glue",           # python-like f-string printing
  "tidyverse",
  "microbenchmark"
)
# Install packages if not yet installed
installed_libraries_cran <- libraries_cran %in% rownames(installed.packages())
if (!all(installed_libraries_cran)) {
  # A batch session cannot prompt for a CRAN mirror, and install.packages()
  # errors on the "@CRAN@" placeholder; substitute a concrete mirror for it.
  cran_repos <- getOption("repos", default = c(CRAN = "@CRAN@"))
  cran_repos[cran_repos == "@CRAN@"] <- "https://cloud.r-project.org"
  install.packages(
    libraries_cran[!installed_libraries_cran],
    lib = libpath,
    repos = cran_repos
  )
}
# Attach all packages. library() stops with an error on a missing package,
# whereas require() only returned FALSE and let the script continue.
invisible(lapply(libraries_cran, library, character.only = TRUE))
# Options ----
# Core-count detection for parallel computing: only the "Slurm" method is
# consulted (presumably the Slurm allocation's CPU count; confirm against the
# future/parallelly availableCores() documentation).
options(future.availableCores.methods = "Slurm")
print(future::availableCores())
# Run configuration for the stepFlexmix test ----
# (panel_data is not created in this script; the filtering line from the
# original analysis is kept below as a comment.)
NESTED_LOOP <- TRUE              # TRUE: nested foreach; FALSE: nested future_map
TEST_RUN <- FALSE                # TRUE: small, fast smoke test
RUN_ALL_IN_ONE_FLEXMIX <- FALSE
set.seed(101)
#panel_data <- panel_data %>% filter(mergeid %in% sample(unique(mergeid),600))
# Problem size: number of classes k, repetitions per k, and the per-class
# sleep (seconds) that stands in for one model fit.
nr_of_classes <- if (TEST_RUN) 5 else 12
nrep <- if (TEST_RUN) 4 else 10
sleep_time <- if (TEST_RUN) 1/100 else 10
nr_of_classes_vector <- seq_len(nr_of_classes)
print(if (TEST_RUN) '----running small----' else '----running big------')
########################################################################################## | |
########################################################################################## | |
########################################################################################## | |
########################################################################################## | |
########################################################################################## | |
########################################################################################## | |
########################################################################################## | |
########################################################################################## | |
########################################################################################## | |
########################################################################################## | |
#------------------------------------------------------------------------------------------------- | |
# stepFlexmix | |
# Stand-in for a single stepFlexmix fit, used to test the parallel setup.
#
# Runs Sys.sleep(sleep_time * k) `nrep` times through microbenchmark and
# returns a one-line timing summary (a glue string).
#
# k    -- number of classes; scales the length of each sleep.
# nrep -- how many times the sleep is repeated (microbenchmark `times`).
# NOTE: reads the global `sleep_time` (seconds) set in the parameter section.
flexmix_single <- function(k, nrep) {
  start_time <- lubridate::now()
  # Only the repeated sleep matters; the benchmark result itself is unused.
  microbenchmark::microbenchmark(Sys.sleep(sleep_time * k), times = nrep)
  end_time <- lubridate::now()
  # Fix the unit to seconds so every log line is comparable; plain
  # subtraction picks units automatically (secs/mins/hours) per call.
  duration <- difftime(end_time, start_time, units = "secs")
  start_time_f <- hms::as_hms(start_time)
  end_time_f <- hms::as_hms(end_time)
  duration_f <- format(round(duration, 2), nsmall = 2)
  glue::glue("{k}: * . Started: {start_time_f}. Ended: {end_time_f}. Duration: {duration_f}. \n")
}
#print(sprintf("%s: * . Started: %s. Ended: %s. Duration: %s. \n", 1, 2, 3, 4)) | |
#########################################################
# Running models ----
# Sets up a parallel backend, runs flexmix_single() for every
# (class count, repetition) pair, then tears the backend down.
#   INTERACTIVE = TRUE : local workers (SOCK cluster for foreach, or a nested
#                        multisession future plan for future_map).
#   INTERACTIVE = FALSE: an MPI cluster, registered with doSNOW for foreach or
#                        used as the outer level of a future plan.
tic(msg = "Nested_flexmix")
cl <- NULL  # any snow/MPI cluster created below; stopped at the end
#----plan
if (INTERACTIVE) {
  if (NESTED_LOOP) {
    # foreach's %dopar% ignores future::plan(); with no registered backend it
    # runs sequentially. Register a local SOCK cluster instead.
    cl <- snow::makeCluster(nr_of_classes, type = "SOCK")
    registerDoSNOW(cl)
  } else {
    plan(list(
      tweak(multisession, workers = nr_of_classes),
      tweak(multisession, workers = nrep)
    ))
  }
} else {
  ##--------Setup for MPI---------------
  # parallel::makeCluster() has no `workers` argument (its first argument is
  # `spec`), so the former `workers = nr_of_classes` was silently ignored.
  # With no count given, snow::makeMPIcluster() sizes the cluster from the MPI
  # launch, which is what the working ("toimii") setup relied on.
  cl <- parallel::makeCluster(type = "MPI")
  # avail.workers <- future::nbrOfWorkers()
  # cl <- makeCluster(workers = avail.workers, type = "MPI")
  if (NESTED_LOOP) {
    # OPTION 0 (for nested loop)
    registerDoSNOW(cl)
  } else {
    # Alternatives tried earlier:
    # OPTION 1: plan(cluster, workers = cl)
    # OPTION 2 (https://stackoverflow.com/q/61506909/7147695):
    #   future::plan(list(
    #     future::tweak(future::multiprocess, workers = nr_of_classes),
    #     future::tweak(future::multiprocess, workers = nrep)))
    # OPTION 3: MPI workers at the outer level, multisession inside each.
    plan(list(tweak(cluster, workers = cl), multisession))
  }
  ##------------ Displaying info--------------------
  print('--------------Worker process allocation among nodes--------------------')
  worker_allocation <- clusterCall(cl, function() Sys.info()[c("nodename", "machine")])
  print(worker_allocation)
  print(glue(".....Available cores: {availableCores()}....."))
  print(glue(".....nbrOfWorkers: {future::nbrOfWorkers()}....."))
  print('-----------------------------')
}
##----------- The gist-----------------------
# NOTE(review): the repetition index j is passed as flexmix_single()'s `nrep`,
# so job (i, j) repeats its sleep j times. Confirm this is intended.
if (NESTED_LOOP) {
  foreach(i = nr_of_classes_vector) %:%
    foreach(j = seq_len(nrep)) %dopar% flexmix_single(i, j)
} else {
  future_map(nr_of_classes_vector, .options = furrr_options(seed = TRUE), function(i) {
    future_map(seq_len(nrep), function(j) flexmix_single(i, j),
               .options = furrr_options(seed = TRUE))
  })
}
##------------- stopping parallelization-------------
# Reset the future plan first so future can release any multisession workers
# and drop its reference to `cl`, then stop the cluster created above (if any).
plan(sequential)
if (!is.null(cl)) stopCluster(cl)
print('---classes----')
print('------')
toc(log = TRUE)
############################################################################## | |
############################################################################## | |
############################################################################## | |
############################################################################## | |
############################################################################## | |
############################################################################## | |
############################################################################## | |
############################################################################## | |
############################################################################## | |
############################################################################## | |
# Wrap-up: end time, core count, and the tictoc timing log ----
Sys.time()
print(availableCores())
log_of_stepFlexmix_times <- tic.log(format = TRUE)
print(log_of_stepFlexmix_times)
tic.clearlog()
# (GitHub page footer: sign up for free, or sign in, to comment on this gist.)