Created
January 9, 2014 13:54
-
-
Save romainfrancois/8334411 to your computer and use it in GitHub Desktop.
Simple implementation of R's `which()` using threads in C++11
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
> bench(1000)
Unit: microseconds
expr min lq median uq max neval
which(b) 2.104 4.548 5.1000 6.0955 18.306 100
which_cpp_nthreads(b, 2) 26.401 28.303 28.9285 30.0625 103.980 100
which_cpp_nthreads(b, 4) 45.902 49.494 50.6335 52.1155 115.184 100
which_cpp_nthreads(b, 8) 97.596 100.143 101.3720 104.9895 270.521 100
> bench(1e+05)
Unit: microseconds
expr min lq median uq max neval
which(b) 387.247 400.1215 418.9635 664.8670 17795.71 100
which_cpp_nthreads(b, 2) 280.149 315.4075 337.2355 437.7310 27336.24 100
which_cpp_nthreads(b, 4) 226.678 264.1730 285.7150 342.5565 17886.84 100
which_cpp_nthreads(b, 8) 280.099 310.4090 350.5835 428.6745 17681.50 100
> bench(1e+07)
Unit: milliseconds
expr min lq median uq max neval
which(b) 42.74872 43.91283 49.14603 54.41899 89.68374 100
which_cpp_nthreads(b, 2) 24.07307 28.57485 31.69920 39.01933 64.15361 100
which_cpp_nthreads(b, 4) 14.64242 21.81248 23.60718 29.60597 71.86710 100
which_cpp_nthreads(b, 8) 13.52122 17.26611 20.50603 29.84738 54.59843 100
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <Rcpp.h> | |
#include <thread> | |
#include <future> | |
using namespace Rcpp; | |
/**
 * Scans the values in [begin, end) and records the 1-based positions of the
 * elements that are TRUE.
 *
 * @param begin first element of the chunk to scan
 * @param end   one past the last element of the chunk
 * @param index number of elements that precede `begin` in the full vector;
 *              the first element of the chunk is reported as index + 1
 * @param out   output buffer; the caller must provide room for up to
 *              (end - begin) ints
 * @return the number of positions written to `out`
 */
int process( int* begin, int* end, int index, int* out){
    int count = 0 ;
    for( ; begin < end; ++begin ){
        ++index ;
        // Compare against 1 (R's TRUE) instead of testing for non-zero:
        // R stores a logical NA as INT_MIN, which is non-zero but must not be
        // reported, mirroring the `== TRUE` test used by base R's which().
        if( *begin == 1 ) out[count++] = index ;
    }
    return count ;
}
// Packaged task for a callable taking (int*, int*, int, int*) and returning
// int; the returned count is retrieved through the task's std::future<int>.
using Task = std::packaged_task< int(int*, int*, int, int*) > ;
/**
 * Multi-threaded counterpart of R's which() for a logical vector.
 *
 * The input is split into `nthreads` contiguous chunks. The first
 * `nthreads - 1` chunks are scanned by worker threads running process(); the
 * last chunk (which also absorbs the remainder of the division) is scanned on
 * the calling thread. Each chunk writes its hits into its own buffer, and the
 * buffers are concatenated in chunk order, so the result is in increasing
 * index order.
 *
 * @param b        logical vector to scan
 * @param nthreads total number of threads to use, including the calling
 *                 thread; must be >= 1. Values larger than the length of `b`
 *                 are reduced so that no thread gets an empty chunk.
 * @return integer vector of the 1-based positions reported by process()
 */
//[[Rcpp::export]]
IntegerVector which_cpp_nthreads(LogicalVector b, int nthreads){
    // nthreads == 0 would divide by zero below and a negative value would
    // turn into a huge container size, so reject both up front.
    if( nthreads < 1 ) stop("nthreads must be at least 1") ;

    int n = b.size() ;

    // Never use more threads than there are elements to look at.
    if( nthreads > n ) nthreads = ( n > 0 ) ? n : 1 ;

    int nworkers = nthreads - 1 ;
    int chunk_size = n / nthreads ;
    int last_chunk_size = n - nworkers * chunk_size ;

    // Allocate every output buffer before any thread is started: if an
    // allocation fails there are no running threads left to clean up.
    std::vector<IntegerVector> chunks(nworkers) ;
    for( int i=0; i<nworkers; i++){
        chunks[i] = IntegerVector(no_init(chunk_size)) ;
    }
    IntegerVector last_chunk = no_init(last_chunk_size) ;

    std::vector<std::future<int>> futures(nworkers) ;
    std::vector<std::thread> threads(nworkers) ;

    int* it = b.begin() ;
    int pos = 0 ;
    try {
        for( int i=0; i<nworkers; i++){
            Task task( &process ) ;
            futures[i] = task.get_future();
            threads[i] = std::thread( std::move(task), it, it+chunk_size, pos, chunks[i].begin() ) ;
            pos += chunk_size ;
            it += chunk_size ;
        }
    } catch(...) {
        // Destroying a joinable std::thread calls std::terminate, so join the
        // workers that did start before letting the exception propagate.
        for( std::thread& t : threads ){
            if( t.joinable() ) t.join() ;
        }
        throw ;
    }

    // The calling thread handles the last chunk while the workers run.
    int n0 = process( it, b.end(), pos, last_chunk.begin() ) ;

    // Wait for the workers and collect how many hits each one produced.
    int m = n0 ;
    std::vector<int> sizes(nworkers) ;
    for( int i=0; i<nworkers; i++) {
        threads[i].join() ;
        sizes[i] = futures[i].get() ;
        m += sizes[i] ;
    }

    // Concatenate the used part of each buffer, in chunk order.
    IntegerVector res = no_init(m);
    int *p = res.begin() ;
    for( int i=0; i<nworkers; i++){
        std::copy( chunks[i].begin(), chunks[i].begin() + sizes[i], p ) ;
        p += sizes[i] ;
    }
    std::copy( last_chunk.begin(), last_chunk.begin() + n0, p ) ;

    return res ;
}
/*** R
library(microbenchmark)

# Times base R's which() against which_cpp_nthreads() with 2, 4 and 8 threads
# on a randomly sampled logical vector of length n.
bench <- function(n){
    # Spell out TRUE/FALSE: T and F are ordinary variables that can be
    # reassigned, whereas TRUE and FALSE are reserved words.
    b <- sample(c(TRUE, FALSE), n, replace = TRUE)
    microbenchmark(
        which(b),
        which_cpp_nthreads(b, 2),
        which_cpp_nthreads(b, 4),
        which_cpp_nthreads(b, 8)
    )
}

bench(1000)
bench(100000)
bench(10000000)
*/
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment