Created
July 24, 2018 01:56
-
-
Save wush978/f8823a6aef81a7bc08b5a29caf5236ff to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <Rcpp.h> | |
using namespace Rcpp; | |
//[[Rcpp::export(".time_clustering")]] | |
SEXP time_clustering(DataFrame x, int time_threshold, double ratio_threshold) { | |
std::vector<int> result_i, result_k; | |
IntegerVector tunit(wrap(x["tunit"])); | |
std::vector< LogicalVector > targets; | |
for(int i = 1;i < x.ncol();i++) { | |
targets.push_back(LogicalVector(wrap(x[i]))); | |
} | |
double threshold = time_threshold * ratio_threshold; | |
std::vector<double> count(targets.size(), 0); | |
for(int i = 0;i < tunit.size();i++) { | |
const int current_t(tunit[i]); | |
std::fill(count.begin(), count.end(), 0.0); | |
for(int j = i;j < tunit.size();j++) { | |
if (tunit[j] - current_t >= time_threshold) break; | |
for(int k = 0;k < targets.size();k++) { | |
if (targets[k][j]) count[k] += 1; | |
} | |
} | |
for(int k = 0;k < count.size();k++) { | |
if (count[k] > threshold) { | |
result_i.push_back(i + 1); | |
result_k.push_back(k + 1); | |
} | |
} | |
} | |
return List::create( | |
wrap(result_i), | |
wrap(result_k) | |
); | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
library(magrittr) | |
library(RSQLite) | |
library(parallel) | |
library(dplyr) | |
library(data.table) | |
.tbs <- dbListTables(db) %>% | |
grep(pattern = "^cem_", value = TRUE) | |
cl <- makeCluster(length(.tbs)) | |
init.cluster <- function(cl) { | |
parallel:::checkCluster(cl) | |
clusterEvalQ(cl, { | |
library(magrittr) | |
library(RSQLite) | |
library(parallel) | |
library(dplyr) | |
library(data.table) | |
NULL | |
}) | |
invisible(NULL) | |
} | |
init.cluster(cl) | |
clusterExport(cl, "db") | |
clusterEvalQ(cl, { | |
db <- dbConnect(db) | |
NULL | |
}) | |
# parLapply(cl, .tbs, function(.tb) { | |
# assign("tb.name", .tb, envir = globalenv()) | |
# . <- strsplit(tb.name, "_")[[1]][2] | |
# year <- substring(., 1, 4) | |
# month <- substring(., 5, 6) | |
# . <- dbReadTable(db, .tb) | |
# . <- mutate(., time = sprintf("%s-%s-%02d %02d:00:00", year, month, day, hour) %>% as.POSIXct(tz = "Asia/Taipei")) | |
# . <- mutate(., weekday = format(time, "%w") %>% as.integer()) | |
# . <- mutate(., key = sprintf("%d-%d-%d-%d", long, lat, weekday, hour)) | |
# . <- data.table(.) | |
# setkeyv(., "key") | |
# assign("tb", ., envir = globalenv()) | |
# NULL | |
# }) | |
tb <- lapply(.tbs, function(.tb) { | |
. <- strsplit(.tb, "_")[[1]][2] | |
year <- substring(., 1, 4) | |
month <- substring(., 5, 6) | |
. <- dbReadTable(db, .tb) | |
. <- mutate(., time = sprintf("%s-%s-%02d %02d:00:00", year, month, day, hour) %>% as.POSIXct(tz = "Asia/Taipei")) | |
. <- mutate(., weekday = format(time, "%w") %>% as.integer()) | |
. <- mutate(., key = sprintf("%d-%d-%d-%d", long, lat, weekday, hour)) | |
}) %>% | |
rbindlist() | |
tb.key <- clusterEvalQ(cl, { | |
unique(tb$key) | |
}) %>% | |
do.call(what = c) %>% | |
unique() | |
tb.area <- strsplit(tb.key, "-") %>% | |
parLapply(cl = cl, head, 2) %>% | |
parSapply(cl = cl, paste, collapse = "-") | |
tb.ka <- data.frame(key = tb.key, area = tb.area) | |
tb.ka <- data.table(tb.ka, key = "area") | |
n <- unique(tb.area) %>% length() | |
gr.count <- 8 | |
tb.area.list <- rep(1:gr.count, floor(n / gr.count)) %>% | |
`length<-`(n) %>% | |
sample() %>% | |
split(x = unique(tb.area)) | |
tb.ka.list <- lapply(tb.area.list, function(.a) { | |
tb.ka[J(.a),nomatch = 0] | |
}) %>% | |
lapply(as.data.frame) | |
clusterExport(cl, "tb.ka.list") | |
clusterEvalQ(cl, { | |
tb.gr <- lapply(tb.ka.list, function(.k) { | |
tb[J(.k),nomatch=0] | |
}) | |
NULL | |
}) | |
dir.create(".cem.gr") | |
lapply(1:gr.count, function(.) dir.create(file.path(".cem.gr", .))) | |
clusterExport(cl, "gr.count") | |
clusterEvalQ(cl, { | |
for(i in 1:gr.count) { | |
saveRDS(tb.gr[[i]], file.path(".cem.gr", i, sprintf("%s.Rds", tb.name))) | |
} | |
NULL | |
}) | |
stopCluster(cl) | |
rm(tb.ka, tb.gr, tb.lnglat, tb.area, tb.area.list, tb.gr.list, tb.ka.list, tb.key, tb.key.list);gc() | |
cl <- makeCluster(gr.count) | |
init.cluster(cl) | |
parLapply(cl, seq_len(gr.count), function(.i) { | |
.path <- dir(file.path(".cem.gr", .i), full.names = TRUE) | |
tb <- lapply(.path, readRDS) %>% | |
rbindlist() | |
assign("tb", tb, envir = globalenv()) | |
gc() | |
NULL | |
}) %>% | |
invisible() | |
# generate input | |
input.threshold <- as.POSIXct("2018-05-20 00:00:00") | |
clusterExport(cl, "input.threshold") | |
clusterEvalQ(cl, { | |
tb.input <- filter(tb, time >= input.threshold) %>% | |
data.table(key = "key") | |
tb.history <- filter(tb, time < input.threshold) %>% | |
data.table(key = "key") | |
rm(tb) | |
gc() | |
NULL | |
}) %>% | |
invisible() | |
target <- colnames(tb) %>% | |
grep(pattern = "count|traffic", value = TRUE) | |
clusterExport(cl, "target") | |
clusterEvalQ(cl, { | |
tb.history.avg <- tb.history[,lapply(.SD, mean), by = key, .SDcols = target] | |
colnames(tb.history.avg) <- c("key", sprintf("%s.avg", colnames(tb.history.avg) %>% tail(-1))) | |
tb.history.sd <- tb.history[,lapply(.SD, sd), by = key, .SDcols = target] | |
colnames(tb.history.sd) <- c("key", sprintf("%s.sd", colnames(tb.history.sd) %>% tail(-1))) | |
rm(tb.history);gc() | |
NULL | |
}) %>% | |
invisible() | |
clusterEvalQ(cl, { | |
tb.input.avg <- tb.history.avg[tb.input] | |
tb.input.avg.sd <- tb.history.sd[tb.input.avg] | |
# rm(tb.input.avg);gc() | |
NULL | |
}) %>% | |
invisible() | |
clusterEvalQ(cl, { | |
result <- data.frame( | |
key = tb.input.avg.sd$key, | |
time = tb.input.avg.sd$time, | |
long = tb.input.avg.sd$long, | |
lat = tb.input.avg.sd$lat, | |
weekday = tb.input.avg.sd$weekday, | |
hour = tb.input.avg.sd$hour, | |
stringsAsFactors = FALSE | |
) %>% | |
data.table(key = "key") | |
for(name in target) { | |
name.avg <- sprintf("%s.avg", name) | |
name.sd <- sprintf("%s.sd", name) | |
.value <- tb.input.avg.sd[[name]] | |
.upper <- tb.input.avg.sd[[name.avg]] + 3 * tb.input.avg.sd[[name.sd]] | |
.lower <- tb.input.avg.sd[[name.avg]] - 3 * tb.input.avg.sd[[name.sd]] | |
.is_alert <- sign((.value - .upper) * (.value - .lower)) != -1 | |
name.result <- sprintf("%s.is_alert", name) | |
result[[name.result]] <- .is_alert | |
} | |
gc() | |
}) %>% | |
invisible() | |
parLapply(cl, 1:gr.count, function(.i) { | |
saveRDS(result, file.path(".cem.gr", .i, "result.Rds")) | |
NULL | |
}) %>% | |
invisible() | |
stopCluster(cl) | |
name.is_alert <- sprintf("%s.is_alert", target) | |
result$alert_ratio <- lapply(name.is_alert, function(x) result[[x]]) %>% | |
Reduce(f = `+`) %>% | |
`/`(length(name.is_alert)) | |
alert_ratio.threshold <- .1 | |
result.filtered <- filter(result, alert_ratio > alert_ratio.threshold) | |
# time based clustering | |
time.threshold <- 24 | |
time_ratio.threshold <- .8 | |
result.time <- group_by(result.filtered, long, lat) %>% | |
do({ | |
.df <- . | |
if (nrow(.df) < time.threshold) data.frame() else { | |
.df <- arrange(.df, time) | |
.x <- cbind( | |
tunit = difftime(.df$time , .df$time[1], units = "hour") %>% as.integer(), | |
.df[,name.is_alert] | |
) | |
.i <- .time_clustering(.x, time.threshold, time_ratio.threshold) | |
if (length(.i[[1]]) == 0) data.frame() else { | |
.k <- .i[[2]] | |
.i <- .i[[1]] | |
.r <- .df[unique(.i),] | |
.i <- match(.i, unique(.i)) | |
.r[,name.is_alert][] <- FALSE | |
for(.l in seq_along(.i)) { | |
.r[[name.is_alert[.k[.l]]]][.i[.l]] <- TRUE | |
} | |
.r$alert_ratio <- NULL | |
.r | |
} | |
} | |
}) | |
# space clustering | |
library(dbscan) | |
result.time$tunit <- difftime(result.time$time, min(result.time$time), units = "hour") %>% | |
as.numeric() | |
g <- select(result.time, long, lat, tunit) %>% | |
as.matrix() %>% | |
dbscan(eps = 10, minPts = 5) | |
## -- dev | |
function() { | |
tb <- clusterEvalQ(cl, head(tb)) %>% do.call(what = rbind) | |
clusterEvalQ(cl, head(tb.input)) | |
clusterEvalQ(cl, rm(tb)) | |
clusterEvalQ(cl, gc());gc() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment