-
-
Save anguyen8/b9884b2fa121e7afc01b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
 * Given a list of input leveldbs, merge them into a single output leveldb.
 *
 */
#include <iostream> | |
#include <algorithm> | |
#include <boost/program_options.hpp> | |
#include <boost/cstdint.hpp> | |
#include <leveldb/db.h> | |
#include <leveldb/write_batch.h> | |
using namespace std; | |
using namespace boost; | |
namespace po = boost::program_options; | |
struct leveldb_iterator // leveldb and iterator | |
{ | |
leveldb_iterator(const string& path) | |
: db_(NULL), it_(NULL) | |
{ | |
leveldb::Options options; | |
options.create_if_missing = false; | |
leveldb::Status s = leveldb::DB::Open(options, path, &db_); | |
if (!s.ok()) | |
throw runtime_error(s.ToString()); | |
it_ = db_->NewIterator(leveldb::ReadOptions()); | |
it_->SeekToFirst(); | |
} | |
bool next() | |
{ | |
it_->Next(); | |
if (!it_->Valid()) | |
{ | |
delete it_; | |
delete db_; | |
return false; | |
} | |
return true; | |
} | |
bool operator < (const leveldb_iterator & rhs) const | |
{ | |
return it_->key().compare(rhs.it_->key()) < 0; | |
} | |
leveldb::DB * db_; | |
leveldb::Iterator* it_; | |
}; | |
void run(vector<leveldb_iterator> & leveldb_iterators, leveldb::DB * out) | |
{ | |
leveldb::WriteOptions w; | |
leveldb::WriteBatch batch; | |
for (int64_t batch_size = 1; !leveldb_iterators.empty(); ++batch_size) { | |
if (batch_size % 1000 == 0){ | |
// write previous batch out | |
out->Write(w, &batch); | |
batch.Clear(); | |
} | |
// write to batch | |
batch.Put(leveldb_iterators.front().it_->key(), leveldb_iterators.front().it_->value()); | |
pop_heap(leveldb_iterators.begin(), leveldb_iterators.end()); | |
if (leveldb_iterators.back().next()) | |
push_heap(leveldb_iterators.begin(), leveldb_iterators.end()); | |
else | |
leveldb_iterators.pop_back(); | |
} | |
// write last batch | |
out->Write(w, &batch); | |
} | |
int main(int argc, char * argv[]) | |
{ | |
vector<string> in_paths; | |
string out_path; | |
po::options_description desc("options"); | |
desc.add_options() | |
("help,h", "produce help message") | |
("input,i", po::value< vector<string> >(&in_paths)->composing(), "leveldb input") | |
("output,o", po::value<string>(&out_path)->required(), "leveldb output") | |
; | |
po::variables_map vm; | |
try | |
{ | |
po::store(po::parse_command_line(argc, argv, desc), vm); | |
po::notify(vm); | |
} | |
catch(std::exception & e) | |
{ | |
cerr << e.what() << "\n"; | |
return 1; | |
} | |
if (vm.count("help")) | |
{ | |
cerr << "Usage: merge_ldb -i ldb1 -i ldb2 -o ldb_out" << endl; | |
cerr << desc; | |
return 0; | |
} | |
vector<leveldb_iterator> leveldb_iterators; | |
for (vector<string>::const_iterator it = in_paths.begin(); it != in_paths.end(); ++it) | |
leveldb_iterators.push_back(leveldb_iterator(*it)); | |
make_heap(leveldb_iterators.begin(), leveldb_iterators.end()); | |
leveldb::DB * out; | |
leveldb::Options options; | |
options.create_if_missing = true; | |
leveldb::Status s = leveldb::DB::Open(options, out_path, &out); | |
if (!s.ok()) | |
throw runtime_error(s.ToString()); | |
run(leveldb_iterators, out); | |
delete out; | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment