// Source: GitHub gist rob-p/6583164, created September 16, 2013 16:41.
// (Web-page chrome from the gist view -- the "save to GitHub Desktop" prompt and
// the hidden/bidirectional Unicode notice -- is not part of the program.)
#include <atomic>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <thread>
#include <vector>

#include "unistd.h"
#include "fcntl.h"

extern "C" {
#include "kseq.h"
}

#include <boost/filesystem.hpp>
#include <boost/range/irange.hpp>
#include "tbb/concurrent_queue.h"

// Instantiate the kseq parser for POSIX file descriptors read via read(2).
KSEQ_INIT(int, read)

namespace bfs = boost::filesystem;
/// One reusable sequence buffer handed between the parser and its consumers.
///
/// `seq` is grown with realloc() and released with free() by the code that
/// fills it; it holds `len` bytes and is NOT NUL-terminated.
struct ReadSeq {
    char* seq = nullptr;  ///< malloc-family buffer holding the sequence bytes (may be null)
    size_t len = 0;       ///< number of valid bytes currently stored in `seq`
};
class StreamingReadParser { | |
public: | |
StreamingReadParser( std::vector<bfs::path>& files ): inputStreams_(files), | |
parsing_(false), parsingThread_(nullptr) | |
{ | |
readStructs_ = new ReadSeq[queueCapacity_]; | |
readQueue_.set_capacity(queueCapacity_); | |
seqContainerQueue_.set_capacity(queueCapacity_); | |
for (size_t i = 0; i < queueCapacity_; ++i) { | |
seqContainerQueue_.push(&readStructs_[i]); | |
} | |
} | |
~StreamingReadParser() { | |
parsingThread_->join(); | |
for (auto i : boost::irange(size_t{0}, queueCapacity_)) { | |
if (readStructs_[i].seq != nullptr) { free(readStructs_[i].seq); } | |
} | |
delete [] readStructs_; | |
delete parsingThread_; | |
} | |
bool start() { | |
if (!parsing_) { | |
parsing_ = true; | |
parsingThread_ = new std::thread([this](){ | |
kseq_t* seq; | |
ReadSeq* s; | |
std::cerr << "reading from " << this->inputStreams_.size() << " streams\n"; | |
for (auto file : this->inputStreams_) { | |
std::cerr << "reading from " << file.native() << "\n"; | |
// open the file and init the parser | |
int fp = open(file.c_str(), O_RDONLY); | |
seq = kseq_init(fp); | |
while (kseq_read(seq) >= 0) { | |
//puts(seq->seq.s); | |
this->seqContainerQueue_.pop(s); | |
if (seq->seq.l > s->len) { | |
s->seq = static_cast<char*>(realloc(s->seq, seq->seq.l)); | |
} | |
//std::cerr << "old len = " << s->len << ", "; | |
s->len = seq->seq.l; | |
//std::cerr << "new len = " << s->len << "\n"; | |
memcpy(s->seq, seq->seq.s, s->len); | |
//puts(s->seq); | |
this->readQueue_.push(s); | |
} | |
// destroy the parser and close the file | |
kseq_destroy(seq); | |
close(fp); | |
} | |
this->parsing_ = false; | |
}); | |
//std::swap(tmpThread, parsingThread_); | |
return true; | |
} else { | |
return false; | |
} | |
} | |
//tbb::concurrent_bounded_queue<ReadSeq*>& readQueue() { return readQueue_; } | |
inline bool nextRead(ReadSeq*& seq) { | |
while(parsing_) { | |
if (readQueue_.try_pop(seq)) { return true; } | |
} | |
return false; | |
} | |
inline void finishedWithRead(ReadSeq*& s) { seqContainerQueue_.push(s); } | |
private: | |
std::vector<bfs::path>& inputStreams_; | |
bool parsing_; | |
std::thread* parsingThread_; | |
tbb::concurrent_bounded_queue<ReadSeq*> readQueue_, seqContainerQueue_; | |
ReadSeq* readStructs_; | |
const size_t queueCapacity_ = 2000000; | |
}; | |
int main(int argc, char* argv[]) { | |
/* | |
kseq_t *seq; | |
int fp = open(argv[1], O_RDONLY); | |
seq = kseq_init(fp); | |
tbb::concurrent_bounded_queue<ReadSeq*> seqContainerQueue; | |
tbb::concurrent_bounded_queue<ReadSeq*> readQueue; | |
size_t capacity = 2000000; | |
ReadSeq* readStructs = new ReadSeq[capacity]; | |
seqContainerQueue.set_capacity(capacity); | |
readQueue.set_capacity(capacity); | |
for (size_t i = 0; i < capacity; ++i) { | |
seqContainerQueue.push(&readStructs[i]); | |
} | |
bool done{false}; | |
auto readThread = std::thread([&seqContainerQueue, &readQueue, &seq, &done]() { | |
ReadSeq* s; | |
while (kseq_read(seq) >= 0) { | |
seqContainerQueue.pop(s); | |
if (seq->seq.l > s->len) { | |
s->seq = static_cast<char*>(realloc(s->seq, seq->seq.l)); | |
} | |
s->len = seq->seq.l; | |
memcpy(s->seq, seq->seq.s, s->len); | |
//puts(seq->seq.s); | |
readQueue.push(s); | |
} | |
done = true; | |
}); | |
*/ | |
std::vector<bfs::path> paths; | |
for (size_t i = 1; i < argc; ++i) { | |
std::cerr << "stream " << i << " is " << argv[i]; | |
paths.emplace_back(argv[i]); | |
} | |
StreamingReadParser parser(paths); | |
parser.start(); | |
std::atomic<size_t> numProcessed{0}; | |
size_t numThreads = 3; | |
std::vector<std::thread> threads; | |
for (size_t t=0; t < numThreads; ++t) { | |
threads.emplace_back([&parser, &numProcessed](){ | |
ReadSeq* s; | |
while( parser.nextRead(s) ) { | |
size_t np = numProcessed++; | |
if (np % 100000 == 0) { std::cerr << "processed " << np << " reads\n";} | |
//printf("(%.90s)", s->seq); | |
//puts(s->seq); | |
parser.finishedWithRead(s); | |
} | |
}); | |
} | |
for (auto& t : threads) { t.join(); } | |
return 0; | |
} |
// (GitHub page footer, not part of the program: "Sign up for free to join this
// conversation on GitHub. Already have an account? Sign in to comment.")