Last active
December 15, 2015 08:29
-
-
Save Xachriel/5231092 to your computer and use it in GitHub Desktop.
FM + data.table = success ?
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Using 64bit R 2.15.3
# Packages we are going to use
# Download the packed data here: https://www.dropbox.com/s/c1rtcryp8p6500w/kms-2013-01-06-s.json.bz2
library(rjson)
library(data.table)
library(snow)
library(rbenchmark)

# Set your working directory to where the unpacked file is, or add the path.
# Only part of the data is read for this test; change n to read more.
eve <- readLines("kms-2013-01-06-s.json", n = 80000)

# The first line of the file is not a data record, so it is dropped.
# (The last line of the full file is not one either, but it is not reached
# when only the first n lines are read.)
# Every remaining line is parsed as one JSON document, giving a list with one
# parsed record per line.
eve.list <- lapply(eve[-1], fromJSON)
# Flatten one parsed record into a one-row matrix.
#
# The record is flattened with unlist(). Values that end up under the same
# name are joined into a single comma-separated string stored at the first
# occurrence of that name; later occurrences are dropped.
#
# Args:
#   eve.list: one parsed record (a possibly nested list).
# Returns: a matrix with one row and one column per distinct name.
eve.data.frame <- function(eve.list) {
  framo <- unlist(eve.list)
  keys <- names(framo)
  is.repeat <- duplicated(keys)
  if (any(is.repeat)) {
    for (key in unique(keys[is.repeat])) {
      hits <- which(keys %in% key)
      # Assign by position, not by name: a character subscript never matches
      # an empty name, so name-based assignment would append a new element
      # and the joined value would be lost in the de-duplication below.
      framo[hits[1]] <- paste(framo[hits], collapse = ",")
    }
    framo <- framo[!is.repeat]
  }
  as.matrix(t(framo))
}
# Convert at most the first 60000 parsed records to one-row matrices.
# Subsetting before the conversion avoids converting records that would be
# thrown away, and head() does not pad with NULL when fewer records were read.
# This takes a while if you read the whole data.
eve.frames <- lapply(head(eve.list, 60000), eve.data.frame)
# One-row matrices to data.tables
eve.frames2 <- lapply(eve.frames, as.data.table)
# unique(sapply(eve.frames2, class)) # data.frame data.tables
## | |
#Functions | |
## | |
# Standard FM ("fast merging"): merge a list of frames chunk by chunk.
#
# Each pass splits data.list into consecutive sections of at most `nparts`
# elements, merges every section into one frame with
# merge(all = TRUE, sort = FALSE), and repeats on the resulting shorter list
# until a single frame is left.
#
# Args:
#   data.list: non-empty list of objects accepted by merge().
#   nparts:    maximum number of elements per section. Must be >= 2 when there
#              is more than one element, otherwise a pass cannot shorten the
#              list. Fractional values are truncated.
# Returns: the merged frame (the sole element itself for a one-element list).
fast.merging <- function(data.list, nparts) {
  if (!is.list(data.list)) stop("data.list isn't a list")
  if (length(data.list) == 0) stop("data.list is empty")
  if (length(data.list) > 1) {
    if (!is.numeric(nparts) || length(nparts) != 1 || is.na(nparts) ||
        nparts < 2) {
      stop("nparts must be a single number >= 2")
    }
    nparts <- floor(nparts)
  }

  # Merge the elements bounds[1]..bounds[2] of `frames` the standard way.
  merge.section <- function(bounds, frames) {
    part <- frames[[bounds[1]]]
    # Start from the element after the seed: merging the seed with itself is
    # wasted work and multiplies any duplicated rows it contains.
    for (offset in seq_len(bounds[2] - bounds[1])) {
      part <- merge(part, frames[[bounds[1] + offset]], all = TRUE, sort = FALSE)
    }
    part
  }

  while (length(data.list) != 1) { # Loop until everything is merged
    n <- length(data.list)
    if (n > nparts) {
      starts <- seq(1, n, nparts)
      # The last section is shorter when nparts does not divide n.
      ends <- pmin(starts + nparts - 1, n)
      sections <- Map(c, starts, ends)
    } else {
      sections <- list(c(1, n))
    }
    data.list <- lapply(sections, merge.section, frames = data.list)
  }
  data.list[[1]] # the merged data frame
}
# MC FM: fast merging with the sections of each pass merged on a cluster.
#
# Works like the standard fast merging: each pass splits data.list into
# consecutive sections of at most `nparts` elements and merges each section
# with merge(all = TRUE, sort = FALSE). When a pass has more than one section
# the sections are handed to parLapply(); a single section is merged locally.
#
# Args:
#   data.list: non-empty list of objects accepted by merge().
#   nparts:    maximum number of elements per section. Must be >= 2 when there
#              is more than one element, otherwise a pass cannot shorten the
#              list. Fractional values are truncated.
#   cluster:   cluster object passed to parLapply(); only used by passes with
#              more than one section.
# Returns: the merged frame (the sole element itself for a one-element list).
mc.fast.merging <- function(data.list, nparts, cluster) {
  if (!is.list(data.list)) stop("data.list isn't a list")
  if (length(data.list) == 0) stop("data.list is empty")
  if (length(data.list) > 1) {
    if (!is.numeric(nparts) || length(nparts) != 1 || is.na(nparts) ||
        nparts < 2) {
      stop("nparts must be a single number >= 2")
    }
    nparts <- floor(nparts)
  }

  # Merge the elements x[1]..x[2] of data.list the standard way.
  merge.section <- function(x, data.list) {
    part <- data.list[[x[1]]]
    # Start from the element after the seed: merging the seed with itself is
    # wasted work and multiplies any duplicated rows it contains.
    for (offset in seq_len(x[2] - x[1])) {
      part <- merge(part, data.list[[x[1] + offset]], all = TRUE, sort = FALSE)
    }
    part
  }

  while (length(data.list) != 1) { # Loop until everything is merged
    n <- length(data.list)
    if (n > nparts) {
      starts <- seq(1, n, nparts)
      # The last section is shorter when nparts does not divide n.
      ends <- pmin(starts + nparts - 1, n)
      sections <- Map(c, starts, ends)
    } else {
      sections <- list(c(1, n))
    }
    data.list <- if (length(sections) != 1) {
      parLapply(cluster, sections, merge.section, data.list = data.list)
    } else {
      # A single section gains nothing from the cluster.
      lapply(sections, merge.section, data.list = data.list)
    }
  }
  data.list[[1]] # the merged data frame
}
# Here's the lady: cluster fast merging with an explicit `by` for every merge.
#
# Each pass splits data.list into consecutive sections of at most `nparts`
# elements and merges each section. Every merge joins on the column names the
# two operands share (by = intersect(names(...), names(...))) with
# all = TRUE, sort = FALSE. When a pass has more than one section the sections
# are handed to parLapply(); a single section is merged locally.
#
# Args:
#   data.list: non-empty list of objects accepted by merge() (the benchmark
#              passes data.tables).
#   nparts:    maximum number of elements per section. Must be >= 2 when there
#              is more than one element, otherwise a pass cannot shorten the
#              list. Fractional values are truncated.
#   cluster:   cluster object passed to parLapply(); only used by passes with
#              more than one section.
# Returns: the merged table (the sole element itself for a one-element list).
LadyMergAlot <- function(data.list, nparts, cluster) {
  if (!is.list(data.list)) stop("data.list isn't a list")
  if (length(data.list) == 0) stop("data.list is empty")
  if (length(data.list) > 1) {
    if (!is.numeric(nparts) || length(nparts) != 1 || is.na(nparts) ||
        nparts < 2) {
      stop("nparts must be a single number >= 2")
    }
    nparts <- floor(nparts)
  }

  # Merge the elements x[1]..x[2] of data.list on their shared columns.
  merge.section <- function(x, data.list) {
    part <- data.list[[x[1]]]
    # Start from the element after the seed: merging the seed with itself is
    # wasted work and multiplies any duplicated rows it contains.
    for (offset in seq_len(x[2] - x[1])) {
      other <- data.list[[x[1] + offset]]
      part <- merge(part, other,
                    by = intersect(names(part), names(other)),
                    all = TRUE, sort = FALSE)
    }
    part
  }

  while (length(data.list) != 1) { # Loop until everything is merged
    n <- length(data.list)
    if (n > nparts) {
      starts <- seq(1, n, nparts)
      # The last section is shorter when nparts does not divide n.
      ends <- pmin(starts + nparts - 1, n)
      sections <- Map(c, starts, ends)
    } else {
      sections <- list(c(1, n))
    }
    data.list <- if (length(sections) != 1) {
      parLapply(cluster, sections, merge.section, data.list = data.list)
    } else {
      # A single section gains nothing from the cluster.
      lapply(sections, merge.section, data.list = data.list)
    }
  }
  data.list[[1]] # the merged data frame
}
##
# Here starts the bench.
##
# Numbers of frames merged in each run: small steps for the one-by-one merges,
# large steps for the chunked ("FM") merges.
sekvenssi <- seq(2, 5002, 100)
sekvenssi2 <- seq(2, 50002, 1000)

# Time merger() on the first sizes[i] elements of `frames`, for every i.
# Stops early once a single run takes longer than `limit` seconds; runs that
# were not reached stay NA. Ending the loop with break instead of raising an
# error lets the caller still save the timings and shut down its cluster.
bench.chunked <- function(frames, sizes, merger, limit = 400) {
  elapsed <- rep(NA_real_, length(sizes))
  for (i in seq_along(sizes)) {
    complete <- frames[seq_len(sizes[i])]
    elapsed[i] <- system.time(merger(complete))[["elapsed"]]
    print(c(i, elapsed[i]))
    if (elapsed[i] > limit) break
  }
  elapsed
}

# Without data.table: merge the frames one by one.
time1 <- rep(NA, length(sekvenssi))
for (i in seq_along(sekvenssi)) {
  complete <- eve.frames[[1]]
  time1[i] <- system.time(
    for (j in 2:sekvenssi[i]) {
      complete <- merge(complete, eve.frames[[j]], all = TRUE, sort = FALSE)
    }
  )[3]
  print(paste(round(time1[i], 2), nrow(complete), i))
}
write(time1, "time1.dat")

# With data.table: the same one-by-one merge on the data.table copies.
time2 <- rep(NA, length(sekvenssi))
for (i in seq_along(sekvenssi)) {
  complete <- eve.frames2[[1]]
  time2[i] <- system.time(
    for (j in 2:sekvenssi[i]) {
      complete <- merge(complete, eve.frames2[[j]],
                        by = intersect(names(complete), names(eve.frames2[[j]])),
                        all = TRUE, sort = FALSE)
    }
  )[3]
  print(paste(round(time2[i], 2), nrow(complete), i))
}
write(time2, "time2.dat")

# FM with data.frames
time3 <- bench.chunked(eve.frames, sekvenssi2,
                       function(frames) fast.merging(frames, 10))
write(time3, "time3.dat")

# MCFM with data.frames
cl <- makeCluster(4, type = "SOCK")
# finally= guarantees the workers are stopped even if a merge fails.
time4 <- tryCatch(
  bench.chunked(eve.frames, sekvenssi2,
                function(frames) mc.fast.merging(frames, 10, cl)),
  finally = stopCluster(cl)
)
write(time4, "time4.dat")

# MCFM with data.tables
cl <- makeCluster(4, type = "SOCK")
time5 <- tryCatch({
  # NOTE(review): the workers are separate R processes, so data.table is
  # loaded on them here to let merge() dispatch to its data.table method
  # there -- confirm this matches the intended benchmark.
  clusterEvalQ(cl, library(data.table))
  bench.chunked(eve.frames2, sekvenssi2,
                function(frames) LadyMergAlot(frames, 10, cl))
}, finally = stopCluster(cl))
write(time5, "time5.dat")

# Read the timings back from disk, so the plot can also be redrawn from saved
# results without rerunning the benchmarks.
time1 <- scan("time1.dat")
time2 <- scan("time2.dat")
time3 <- scan("time3.dat")
time4 <- scan("time4.dat")
time5 <- scan("time5.dat")

png("mark3.png", width = 960)
plot(sekvenssi, time1 / 60, type = "l", xlim = c(0, 50000), ylim = c(0, 5),
     ylab = "Elapsed time (minutes)", xlab = "Frames merged")
points(sekvenssi, time2 / 60, type = "l", col = "blue")
points(sekvenssi2, time3 / 60, type = "l", col = "red")
points(sekvenssi2, time4 / 60, type = "l", col = "pink")
points(sekvenssi2, time5 / 60, type = "l", col = "green")
legend("bottomright",
       c("R standard", "data.table", "FM 1 cores", "FM 4 cores", "The Lady"),
       lty = c(1, 1, 1, 1),
       col = c("black", "blue", "red", "pink", "green"))
dev.off()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment