-
-
Save hpcx82/4045503 to your computer and use it in GitHub Desktop.
Sort queries by their frequencies.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- this version is very slow, as every time we update the buffer, we creating a new string, which | |
-- makes the original buffer unreferenced. and lua's gc will kicks in often to clean up memory | |
-- which is really realy slow. | |
local file = io.open("file.txt", "w") | |
local buffer = '' | |
for i = 1, 500000000 do | |
local n = i % math.random(10000) | |
local str = string.format("This is a number %d\n", n) | |
buffer = buffer .. str -- make the original string buffer unreferenced | |
if i % 10000 == 0 then | |
file:write(buffer) | |
count = 0 | |
buffer = '' | |
end | |
end |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- updated version, use a table thus no gc involved | |
local file = io.open("file.txt", "w") | |
local t = {} | |
for i = 1, 500000000 do | |
local n = i % math.random(10000) | |
local str = string.format("This is a number %d\n", n) | |
table.insert(t, str) | |
if i % 10000 == 0 then | |
file:write(table.concat(t)) | |
t = {} | |
end | |
end |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// answer to http://weibo.com/1915548291/z2UtyzuvQ | |
#include <boost/bind.hpp> | |
#include <boost/function.hpp> | |
#include <boost/noncopyable.hpp> | |
#include <boost/ptr_container/ptr_vector.hpp> | |
#include <fstream> | |
#include <iostream> | |
#include <unordered_map> | |
#ifdef STD_STRING | |
#warning "STD STRING" | |
#include <string> | |
using std::string; | |
#else | |
#include <ext/vstring.h> | |
typedef __gnu_cxx::__sso_string string; | |
#endif | |
typedef boost::function<void(const string& query, int64_t count)> Callback; | |
const size_t kMaxSize = 10 * 1000 * 1000; | |
void processOneInput(const char* file, const Callback& cb) | |
{ | |
std::unordered_map<string, int64_t> queries; | |
std::ifstream in(file); | |
string query; | |
while (in && !in.eof()) | |
{ | |
queries.clear(); | |
while (getline(in, query)) | |
{ | |
queries[query] += 1; | |
if (queries.size() > kMaxSize) | |
{ | |
std::cout << " split\n"; | |
break; | |
} | |
} | |
for (auto kv : queries) | |
{ | |
cb(kv.first, kv.second); | |
} | |
} | |
} | |
class LocalSink : boost::noncopyable | |
{ | |
public: | |
explicit LocalSink(int idx, int nbuckets) | |
{ | |
char buf[256]; | |
snprintf(buf, sizeof buf, "shard-%05d-of-%05d", idx, nbuckets); | |
out.open(buf); | |
} | |
void output(const string& query, int64_t count) | |
{ | |
out << query << '\t' << count << '\n'; | |
} | |
private: | |
std::ofstream out; | |
}; | |
// class RemoteSink; | |
// template<typename SINK> | |
class Shuffler : boost::noncopyable | |
{ | |
public: | |
explicit Shuffler(int nbuckets) | |
: buckets_(nbuckets) | |
{ | |
for (int i = 0; i < nbuckets; ++i) | |
{ | |
buckets_.push_back(new LocalSink(i, nbuckets)); | |
} | |
assert(buckets_.size() == static_cast<size_t>(nbuckets)); | |
} | |
void output(const string& query, int64_t count) | |
{ | |
int idx = std::hash<string>()(query) % buckets_.size(); | |
buckets_[idx].output(query, count); | |
} | |
protected: | |
boost::ptr_vector<LocalSink> buckets_; | |
}; | |
void shuffle(int nbuckets, int argc, char* argv[]) | |
{ | |
Shuffler shuffler(nbuckets); | |
for (int i = 1; i < argc; ++i) | |
{ | |
std::cout << " processing " << argv[i] << std::endl; | |
processOneInput(argv[i], | |
boost::bind(&Shuffler::output, &shuffler, _1, _2)); | |
} | |
std::cout << "shuffling done" << std::endl; | |
} | |
// ======= reduce ======= | |
std::unordered_map<string, int64_t> read_shard(int idx, int nbuckets) | |
{ | |
std::unordered_map<string, int64_t> queries; | |
char buf[256]; | |
snprintf(buf, sizeof buf, "shard-%05d-of-%05d", idx, nbuckets); | |
std::cout << " reading " << buf << std::endl; | |
{ | |
std::ifstream in(buf); | |
string line; | |
while (getline(in, line)) | |
{ | |
size_t tab = line.find('\t'); | |
if (tab != string::npos) | |
{ | |
int64_t count = strtol(line.c_str() + tab, NULL, 10); | |
if (count > 0) | |
{ | |
queries[line.substr(0, tab)] += count; | |
} | |
} | |
} | |
} | |
::unlink(buf); | |
return queries; | |
} | |
void reduce(const int nbuckets) | |
{ | |
for (int i = 0; i < nbuckets; ++i) | |
{ | |
std::unordered_map<string, int64_t> queries(read_shard(i, nbuckets)); | |
// std::cout << " sorting " << std::endl; | |
std::vector<std::pair<int64_t, string>> counts; | |
for (auto entry : queries) | |
{ | |
counts.push_back(make_pair(entry.second, entry.first)); | |
} | |
std::sort(counts.begin(), counts.end()); | |
char buf[256]; | |
snprintf(buf, sizeof buf, "count-%05d-of-%05d", i, nbuckets); | |
std::ofstream out(buf); | |
std::cout << " writing " << buf << std::endl; | |
for (auto it = counts.rbegin(); it != counts.rend(); ++it) | |
{ | |
out << it->first << '\t' << it->second << '\n'; | |
} | |
} | |
std::cout << "reducing done" << std::endl; | |
} | |
// ======= merge ======= | |
class Source | |
{ | |
public: | |
explicit Source(std::ifstream* in) | |
: in_(in), | |
count_(0), | |
query_() | |
{ | |
} | |
bool next() | |
{ | |
string line; | |
if (getline(*in_, line)) | |
{ | |
size_t tab = line.find('\t'); | |
if (tab != string::npos) | |
{ | |
count_ = strtol(line.c_str(), NULL, 10); | |
if (count_ > 0) | |
{ | |
query_ = line.substr(tab+1); | |
return true; | |
} | |
} | |
} | |
return false; | |
} | |
bool operator<(const Source& rhs) const | |
{ | |
return count_ < rhs.count_; | |
} | |
void output(std::ostream& out) | |
{ | |
out << count_ << '\t' << query_ << '\n'; | |
} | |
private: | |
std::ifstream* in_; | |
int64_t count_; | |
string query_; | |
}; | |
void merge(const int nbuckets) | |
{ | |
boost::ptr_vector<std::ifstream> inputs; | |
std::vector<Source> keys; | |
for (int i = 0; i < nbuckets; ++i) | |
{ | |
char buf[256]; | |
snprintf(buf, sizeof buf, "count-%05d-of-%05d", i, nbuckets); | |
inputs.push_back(new std::ifstream(buf)); | |
Source rec(&inputs.back()); | |
if (rec.next()) | |
{ | |
keys.push_back(rec); | |
} | |
::unlink(buf); | |
} | |
std::ofstream out("output"); | |
std::make_heap(keys.begin(), keys.end()); | |
while (!keys.empty()) | |
{ | |
std::pop_heap(keys.begin(), keys.end()); | |
keys.back().output(out); | |
// out.writeRecord(keys.back().data); | |
if (keys.back().next()) | |
{ | |
std::push_heap(keys.begin(), keys.end()); | |
} | |
else | |
{ | |
keys.pop_back(); | |
} | |
} | |
std::cout << "merging done\n"; | |
} | |
int main(int argc, char* argv[]) | |
{ | |
int nbuckets = 10; | |
shuffle(nbuckets, argc, argv); | |
reduce(nbuckets); | |
merge(nbuckets); | |
} |
g++ sort.cpp -std=c++0x
sort file.txt
apt-get install libboost-dev
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
local file = io.open("file.txt", "w")
for i = 1, 500000000 do
local n = i % math.random(10000)
local str = string.format("This is a number %d", n)
file:write(str .. "\n")
end