# Parallel wrapper for stepFlexmix
# GitHub gist by @samuelsaari (id ebad34fbcd6b9109192c84c1fde54272),
# last active September 7, 2022 08:33.
# Parallel wrapper test for stepFlexmix
# Miika Maki
# R-code

# Session setup ----
Sys.time()
# Start from an empty global environment.
rm(list = ls())

# NB! INTERACTIVE (TRUE/FALSE) selects how the parallel workers are set up
# further down. It defaults to FALSE and is auto-detected for user "makimiik".
INTERACTIVE <- FALSE
if (Sys.info()[["user"]] == "makimiik") {
  # Library paths: put the project package directory first so it is searched
  # (and installed into) before the default libraries.
  .libPaths(c("/projappl/project_2003758/project_rpackages", .libPaths()))
  print("----makimiik using this script-----")
  # Heuristic: a working directory whose name ends in digits is treated as a
  # batch (bash) session; any other directory as an interactive session.
  if (!grepl("\\d+$", getwd(), fixed = FALSE)) {
    print("this is an interactive session")
    INTERACTIVE <- TRUE
  } else {
    print("this is a bash session")
    INTERACTIVE <- FALSE
  }
}
# First entry of the library search path; missing packages are installed here.
libpath <- .libPaths()[1]
# ----------------------------------------------------------------------------------------------
# Libraries ----
# CRAN packages attached by this script; any that are missing are installed
# into `libpath` first.
libraries_cran <- c(
  "flexmix",        # GMM
  "lattice",        # do not know where, but keeps loading in the middle of the code
  "tictoc",         # simple timing
  "furrr",          # parallel purrr
  "future",         # might be needed with furrr
  "parallel",       # might be needed with furrr
  "doSNOW",         # parallel loops
  "snow",           # for multinode analysis
  "glue",           # python-like f-string printing
  "tidyverse",
  "microbenchmark"
)

# Install packages that are not yet installed.
installed_libraries_cran <- libraries_cran %in% rownames(installed.packages())
if (!all(installed_libraries_cran)) {
  install.packages(libraries_cran[!installed_libraries_cran], lib = libpath)
}

# Attach the libraries. library() stops the script when a package cannot be
# attached, whereas require() only returns FALSE and lets the run continue with
# a missing dependency. `character.only` is spelled out (the old code relied on
# partial matching of `character`), and invisible() stops the list of results
# from being printed at top level.
invisible(lapply(libraries_cran, library, character.only = TRUE))

#-------------------------------------------------------------------------------------------------
# Options ----
# Let future/parallelly derive the number of available cores from Slurm.
options(future.availableCores.methods = "Slurm") # for parallel computing
print(availableCores())
#-------------------------------------------------------------------------------------------------
# Simulation settings ----
#-------------------------------------------------------------------------------------------------
# Switches for the stepFlexmix simulation:
#   NESTED_LOOP            TRUE = nested foreach loop, FALSE = nested future_map
#   TEST_RUN               TRUE = small quick run, FALSE = full-size run
#   RUN_ALL_IN_ONE_FLEXMIX currently not read in this section
NESTED_LOOP <- TRUE
TEST_RUN <- FALSE
RUN_ALL_IN_ONE_FLEXMIX <- FALSE
set.seed(101)
if (!TEST_RUN) {
  #panel_data <- panel_data %>% filter(mergeid %in% sample(unique(mergeid),600))
  nr_of_classes <- 12 # 12
  nrep <- 10 # 40
  sleep_time <- 10
  print("----running big------")
} else {
  nr_of_classes <- 5 # 2 # 10
  nrep <- 4 # 2 # 10
  sleep_time <- 1/100
  print("----running small----")
}
# Class counts 1..nr_of_classes to iterate over.
nr_of_classes_vector <- 1:nr_of_classes
##########################################################################################
##########################################################################################
##########################################################################################
##########################################################################################
##########################################################################################
##########################################################################################
##########################################################################################
##########################################################################################
##########################################################################################
##########################################################################################
#-------------------------------------------------------------------------------------------------
# stepFlexmix
# Mock of a single stepFlexmix run, used to exercise the parallel set-up without
# fitting real models. It sleeps `sleep_time_per_class * k` seconds, `nrep`
# times (via microbenchmark), and returns a one-line timing report.
#
# Arguments:
#   k                    number of classes; scales the simulated workload.
#   nrep                 number of repetitions (microbenchmark's `times`).
#   sleep_time_per_class seconds of sleep per class. It defaults to the
#                        `sleep_time` visible from the function's enclosing
#                        environment (the script's global), so existing
#                        two-argument calls behave exactly as before.
# Returns:
#   A glue string of the form
#   "<k>: * . Started: <time>. Ended: <time>. Duration: <elapsed>. ".
flexmix_single <- function(k, nrep, sleep_time_per_class = sleep_time) {
  # Resolve the sleep length before timing starts so a missing `sleep_time`
  # fails immediately.
  force(sleep_time_per_class)
  start_time <- lubridate::now()
  # Run only for its side effect (the simulated workload); the benchmark
  # timings themselves are not used.
  microbenchmark::microbenchmark(Sys.sleep(sleep_time_per_class * k), times = nrep)
  end_time <- lubridate::now()
  duration <- end_time - start_time
  start_time_f <- hms::as_hms(start_time)
  end_time_f <- hms::as_hms(end_time)
  duration_f <- format(round(duration, 2), nsmall = 2)
  glue::glue("{k}: * . Started: {start_time_f}. Ended: {end_time_f}. Duration: {duration_f}. \n")
}
#########################################################
# Running models ----
# Start the overall timer for the model run.
tic(msg = "Nested_flexmix")
#----plan
if (INTERACTIVE) {
  # Two-level future plan: an outer layer of nr_of_classes multisession
  # workers, each with an inner layer of nrep multisession workers.
  # NOTE(review): this plan only drives the future_map() branch. No foreach
  # backend is registered on this path, so with NESTED_LOOP = TRUE the %dopar%
  # loop presumably runs sequentially (foreach warns about this) unless a
  # backend was registered earlier in the session — confirm.
  plan(list(
    tweak(multisession, workers = nr_of_classes),
    tweak(multisession, workers = nrep)
  ))
} else {
  ##--------Setup for MPI---------------
  # parallel::makeCluster(spec, type, ...) has no `workers` argument. The
  # former call `makeCluster(workers = nr_of_classes, type = "MPI")` forwarded
  # `workers` through `...` as an unused cluster option and left `spec`
  # missing, so nr_of_classes never set the cluster size. The no-op argument
  # is dropped: the behaviour is unchanged, but the call no longer suggests a
  # size it does not apply.
  # NOTE(review): with `spec` missing, snow's MPI cluster appears to be sized
  # from the MPI environment (e.g. the Slurm/mpirun task count). Pass
  # `nr_of_classes` as the first argument if a fixed size is intended.
  cl <- parallel::makeCluster(type = "MPI") # works
  # avail.workers <- future::nbrOfWorkers()
  # cl <- makeCluster(workers = avail.workers, type = "MPI")
  if (NESTED_LOOP) {
    # OPTION 0 (for nested loop): use the MPI cluster as the foreach backend.
    registerDoSNOW(cl)
  } else {
    #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # OPTION 1
    #plan(cluster, workers = cl)
    # OPTION 2
    #https://stackoverflow.com/q/61506909/7147695
    # future::plan(
    #   list(
    #     future::tweak(
    #       future::multiprocess,
    #       workers = nr_of_classes),
    #     future::tweak(
    #       future::multiprocess,
    #       workers = nrep)
    #   )
    # )
    # OPTION 3 (in use): the MPI cluster is the outer future layer, with
    # multisession workers nested inside each MPI worker.
    plan(list(tweak(cluster, workers = cl), multisession))
  }
  #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  ##------------ Displaying info--------------------
  print('--------------Worker process allocation among nodes--------------------')
  # Ask every cluster worker which node and architecture it runs on.
  worker_allocation <- clusterCall(cl, function() Sys.info()[c("nodename", "machine")])
  print(worker_allocation)
  print(glue(".....Available cores: {availableCores()}....."))
  print(glue(".....nbrOfWorkers: {future::nbrOfWorkers()}....."))
  print('-----------------------------')
}
##----------- The gist-----------------------
# Every (class count i, repetition j) task simulates ONE stepFlexmix fit with
# i classes, so flexmix_single() is called with nrep = 1. Previously the
# repetition index j was passed as `nrep`, which made repetition j run the
# simulated fit j times (j = 1..nrep) instead of once.
# The result list is the value of this top-level `if` and is auto-printed.
if (NESTED_LOOP) {
  foreach(i = nr_of_classes_vector) %:%
    foreach(j = seq_len(nrep)) %dopar% flexmix_single(i, 1L)
} else {
  future_map(nr_of_classes_vector, function(i) {
    future_map(seq_len(nrep), function(j) flexmix_single(i, 1L),
               .options = furrr_options(seed = TRUE))
  }, .options = furrr_options(seed = TRUE))
}
##------------- stopping parallelization-------------
# Reset the future plan first. This shuts down the multisession workers
# started in interactive mode and stops any plan from pointing at the MPI
# cluster. Then stop the MPI cluster itself, which is only created in
# non-interactive mode.
plan(sequential)
if (!INTERACTIVE) {
  stopCluster(cl)
}
print('---classes----')
print('------')
# Stop the "Nested_flexmix" timer and store the entry in the tictoc log.
toc(log = TRUE)
##############################################################################
##############################################################################
##############################################################################
##############################################################################
##############################################################################
##############################################################################
##############################################################################
##############################################################################
##############################################################################
##############################################################################
#...................
# Final report ----
# Print the end time, the available core count, and the formatted tictoc log,
# then empty the log.
Sys.time()
print(availableCores())
log_of_stepFlexmix_times <- tic.log(format = TRUE)
print(log_of_stepFlexmix_times)
tic.clearlog()
# End of gist.