Skip to content

Instantly share code, notes, and snippets.

@Xachriel
Last active December 15, 2015 08:29
Show Gist options
  • Save Xachriel/5231092 to your computer and use it in GitHub Desktop.
Save Xachriel/5231092 to your computer and use it in GitHub Desktop.
FM + data.table = success ?
#Using 64bit R 2.15.3
#Packages we are going to use
#download the packed data here: https://www.dropbox.com/s/c1rtcryp8p6500w/kms-2013-01-06-s.json.bz2
library(rjson)
library(data.table)
library(snow)
library(rbenchmark)
#Point the working directory at the unpacked file, or put its full path here.
#Only the first 80000 lines are read for this test; read more if you wish.
eve = matrix(readLines("kms-2013-01-06-s.json", n=80000), ncol=1)
#The first and last lines of the file are not usable records. The first one
#is skipped here; the last one is never reached by this partial read.
#Every remaining line is a JSON document that fromJSON() turns into an R list.
eve.list = lapply(eve[-1, 1], function(json.line) fromJSON(json.line))
#Flattens one parsed record into a 1-row matrix with one column per field.
#  eve.list: a (possibly nested) list, e.g. one element produced by fromJSON()
#Fields whose flattened name occurs more than once are collapsed into a single
#comma-separated string stored under the first occurrence of that name.
eve.data.frame = function(eve.list){
  flat = unlist(eve.list)
  repeated = unique(names(flat)[duplicated(names(flat))])
  if(length(repeated) == 0){
    #Nothing to collapse: keep the flattened vector exactly as it is
    return(as.matrix(t(flat)))
  }
  for(key in repeated){
    #Names are looked up again on every pass, so the mask always matches flat
    same.key = names(flat) %in% key
    flat[key] = paste(flat[same.key], collapse= ",")
  }
  #Keep only the first entry of every name; it now holds the joined value
  first.only = !duplicated(names(flat))
  as.matrix(t(flat[first.only]))
}
#This takes a while if you read the whole data.
#Only the first 60000 records are used below, so cut the list down BEFORE
#converting instead of converting everything and throwing the rest away.
#head() also avoids the NULL padding that [1:60000] would add if fewer than
#60000 records were read.
eve.frames = lapply(head(eve.list, 60000), eve.data.frame)
#Each 1-row matrix from eve.data.frame() becomes a data.table
eve.frames2 = lapply(eve.frames, as.data.table)
#unique(sapply(eve.frames2, class)) #data.frame data.tables
##
#Functions
##
#Standard FM
#Merges a list of data frames into one by repeatedly merging consecutive
#groups of up to nparts elements until a single object is left.
#  data.list: non-empty list of objects accepted by merge()
#  nparts:    largest number of elements merged per group; has to be a single
#             number >= 2 whenever data.list holds more than one element,
#             otherwise the list would never get shorter
#Returns the merged result (for a one-element list, that element unchanged).
#Stops if data.list isn't a list, is empty, or nparts is unusable.
fast.merging = function(data.list, nparts){
  if(!is.list(data.list)) stop("data.list isn't a list")
  if(length(data.list) == 0) stop("data.list is empty")
  if(length(data.list) > 1){
    valid = is.numeric(nparts) && length(nparts) == 1 && !is.na(nparts) && nparts >= 2
    if(!valid) stop("nparts must be a single number >= 2")
  }
  #Left-to-right outer merge of one group. Reduce() starts from the first
  #element and continues with the second, so no frame is merged with itself
  #(a self-merge multiplies duplicated rows and costs an extra merge).
  merge_chunk = function(frames){
    Reduce(function(x, y) merge(x, y, all=TRUE, sort=FALSE), frames)
  }
  while(length(data.list) != 1){ #Loop until everything is merged
    n = length(data.list)
    #Consecutive groups of nparts elements; the last one may be shorter
    starts = seq(1, n, by=nparts)
    chunks = lapply(starts, function(from) data.list[from:min(from + nparts - 1, n)])
    data.list = lapply(chunks, merge_chunk)
  }
  return(data.list[[1]]) #returning the merged data frame
}
#MC FM
#Same scheme as the standard FM, but the groups of one round are merged on
#the workers of a cluster through parLapply().
#  data.list: non-empty list of objects accepted by merge()
#  nparts:    largest number of elements merged per group; has to be a single
#             number >= 2 whenever data.list holds more than one element,
#             otherwise the list would never get shorter
#  cluster:   cluster object handed to parLapply(); only used in rounds that
#             have more than one group
#Returns the merged result (for a one-element list, that element unchanged).
#Stops if data.list isn't a list, is empty, or nparts is unusable.
mc.fast.merging = function(data.list, nparts, cluster){
  if(!is.list(data.list)) stop("data.list isn't a list")
  if(length(data.list) == 0) stop("data.list is empty")
  if(length(data.list) > 1){
    valid = is.numeric(nparts) && length(nparts) == 1 && !is.na(nparts) && nparts >= 2
    if(!valid) stop("nparts must be a single number >= 2")
  }
  #Left-to-right outer merge of one group. Reduce() starts from the first
  #element and continues with the second, so no frame is merged with itself
  #(a self-merge multiplies duplicated rows and costs an extra merge).
  merge_chunk = function(frames){
    Reduce(function(x, y) merge(x, y, all=TRUE, sort=FALSE), frames)
  }
  while(length(data.list) != 1){ #Loop until everything is merged
    n = length(data.list)
    #Consecutive groups of nparts elements; the last one may be shorter
    starts = seq(1, n, by=nparts)
    chunks = lapply(starts, function(from) data.list[from:min(from + nparts - 1, n)])
    if(length(chunks) != 1){
      data.list = parLapply(cluster, chunks, merge_chunk)
    }else{
      #A single group is merged locally; no point in shipping it to a worker
      data.list = lapply(chunks, merge_chunk)
    }
  }
  return(data.list[[1]]) #returning the merged data frame
}
#Here's the lady
#Cluster version of the fast merge that names the join columns explicitly:
#every pairwise merge uses by = the column names both sides have in common.
#  data.list: non-empty list of objects that have names() and are accepted
#             by merge() (the benchmark passes data.tables)
#  nparts:    largest number of elements merged per group; has to be a single
#             number >= 2 whenever data.list holds more than one element,
#             otherwise the list would never get shorter
#  cluster:   cluster object handed to parLapply(); only used in rounds that
#             have more than one group
#Returns the merged result (for a one-element list, that element unchanged).
#Stops if data.list isn't a list, is empty, or nparts is unusable.
#NOTE(review): merge() on a worker can only dispatch to a package's method if
#that package is loaded on the worker - confirm the cluster is set up for it.
LadyMergAlot = function(data.list, nparts, cluster){
  if(!is.list(data.list)) stop("data.list isn't a list")
  if(length(data.list) == 0) stop("data.list is empty")
  if(length(data.list) > 1){
    valid = is.numeric(nparts) && length(nparts) == 1 && !is.na(nparts) && nparts >= 2
    if(!valid) stop("nparts must be a single number >= 2")
  }
  #Left-to-right outer merge of one group. Reduce() starts from the first
  #element and continues with the second, so no frame is merged with itself
  #(a self-merge multiplies duplicated rows and costs an extra merge).
  merge_chunk = function(frames){
    Reduce(function(x, y){
      merge(x, y,
            by=intersect(names(x), names(y)),
            all=TRUE, sort=FALSE)
    }, frames)
  }
  while(length(data.list) != 1){ #Loop until everything is merged
    n = length(data.list)
    #Consecutive groups of nparts elements; the last one may be shorter
    starts = seq(1, n, by=nparts)
    chunks = lapply(starts, function(from) data.list[from:min(from + nparts - 1, n)])
    if(length(chunks) != 1){
      data.list = parLapply(cluster, chunks, merge_chunk)
    }else{
      #A single group is merged locally; no point in shipping it to a worker
      data.list = lapply(chunks, merge_chunk)
    }
  }
  return(data.list[[1]]) #returning the merged data frame
}
##
#Here starts the bench.
##
#Numbers of frames merged per run: sekvenssi for the two sequential
#benchmarks, sekvenssi2 for the three fast-merge benchmarks
sekvenssi = seq(from=2, to=5002, by=100)
sekvenssi2 = seq(from=2, to=50002, by=1000)
#Without data.table
#Each run starts again from the first frame and merges frames 2..sekvenssi[i]
#one after another; the elapsed wall-clock seconds of the run go to time1[i].
time1 = rep(NA, length(sekvenssi))
for(i in seq_along(sekvenssi)){
  complete = eve.frames[[1]]
  time1[i] = system.time({
    for(j in 2:sekvenssi[i]){
      complete = merge(complete, eve.frames[[j]], all=TRUE, sort=FALSE)
    }
  })[["elapsed"]]
  #Progress line: seconds, rows merged so far, run number
  print(paste(round(time1[i],2), nrow(complete),i))
}
write(time1, "time1.dat")
#With data.table
#Same sequential benchmark on the data.table copies of the frames; the join
#columns are passed explicitly as the names both sides have in common.
time2 = rep(NA, length(sekvenssi))
for(i in seq_along(sekvenssi)){
  complete = eve.frames2[[1]]
  time2[i] = system.time({
    for(j in 2:sekvenssi[i]){
      complete = merge(complete, eve.frames2[[j]],
                       by=intersect(names(complete), names(eve.frames2[[j]])),
                       all=TRUE, sort=FALSE)
    }
  })[["elapsed"]]
  #Progress line: seconds, rows merged so far, run number
  print(paste(round(time2[i],2), nrow(complete),i))
}
write(time2, "time2.dat")
#FM with data.frames
#Times fast.merging() with groups of 10 on the first sekvenssi2[i] frames.
time3 = rep(NA, length(sekvenssi2))
for(i in seq_along(sekvenssi2)){
  complete = eve.frames[seq_len(sekvenssi2[i])]
  time3[i] = system.time(fast.merging(complete, 10))[["elapsed"]]
  print(c(i, time3[i]))
  #Give up on the whole benchmark once a single run needs more than 400 s
  if(time3[i] > 400){
    stop()
  }
}
write(time3, "time3.dat")
#MCFM with data.frames
#Times mc.fast.merging() with groups of 10 on a 4-worker socket cluster.
cl = makeCluster(4, type="SOCK")
time4 = rep(NA, length(sekvenssi2))
#finally= shuts the workers down even when a run fails or the 400 s limit
#aborts the loop; without it the worker processes would be left running.
tryCatch({
  for(i in seq_along(sekvenssi2)){
    complete = eve.frames[1:sekvenssi2[i]]
    time4[i] = system.time(mc.fast.merging(complete, 10, cl))[3]
    print(c(i, time4[i]))
    #Give up on the whole benchmark once a single run needs more than 400 s
    if(time4[i] > 400) stop("run took more than 400 seconds, benchmark aborted")
  }
}, finally = stopCluster(cl))
write(time4, "time4.dat")
#MCFM with data.tables
#Times LadyMergAlot() with groups of 10 on a 4-worker socket cluster.
cl = makeCluster(4, type="SOCK")
time5 = rep(NA, length(sekvenssi2))
#finally= shuts the workers down even when a run fails or the 400 s limit
#aborts the loop; without it the worker processes would be left running.
tryCatch({
  #Freshly started workers have not loaded data.table. Load it there so that
  #merge() on the workers can use the data.table method, like the master does.
  #NOTE(review): this makes the worker-side merges differ from earlier runs
  #of this benchmark, which presumably fell back to the data.frame method.
  invisible(clusterEvalQ(cl, library(data.table)))
  for(i in seq_along(sekvenssi2)){
    complete = eve.frames2[1:sekvenssi2[i]]
    time5[i] = system.time(LadyMergAlot(complete, 10, cl))[3]
    print(c(i, time5[i]))
    #Give up on the whole benchmark once a single run needs more than 400 s
    if(time5[i] > 400) stop("run took more than 400 seconds, benchmark aborted")
  }
}, finally = stopCluster(cl))
write(time5, "time5.dat")
#Read the stored timings back, so the plot can be redone without rerunning
#the benchmarks
time1 = scan(file="time1.dat")
time2 = scan(file="time2.dat")
time3 = scan(file="time3.dat")
time4 = scan(file="time4.dat")
time5 = scan(file="time5.dat")
#One line per benchmark, elapsed seconds converted to minutes
png("mark3.png", width=960)
plot(sekvenssi, time1/60, type="l", xlim=c(0,50000), ylim=c(0,5),
     ylab="Elapsed time (minutes)", xlab="Frames merged")
lines(sekvenssi, time2/60, col="blue")
lines(sekvenssi2, time3/60, col="red")
lines(sekvenssi2, time4/60, col="pink")
lines(sekvenssi2, time5/60, col="green")
legend("bottomright",
       legend=c("R standard", "data.table", "FM 1 cores", "FM 4 cores","The Lady"),
       lty=1,
       col=c("black", "blue", "red", "pink", "green"))
dev.off()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment