Last active
December 15, 2015 08:09
-
-
Save t-mat/5229126 to your computer and use it in GitHub Desktop.
lz4mt: an OS-independent, multithreaded, block-based LZ4 compressor/decompressor in C++11.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# check your g++ version
g++ --version
# --> g++ version 4.6.3 (or greater)

# compile each translation unit
gcc -c -std=c99 lz4.c
gcc -c -std=c99 lz4hc.c
g++ -c -std=c++0x lz4mt.cpp
g++ -c -std=c++0x main.cpp

# link -- lz4mt.o must be on the link line: it defines lz4mtCompress() and
# lz4mtDecompress(), which main.o calls
g++ -o lz4mt lz4.o lz4hc.o lz4mt.o main.o -lpthread

# compress : multi thread
./lz4mt -c enwik8 enwik8.comp
# compress : single thread
./lz4mt -c enwik8 enwik8.comp -s
# decompress : multi thread
./lz4mt -d enwik8.comp enwik8.dec
# decompress : single thread
./lz4mt -d enwik8.comp enwik8.dec -s
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <future> | |
#include <vector> | |
#include <memory.h> | |
#include "lz4mt.h" | |
namespace {

// Size of the input chunk handed to one compression task (8 MiB). The
// decompressor also uses it as the capacity of each output buffer.
const size_t CHUNK_SIZE = static_cast<size_t>(8) << 20;

// Magic number that opens every archive.
const uint32_t ARCHIVE_MAGICNUMBER = 0x184c2102;

// The magic number serialized byte by byte, least significant byte first,
// exactly as it is written to / compared against the stream.
const char ARCHIVE_MAGIC[4] = {
    static_cast<char>( ARCHIVE_MAGICNUMBER        & 0xff),
    static_cast<char>((ARCHIVE_MAGICNUMBER >>  8) & 0xff),
    static_cast<char>((ARCHIVE_MAGICNUMBER >> 16) & 0xff),
    static_cast<char>((ARCHIVE_MAGICNUMBER >> 24) & 0xff)
};

// Byte buffer used for both raw and compressed block data.
typedef std::vector<char> Buffer;

} // anonymous namespace
// Compresses the stream exposed by `param` into the lz4mt archive format:
// the 4-byte archive magic, followed by one record per input chunk made of a
// 4-byte little-endian compressed size and the compressed bytes.
//
//  param         : read / readEof / write callbacks and their contexts.
//  compress      : block compressor (source, dest, inputSize) -> output size.
//  compressBound : worst-case output size for a given input size.
//  mode          : LZ4MT_MODE_SEQUENTIAL compresses every chunk on the calling
//                  thread; otherwise each chunk gets its own std::async task.
//
// Always returns LZ4MT_RESULT_OK; results of the write callback are not
// inspected.
extern "C" Lz4MtResult
lz4mtCompress(
      const Lz4MtParam* param
    , Lz4MtCompressFunc compress
    , Lz4MtCompressBoundFunc compressBound
    , Lz4MtMode mode
) {
    const bool singleThread = 0 != (mode & LZ4MT_MODE_SEQUENTIAL);

    param->write(
          param->writeCtx
        , ARCHIVE_MAGIC
        , sizeof(ARCHIVE_MAGIC)
    );

    // Each task receives a handle to the task of the previous chunk and waits
    // on it before writing, which keeps the output in input order. The handle
    // is passed by value: tasks must not look it up in a container that the
    // producer thread is still appending to (that would be a data race).
    auto task = [param, compress, compressBound]
        (std::shared_future<void> previous, std::shared_ptr<Buffer> src, int srcSize)
    {
        // 4 leading bytes are reserved for the block-size header.
        Buffer dst(compressBound(srcSize) + 4);
        const auto compressSize = compress(
              src->data()
            , dst.data() + 4
            , srcSize
        );
        // The input chunk is no longer needed; release it before blocking.
        src.reset();

        auto* d = dst.data();
        d[0] = static_cast<char>(compressSize);
        d[1] = static_cast<char>(compressSize >> 8);
        d[2] = static_cast<char>(compressSize >> 16);
        d[3] = static_cast<char>(compressSize >> 24);

        if(previous.valid()) {
            previous.wait();
        }
        param->write(
              param->writeCtx
            , dst.data()
            , compressSize + 4
        );
    };

    std::shared_future<void> last;   // task of the most recently queued chunk
    while(!param->readEof(param->readCtx)) {
        auto src = std::make_shared<Buffer>(CHUNK_SIZE);
        const auto srcSize = param->read(
              param->readCtx
            , src->data()
            , static_cast<int>(src->size())
        );
        // Nothing was read: either the input ended exactly on a chunk
        // boundary or the read failed. Do not emit an empty block, and do not
        // spin forever if the stream never reports EOF.
        if(srcSize <= 0) {
            break;
        }

        if(singleThread) {
            task(std::shared_future<void>(), std::move(src), srcSize);
        } else {
            last = std::async(
                  std::launch::async
                , task
                , last
                , std::move(src)
                , srcSize
            ).share();
        }
    }

    // The last task transitively waited for all earlier ones.
    if(last.valid()) {
        last.wait();
    }

    return LZ4MT_RESULT_OK;
}
// Decompresses an lz4mt archive read through `param` and writes the decoded
// bytes through `param->write`.
//
//  param      : read / readEof / write callbacks and their contexts.
//  decompress : block decoder (source, dest, inputSize, maxOutputSize)
//               -> number of decoded bytes.
//  mode       : LZ4MT_MODE_SEQUENTIAL decodes every block on the calling
//               thread; otherwise each block gets its own std::async task.
//
// Returns LZ4MT_RESULT_INVALID_HEADER when the archive magic is missing or
// wrong, LZ4MT_RESULT_OK otherwise. A truncated or malformed block record
// ends decoding early and still yields LZ4MT_RESULT_OK (the result enum has
// no dedicated code for it).
extern "C" Lz4MtResult
lz4mtDecompress(
      const Lz4MtParam* param
    , Lz4MtDecompressFunc decompress
    , Lz4MtMode mode
) {
    const bool singleThread = 0 != (mode & LZ4MT_MODE_SEQUENTIAL);

    char magic[sizeof(ARCHIVE_MAGIC)] = { 0 };
    const auto magicSize = param->read(param->readCtx, magic, sizeof(magic));
    if(static_cast<int>(sizeof(magic)) != magicSize) {
        return LZ4MT_RESULT_INVALID_HEADER;
    }
    if(0 != memcmp(ARCHIVE_MAGIC, magic, sizeof(magic))) {
        return LZ4MT_RESULT_INVALID_HEADER;
    }

    // Each task receives a handle to the task of the previous block and waits
    // on it before writing, which keeps the output in archive order. The
    // handle is passed by value: tasks must not look it up in a container
    // that the producer thread is still appending to (a data race).
    auto task = [param, decompress]
        (std::shared_future<void> previous, std::shared_ptr<Buffer> src)
    {
        Buffer dst(CHUNK_SIZE);
        const auto decompressSize = decompress(
              src->data()
            , dst.data()
            , static_cast<int>(src->size())
            , static_cast<int>(dst.size())
        );
        // The compressed block is no longer needed; release it before blocking.
        src.reset();

        if(previous.valid()) {
            previous.wait();
        }
        // A non-positive result means nothing was decoded (or the decoder
        // rejected the block); it must never reach write() as a byte count.
        if(decompressSize > 0) {
            param->write(
                  param->writeCtx
                , dst.data()
                , decompressSize
            );
        }
    };

    std::shared_future<void> last;   // task of the most recently queued block
    while(!param->readEof(param->readCtx)) {
        // Block header: compressed size, 4 bytes, little-endian.
        unsigned char sizeBytes[4] = { 0 };
        const auto headerSize = param->read(
              param->readCtx
            , sizeBytes
            , sizeof(sizeBytes)
        );
        if(static_cast<int>(sizeof(sizeBytes)) != headerSize) {
            break;
        }

        const size_t srcSize =
              (static_cast<size_t>(sizeBytes[0]) <<  0)
            | (static_cast<size_t>(sizeBytes[1]) <<  8)
            | (static_cast<size_t>(sizeBytes[2]) << 16)
            | (static_cast<size_t>(sizeBytes[3]) << 24);
        // The size comes from the file and is handed to the callbacks as an
        // int: reject empty blocks and sizes with bit 31 set, which would
        // turn negative in the casts below.
        if(0 == srcSize || 0 != (srcSize >> 31)) {
            break;
        }

        auto src = std::make_shared<Buffer>(srcSize);
        const auto bodySize = param->read(
              param->readCtx
            , src->data()
            , static_cast<int>(src->size())
        );
        if(static_cast<int>(src->size()) != bodySize) {
            break;   // truncated block
        }

        if(singleThread) {
            task(std::shared_future<void>(), std::move(src));
        } else {
            last = std::async(
                  std::launch::async
                , task
                , last
                , std::move(src)
            ).share();
        }
    }

    // The last task transitively waited for all earlier ones.
    if(last.valid()) {
        last.wait();
    }

    return LZ4MT_RESULT_OK;
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#ifndef LZ4MT_H | |
#define LZ4MT_H | |
#if defined(__cplusplus) | |
extern "C" { | |
#endif | |
/** Status code returned by lz4mtCompress() and lz4mtDecompress(). */
typedef enum Lz4MtResult {
    LZ4MT_RESULT_OK             = 0,  /* operation completed */
    LZ4MT_RESULT_INVALID_HEADER = 1   /* archive magic missing or mismatched */
} Lz4MtResult;
/**
 * Execution-mode flags for lz4mtCompress() / lz4mtDecompress().
 * Bit 0 selects sequential processing; when it is clear (DEFAULT / PARALLEL)
 * blocks are processed by asynchronous tasks.
 */
typedef enum Lz4MtMode {
    LZ4MT_MODE_DEFAULT    = 0x0,
    LZ4MT_MODE_PARALLEL   = 0x0,  /* same value as DEFAULT */
    LZ4MT_MODE_SEQUENTIAL = 0x1   /* bit 0 */
} Lz4MtMode;
/**
 * Reads up to `destSize` bytes from the input into `dest`.
 * The library treats the return value as the number of bytes actually read.
 */
typedef int (*Lz4MtReadFunc)(void* readContext, void* dest, int destSize);

/** Returns non-zero once the input has no more data to deliver. */
typedef int (*Lz4MtReadEofFunc)(void* readContext);

/** Writes `sourceSize` bytes from `source` to the output. */
typedef int (*Lz4MtWriteFunc)(void* writeContext, const void* source, int sourceSize);

/**
 * Compresses `isize` bytes from `source` into `dest`.
 * The library stores the return value as the compressed size of the block.
 */
typedef int (*Lz4MtCompressFunc)(const char* source, char* dest, int isize);

/**
 * Returns the output capacity to reserve for compressing `isize` bytes;
 * the library sizes the destination buffer with it.
 */
typedef int (*Lz4MtCompressBoundFunc)(int isize);

/**
 * Decompresses `isize` bytes from `source` into `dest`, which can hold
 * `maxOutputSize` bytes. The library uses the return value as the number of
 * decoded bytes to write out.
 */
typedef int (*Lz4MtDecompressFunc)(
    const char* source,
    char*       dest,
    int         isize,
    int         maxOutputSize
);
struct Lz4MtParam { | |
void* readCtx; | |
Lz4MtReadFunc read; | |
Lz4MtReadEofFunc readEof; | |
void* writeCtx; | |
Lz4MtWriteFunc write; | |
}; | |
typedef struct Lz4MtParam Lz4MtParam; | |
/**
 * Compresses the input exposed by `param` and writes an lz4mt archive
 * through `param->write`.
 *
 * `compress` / `compressBound` supply the block compressor (for example
 * LZ4_compress with LZ4_compressBound). `mode` selects sequential or
 * task-based processing.
 */
Lz4MtResult lz4mtCompress(
    const Lz4MtParam*      param,
    Lz4MtCompressFunc      compress,
    Lz4MtCompressBoundFunc compressBound,
    Lz4MtMode              mode
);

/**
 * Decodes an lz4mt archive read through `param` and writes the decoded bytes
 * through `param->write`.
 *
 * Returns LZ4MT_RESULT_INVALID_HEADER when the archive magic does not match.
 * `mode` selects sequential or task-based processing.
 */
Lz4MtResult lz4mtDecompress(
    const Lz4MtParam*   param,
    Lz4MtDecompressFunc decompress,
    Lz4MtMode           mode
);
#if defined (__cplusplus) | |
} | |
#endif | |
#endif // LZ4MT_H |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <stdio.h> | |
#include <string> | |
#include <iostream> | |
#include "lz4.h" | |
#include "lz4hc.h" | |
#include "lz4mt.h" | |
namespace {

// Opens `filename` with the given stdio mode string; nullptr on failure.
FILE* openFile(const std::string& filename, const char* mode) {
#if defined(_MSC_VER)
    FILE* fp = nullptr;
    ::fopen_s(&fp, filename.c_str(), mode);
    return fp;
#else
    return ::fopen(filename.c_str(), mode);
#endif
}

// Recovers the FILE* stored in an opaque callback context.
FILE* toFile(void* ctx) {
    return static_cast<FILE*>(ctx);
}

// Closes the stream behind `ctx`; a null context is ignored.
void closeFile(void* ctx) {
    if(FILE* const fp = toFile(ctx)) {
        ::fclose(fp);
    }
}

// Opens `filename` for binary reading; the result doubles as the read context.
void* openIstream(const std::string& filename) {
    return openFile(filename, "rb");
}

// Opens `filename` for binary writing; the result doubles as the write context.
void* openOstream(const std::string& filename) {
    return openFile(filename, "wb");
}

void closeIstream(void* ctx) {
    closeFile(ctx);
}

void closeOstream(void* ctx) {
    closeFile(ctx);
}

// Lz4MtReadFunc: reads up to `destSize` bytes, returns the count actually
// read (0 when `ctx` is null).
int read(void* ctx, void* dest, int destSize) {
    FILE* const fp = toFile(ctx);
    if(!fp) {
        return 0;
    }
    return static_cast<int>(::fread(dest, 1, destSize, fp));
}

// Lz4MtReadEofFunc: feof() of the stream; a null context counts as EOF (1).
int eof(void* ctx) {
    FILE* const fp = toFile(ctx);
    return fp ? ::feof(fp) : 1;
}

// Lz4MtWriteFunc: writes `sourceSize` bytes, returns the count actually
// written (0 when `ctx` is null).
int write(void* ctx, const void* source, int sourceSize) {
    FILE* const fp = toFile(ctx);
    if(!fp) {
        return 0;
    }
    return static_cast<int>(::fwrite(source, 1, sourceSize, fp));
}

} // anonymous namespace
int main(int argc, char* argv[]) { | |
enum class CompMode { | |
NONE | |
, DECOMPRESS | |
, COMPRESS_C0 | |
, COMPRESS_C1 | |
} compMode = CompMode::NONE; | |
int md = LZ4MT_MODE_DEFAULT; | |
std::string inpFilename; | |
std::string outFilename; | |
for(int iarg = 1; iarg < argc; ++iarg) { | |
const auto a = std::string(argv[iarg]); | |
if(a == "--help" || a == "-h") { | |
std::cerr << | |
"usage :\n" | |
" lz4_mt [switch...] <input> <output>\n" | |
"switch :\n" | |
" -c : Compress (lz4) (default)\n" | |
" -c1 : Compress (lz4hc)\n" | |
" -d : Decompress\n" | |
" -s : Single thread mode\n" | |
" -m : Multi thread mode (default)\n" | |
" -h : help\n"; | |
exit(EXIT_FAILURE); | |
} else if(a == "-s") { | |
md |= LZ4MT_MODE_SEQUENTIAL; | |
} else if(a == "-m") { | |
md &= ~LZ4MT_MODE_SEQUENTIAL; | |
} else if(a == "-d") { | |
compMode = CompMode::DECOMPRESS; | |
} else if(a == "-c" || a == "-c0") { | |
compMode = CompMode::COMPRESS_C0; | |
} else if(a == "-c1") { | |
compMode = CompMode::COMPRESS_C1; | |
} else if(a[0] == '-') { | |
std::cerr << "ERROR: bad switch [" << a << "]\n"; | |
exit(EXIT_FAILURE); | |
} else if(inpFilename.empty()) { | |
inpFilename = a; | |
} else if(outFilename.empty()) { | |
outFilename = a; | |
} else { | |
std::cerr << "ERROR: Bad argument [" << a << "]\n"; | |
exit(EXIT_FAILURE); | |
} | |
} | |
if(CompMode::NONE == compMode) { | |
std::cerr << "ERROR: You must specify a switch -c or -d\n"; | |
exit(EXIT_FAILURE); | |
} | |
if(inpFilename.empty()) { | |
std::cerr << "ERROR: No input filename\n"; | |
exit(EXIT_FAILURE); | |
} | |
if(outFilename.empty()) { | |
std::cerr << "ERROR: No output filename\n"; | |
exit(EXIT_FAILURE); | |
} | |
auto* is = openIstream(inpFilename); | |
if(!is) { | |
std::cerr << "ERROR: Can't open input file [" | |
<< inpFilename << "]\n"; | |
exit(EXIT_FAILURE); | |
} | |
auto* os = openOstream(outFilename); | |
if(!os) { | |
std::cerr << "ERROR: Can't open output file [" | |
<< outFilename << "]\n"; | |
exit(EXIT_FAILURE); | |
} | |
Lz4MtParam param = { 0 }; | |
param.readCtx = is; | |
param.read = read; | |
param.readEof = eof; | |
param.writeCtx = os; | |
param.write = write; | |
auto mode = static_cast<Lz4MtMode>(md); | |
auto e = LZ4MT_RESULT_OK; | |
switch(compMode) { | |
default: | |
break; | |
case CompMode::DECOMPRESS: | |
e = lz4mtDecompress( | |
¶m | |
, LZ4_uncompress_unknownOutputSize | |
, mode | |
); | |
break; | |
case CompMode::COMPRESS_C0: | |
e = lz4mtCompress( | |
¶m | |
, LZ4_compress | |
, LZ4_compressBound | |
, mode | |
); | |
break; | |
case CompMode::COMPRESS_C1: | |
e = lz4mtCompress( | |
¶m | |
, LZ4_compressHC | |
, LZ4_compressBound | |
, mode | |
); | |
break; | |
} | |
closeOstream(os); | |
closeIstream(is); | |
switch(e) { | |
case LZ4MT_RESULT_OK: | |
break; | |
case LZ4MT_RESULT_INVALID_HEADER: | |
std::cerr << "ERROR: Invalid header\n"; | |
exit(EXIT_FAILURE); | |
break; | |
default: | |
std::cerr << "ERROR: ExitCode = " | |
<< static_cast<int>(e) << "\n"; | |
exit(EXIT_FAILURE); | |
break; | |
} | |
exit(EXIT_SUCCESS); | |
} |
Did you try a 64 bit version of this and saw any speed improvements?
hi, thank you for testing.
When I turn on /arch:AVX in VS2012 it crashes.
Because I have no AVX-enabled machine, I can't check /arch:AVX
for now :(
Could you provide error messages and stack trace ?
Did you try a 64 bit version of this
Yes, I've checked both x86(Win32)/x64 platform.
and saw any speed improvements?
I did not investigate this problem, but in my experience, it depends.
Yes, basically, x64 is slower than x86.
But for small and sparse files (e.g. files containing a lot of 0x00 bytes), x64 is sometimes faster.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thank you for a nice piece of work!
When I turn on /arch:AVX in VS2012 it crashes. SSE2 works but not sure if I see any speed up by doing that.