Skip to content

Instantly share code, notes, and snippets.

@dmarcelinobr
Last active December 30, 2015 15:25
Show Gist options
  • Save dmarcelinobr/fe376f84ca8cc1f87854 to your computer and use it in GitHub Desktop.
Distributed processing using data.table
library(Rserve)
library(data.table)
# Candidate Rserve ports, one per node: at most 4 nodes can be started.
port <- 9411:9414
# Simulated input data from which the partitions (years) are derived.
dt <- data.table(time_year = 2012:2014)
# Column, held as a symbol, whose distinct values define the partitions.
partitioning <- quote(time_year)
# Distinct partition values in order of first appearance, named by their own
# value so that lapply() over them returns lists keyed by partition.
partitions <- unique(dt[[as.character(partitioning)]])
names(partitions) <- partitions
# Fail loudly rather than silently assigning NA ports to surplus partitions.
if (length(partitions) > length(port)) {
  stop(
    "Found ", length(partitions), " partitions but only ", length(port),
    " ports are available.",
    call. = FALSE
  )
}
# Keep one port per partition and key each port by its partition value.
port <- port[seq_along(partitions)]
names(port) <- as.character(partitions)
# Start one local Rserve daemon per partition (up to 4 nodes), each listening
# on the port reserved for that partition, with startup args c("--save").
run_cluster <- lapply(partitions, function(partition) {
  node_port <- port[as.character(partition)]
  Rserve(debug = FALSE, port = node_port, args = c("--save"))
})
library(RSclient)
# Open one client connection per node; the resulting list is named by
# partition, so later code looks connections up with
# rscl[[as.character(partition)]].
rscl <- lapply(partitions, function(partition) {
  node_port <- port[as.character(partition)]
  RS.connect(port = node_port)
})
# Attach data.table inside every node's R session. The library() call is
# handed to RS.eval as an expression (no lazy = FALSE here) and run remotely.
load_pkgs <- lapply(partitions, function(partition) {
  conn <- rscl[[as.character(partition)]]
  RS.eval(rsc = conn, x = library(data.table))
})
# Number of rows generated on each node.
n <- 1e7
# Create a data.table named `x` in each node's session. substitute() splices
# the local values of `n` and `partition` into the expression before it is
# shipped, because the remote session cannot see this session's variables.
populate_data <- lapply(partitions, function(partition) {
  qcall <- substitute({
    # rep(partition, n) repeats the value n times. Passing a third positional
    # argument (as in rep(partition, n, TRUE)) would be taken as `length.out`
    # and yield a single value that only worked via data.table recycling.
    x <- data.table(
      biggroup = sample(n * 0.9, n, replace = TRUE),
      tinygroup = sample(1e3, n, replace = TRUE),
      partition = rep(partition, n)
    )
    invisible(TRUE)
  }, list(n = n, partition = partition))
  RS.eval(rsc = rscl[[as.character(partition)]], x = qcall, lazy = FALSE)
})
# Collect summary(x) from every node, keyed by partition, and print it.
summary_data <- lapply(partitions, function(partition) {
  conn <- rscl[[as.character(partition)]]
  RS.eval(rsc = conn, x = summary(x))
})
print(summary_data)
# Build a secondary index on `biggroup` for each node's `x`, so that filters
# such as biggroup %in% ... can use it via auto-indexing. setindexv() is the
# supported replacement for the deprecated/removed set2keyv(); unlike
# setkeyv() it does not reorder the rows.
make_index <- lapply(partitions, function(partition) {
  RS.eval(
    rsc = rscl[[as.character(partition)]],
    x = {
      setindexv(x, "biggroup")
      invisible(TRUE)
    }
  )
})
# Draw 10 reproducible biggroup values to filter on.
set.seed(123)
biggroup_filter <- sample(n * 0.9, 10)
# On each node, filter `x` with auto-indexing and verbose output enabled, and
# return list(<captured verbose log lines>, <matching rows>).
use_index <- lapply(partitions, function(partition) {
  qcall <- substitute({
    op <- options(datatable.auto.index = TRUE, datatable.verbose = TRUE)
    # Restore the node's options even if the subset fails, so the persistent
    # remote session is not left in verbose mode for later calls.
    prnt <- tryCatch(
      capture.output(r <- x[biggroup %in% biggroup_filter]),
      finally = options(op)
    )
    list(prnt, r)
  }, list(biggroup_filter = biggroup_filter))
  RS.eval(rsc = rscl[[as.character(partition)]], x = qcall, lazy = FALSE)
})
# Let's confirm: print each node's verbose log (showing whether the index was
# used), the filter values, the combined matching rows, and each node's row
# count. A for loop avoids autoprinting the list of NULLs that
# lapply(..., cat) would produce.
for (log_lines in lapply(use_index, `[[`, 1L)) {
  cat(log_lines, sep = "\n")
}
print(biggroup_filter)
rbindlist(lapply(use_index, `[[`, 2L))
vapply(partitions, function(partition) {
  RS.eval(rsc = rscl[[as.character(partition)]], x = nrow(x))
}, integer(1))
library(microbenchmarkCore) # install.packages("microbenchmarkCore", repos="https://olafmersmann.github.io/drat")
# Re-run the indexed filter with fresh values and time it. The timing covers
# the per-node round trips (issued one after another by lapply) and the
# verbose-log capture, not just the subset itself.
set.seed(456)
biggroup_filter <- sample(n * 0.9, 10)
system.nanotime(use_index <- lapply(partitions, function(partition) {
  qcall <- substitute({
    op <- options(datatable.auto.index = TRUE, datatable.verbose = TRUE)
    # Restore the node's options even if the subset fails.
    prnt <- tryCatch(
      capture.output(r <- x[biggroup %in% biggroup_filter]),
      finally = options(op)
    )
    list(prnt, r)
  }, list(biggroup_filter = biggroup_filter))
  RS.eval(rsc = rscl[[as.character(partition)]], x = qcall, lazy = FALSE)
}))
# user system elapsed
# NA NA 0.003457923
rbindlist(lapply(use_index, `[[`, 2L))[, head(.SD, 2L)]
# biggroup tinygroup partition
# 1: 1894611 818 2012
# 2: 741895 62 2012
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment