Created December 9, 2016 22:46
-
-
Save shanemhansen/879b6c87c3b8edd5dc8258039fd58e14 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <atomic>
#include <chrono>
#include <exception>
#include <fstream>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include "concurrentqueue/blockingconcurrentqueue.h"
#include "glob.h"
#include <boost/iostreams/copy.hpp>
#include <boost/iostreams/filter/zlib.hpp>
#include <boost/iostreams/filtering_stream.hpp>
using namespace std;
void runQuery1(const string&);

// Entry point: runs query 1 over the dataset directory given as the first
// command-line argument, defaulting to "out" when no argument is supplied.
int main(int argc, char** argv) {
    const string data_dir = argc > 1 ? argv[1] : "out";
    runQuery1(data_dir);
    return 0;
}
// Expands the shell pattern `pat` with POSIX glob(3) (GLOB_TILDE, so a
// leading ~ is expanded) and returns every path glob reports, in the order
// glob produced them. The glob(3) return code is not inspected.
inline vector<string> glob(const string& pat) {
    glob_t matches{};
    ::glob(pat.c_str(), GLOB_TILDE, nullptr, &matches);
    // Copy the C string array into owned std::strings before releasing it.
    vector<string> paths(matches.gl_pathv, matches.gl_pathv + matches.gl_pathc);
    globfree(&matches);
    return paths;
}
// One row emitted by query 1: a page URL together with its ranking field,
// both kept as text exactly as they are printed.
struct ResultRow {
    std::string URL;      // first comma-separated field of the input line
    std::string Ranking;  // ranking field, printed after the URL
};
void runQuery1(const string& data_dir) { | |
auto path = data_dir + "/rankings/*deflate"; | |
auto fnames = glob(path); | |
std::atomic<int> workersWorking(fnames.size()); | |
moodycamel::BlockingConcurrentQueue<ResultRow> outbuf(1024); | |
auto t = std::thread([&] { | |
std::ios_base::sync_with_stdio(false); | |
ResultRow item; | |
while(workersWorking.load()!=0) { | |
outbuf.wait_dequeue(item); | |
cout << item.URL << ',' << item.Ranking << '\n'; | |
} | |
}); | |
vector<thread> wg(0); | |
for(auto fname: fnames) { | |
wg.push_back(std::thread([&](string fname) { | |
ifstream f(fname, ios_base::in | ios_base::binary); | |
boost::iostreams::filtering_istream zrdr; | |
zrdr.push(boost::iostreams::zlib_decompressor()); | |
zrdr.push(f); | |
string line; | |
int commandIdx = 0; | |
int rankingIdx = 0; | |
ResultRow item; | |
while(getline(zrdr, line)) { | |
if((commandIdx = line.find(',')) == -1) { | |
continue; | |
} | |
item.URL = line.substr(0, commandIdx-1); | |
if((rankingIdx = line.find(commandIdx, ',')) == -1) { | |
continue; | |
} | |
item.Ranking= line.substr(commandIdx+1, rankingIdx-commandIdx); | |
int i=0; | |
for(auto d: item.Ranking) { | |
i*=10; | |
i+=(d-'0'); | |
} | |
if (i<70) { | |
continue; | |
} | |
outbuf.enqueue(item); | |
} | |
}, fname)); | |
} | |
for(auto& t: wg) { | |
cout << "join\n"; | |
t.join(); | |
} | |
t.join(); | |
cout.flush(); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bufio" | |
"bytes" | |
"flag" | |
"github.com/klauspost/compress/zlib" | |
"log" | |
"os" | |
"path/filepath" | |
"sync" | |
) | |
var (
	// datadir is the directory that holds the benchmark dataset.
	datadir = flag.String("data-dir", "./out", "directory containing dataset")
	// query selects which benchmark main runs.
	query = flag.Int("query", 1, "which benchmark to run")
)
// this runs one of the big data benchmarks | |
func main() { | |
flag.Parse() | |
if *datadir == "" { | |
log.Fatal("data dir required") | |
} | |
switch *query { | |
case 1: | |
runQuery1() | |
} | |
} | |
func runQuery1() error { | |
fnames, err := filepath.Glob(filepath.Join(*datadir, "rankings", "*deflate")) | |
if err != nil { | |
return err | |
} | |
outbuf := make(chan [2][]byte, 1024) | |
var wg sync.WaitGroup | |
go func() { | |
out := bufio.NewWriter(os.Stdout) | |
defer out.Flush() | |
for buf := range outbuf { | |
out.Write(buf[0]) | |
out.WriteByte(',') | |
out.Write(buf[1]) | |
out.WriteByte('\n') | |
} | |
}() | |
for _, fname := range fnames { | |
log.Println(fname) | |
wg.Add(1) | |
go func(fname string) { | |
defer wg.Done() | |
f, err := os.Open(fname) | |
if err != nil { | |
log.Println(err) | |
return | |
} | |
defer f.Close() | |
zrdr, err := zlib.NewReader(f) | |
if err != nil { | |
log.Println(err) | |
return | |
} | |
defer zrdr.Close() | |
scanner := bufio.NewScanner(zrdr) | |
var commandIdx int | |
var currentBuf []byte | |
var pair [2][]byte | |
for scanner.Scan() { | |
currentBuf = scanner.Bytes() | |
if commandIdx = bytes.IndexByte(currentBuf, ','); commandIdx == -1 { | |
log.Println("bad data") | |
break | |
} | |
pair[0] = makecopy(currentBuf[:commandIdx-1]) | |
currentBuf = currentBuf[commandIdx+1:] | |
if commandIdx = bytes.IndexByte(currentBuf, ','); commandIdx == -1 { | |
log.Println("bad data") | |
break | |
} | |
i := 0 | |
tmp := makecopy(currentBuf[:commandIdx-1]) | |
for _, d := range tmp { | |
i *= 10 | |
i += int(d - '0') | |
} | |
if i < 70 { | |
continue | |
} | |
pair[1] = tmp | |
outbuf <- pair | |
} | |
}(fname) | |
} | |
wg.Wait() | |
close(outbuf) | |
return nil | |
} | |
// makecopy returns a new, non-nil slice with the same contents as buf, so the
// caller can keep it after buf's backing array is reused.
func makecopy(buf []byte) []byte {
	return append(make([]byte, 0, len(buf)), buf...)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.