Skip to content

Instantly share code, notes, and snippets.

@t-mat
Last active December 15, 2015 08:09
Show Gist options
  • Save t-mat/5229126 to your computer and use it in GitHub Desktop.
Save t-mat/5229126 to your computer and use it in GitHub Desktop.
lz4mt: OS-independent, multithreaded, block-based LZ4 compressor/decompressor in C++11.
# check your g++ version
g++ --version
# --> g++ version 4.6.3 (or greater)
# compile
gcc -c -std=c99 lz4.c
gcc -c -std=c99 lz4hc.c
g++ -c -std=c++0x lz4mt.cpp
g++ -c -std=c++0x main.cpp
# link: lz4mt.o provides lz4mtCompress/lz4mtDecompress, which main.o calls
g++ -o lz4mt lz4.o lz4hc.o lz4mt.o main.o -lpthread
# compress : multi thread
./lz4mt -c enwik8 enwik8.comp
# compress : single thread
./lz4mt -c enwik8 enwik8.comp -s
# decompress : multi thread
./lz4mt -d enwik8.comp enwik8.dec
# decompress : single thread
./lz4mt -d enwik8.comp enwik8.dec -s
#include "lz4mt.h"

#include <memory.h>

#include <climits>
#include <cstdint>
#include <future>
#include <memory>
#include <vector>
namespace {

// Uncompressed size of each independently compressed block (8 MiB).
const size_t CHUNK_SIZE = static_cast<size_t>(8) * 1024 * 1024;

// Archive signature value; written at the start of every archive.
const uint32_t ARCHIVE_MAGICNUMBER = 0x184c2102;

// ARCHIVE_MAGICNUMBER serialized as 4 bytes, least-significant byte first.
const char ARCHIVE_MAGIC[4] = {
	static_cast<char>(ARCHIVE_MAGICNUMBER & 0xffu)
	, static_cast<char>((ARCHIVE_MAGICNUMBER >> 8) & 0xffu)
	, static_cast<char>((ARCHIVE_MAGICNUMBER >> 16) & 0xffu)
	, static_cast<char>((ARCHIVE_MAGICNUMBER >> 24) & 0xffu)
};

// Byte buffer used for both raw and compressed chunk data.
using Buffer = std::vector<char>;

} // anonymous namespace
// Compresses the input stream described by `param` into an lz4mt archive:
//   [ARCHIVE_MAGIC : 4 bytes]
//   repeated { [compressed size : 4 bytes, little-endian][compressed data] }
// The input is processed in CHUNK_SIZE pieces. In parallel mode each chunk
// is compressed by its own std::async task; blocks are still written in
// input order. LZ4MT_MODE_SEQUENTIAL processes chunks on the calling thread.
// Always returns LZ4MT_RESULT_OK (write results are not checked).
extern "C" Lz4MtResult
lz4mtCompress(
	const Lz4MtParam* param
	, Lz4MtCompressFunc compress
	, Lz4MtCompressBoundFunc compressBound
	, Lz4MtMode mode
) {
	const bool singleThread = 0 != (mode & LZ4MT_MODE_SEQUENTIAL);

	param->write(
		param->writeCtx
		, ARCHIVE_MAGIC
		, sizeof(ARCHIVE_MAGIC)
	);

	// Compresses one chunk and writes it as a size-prefixed block.
	// `prev` is the task that writes the preceding chunk (an invalid future for
	// the first chunk and in single-thread mode). Waiting on it before writing
	// keeps output blocks in input order while compression runs concurrently.
	// Each task gets its own shared_future copy rather than indexing into the
	// `futures` vector, which the reader loop grows (and may reallocate)
	// concurrently -- reading it from a worker thread would be a data race.
	auto f = [param, compress, compressBound]
		(std::shared_future<void> prev, std::shared_ptr<Buffer> src, int srcSize)
	{
		Buffer dst(compressBound(srcSize) + 4);
		const auto compressSize = compress(
			src->data()
			, dst.data() + 4
			, srcSize
		);
		// Release the input buffer before (possibly) blocking on `prev`.
		src->clear();
		src->shrink_to_fit();

		// 4-byte little-endian block size header.
		auto* d = dst.data();
		d[0] = static_cast<char>(compressSize);
		d[1] = static_cast<char>(compressSize >> 8);
		d[2] = static_cast<char>(compressSize >> 16);
		d[3] = static_cast<char>(compressSize >> 24);

		if(prev.valid()) {
			prev.wait();
		}
		param->write(
			param->writeCtx
			, dst.data()
			, compressSize + 4
		);
	};

	std::vector<std::shared_future<void>> futures;
	while(!param->readEof(param->readCtx)) {
		auto src = std::make_shared<Buffer>(CHUNK_SIZE);
		const auto srcSize = param->read(
			param->readCtx
			, src->data()
			, static_cast<int>(src->size())
		);
		// Nothing was read. The input was empty or an exact multiple of
		// CHUNK_SIZE, or the stream failed. Stop instead of emitting an empty
		// block, and instead of looping forever if EOF is never reported.
		if(srcSize <= 0) {
			break;
		}
		if(singleThread) {
			f(std::shared_future<void>(), std::move(src), srcSize);
		} else {
			std::shared_future<void> prev;
			if(!futures.empty()) {
				prev = futures.back();
			}
			futures.push_back(
				std::async(
					std::launch::async
					, f
					, prev
					, std::move(src)
					, srcSize
				).share()
			);
		}
	}
	for(auto& e : futures) {
		e.wait();
	}
	return LZ4MT_RESULT_OK;
}
// Decompresses an lz4mt archive produced by lz4mtCompress():
//   [ARCHIVE_MAGIC : 4 bytes]
//   repeated { [compressed size : 4 bytes, little-endian][compressed data] }
// Each block is decompressed into a CHUNK_SIZE buffer. In parallel mode each
// block runs in its own std::async task; output is written in archive order.
// Returns LZ4MT_RESULT_INVALID_HEADER if the magic number is missing or wrong,
// otherwise LZ4MT_RESULT_OK. A truncated trailing block ends processing.
extern "C" Lz4MtResult
lz4mtDecompress(
	const Lz4MtParam* param
	, Lz4MtDecompressFunc decompress
	, Lz4MtMode mode
) {
	const bool singleThread = 0 != (mode & LZ4MT_MODE_SEQUENTIAL);

	char h[sizeof(ARCHIVE_MAGIC)] = { 0 };
	const auto sh = param->read(param->readCtx, h, static_cast<int>(sizeof(h)));
	if(static_cast<int>(sizeof(h)) != sh) {
		return LZ4MT_RESULT_INVALID_HEADER;
	}
	if(memcmp(ARCHIVE_MAGIC, h, sizeof(h))) {
		return LZ4MT_RESULT_INVALID_HEADER;
	}

	// Decompresses one block and writes the result.
	// `prev` is the task that writes the preceding block (an invalid future for
	// the first block and in single-thread mode). A per-task shared_future copy
	// is used instead of indexing `futures`, which the reader loop grows (and
	// may reallocate) concurrently with the workers.
	auto f = [param, decompress]
		(std::shared_future<void> prev, std::shared_ptr<Buffer> src)
	{
		Buffer dst(CHUNK_SIZE);
		const auto decompressSize = decompress(
			src->data()
			, dst.data()
			, static_cast<int>(src->size())
			, static_cast<int>(dst.size())
		);
		// Release the compressed input before (possibly) blocking on `prev`.
		src->clear();
		src->shrink_to_fit();
		if(prev.valid()) {
			prev.wait();
		}
		// A negative result signals a corrupt block. Never pass a negative
		// length to write(); it would become a huge size_t.
		// NOTE(review): Lz4MtResult has no code for corrupt data, so this
		// failure is not reported to the caller.
		if(decompressSize >= 0) {
			param->write(
				param->writeCtx
				, dst.data()
				, decompressSize
			);
		}
	};

	std::vector<std::shared_future<void>> futures;
	while(!param->readEof(param->readCtx)) {
		char t[4] = { 0 };
		const auto ts = param->read(
			param->readCtx
			, t
			, static_cast<int>(sizeof(t))
		);
		if(static_cast<int>(sizeof(t)) != ts) {
			break;
		}
		const auto srcSize =
			  (static_cast<size_t>(t[0] & 0xff) << 0)
			| (static_cast<size_t>(t[1] & 0xff) << 8)
			| (static_cast<size_t>(t[2] & 0xff) << 16)
			| (static_cast<size_t>(t[3] & 0xff) << 24);
		// The read/decompress callbacks take int sizes. A larger length can
		// only come from a corrupt archive and would be narrowed to a negative int.
		if(srcSize > static_cast<size_t>(INT_MAX)) {
			break;
		}
		auto src = std::make_shared<Buffer>(srcSize);
		const auto ss = param->read(
			param->readCtx
			, src->data()
			, static_cast<int>(src->size())
		);
		if(static_cast<int>(src->size()) != ss) {
			break;
		}
		if(singleThread) {
			f(std::shared_future<void>(), std::move(src));
		} else {
			std::shared_future<void> prev;
			if(!futures.empty()) {
				prev = futures.back();
			}
			futures.push_back(
				std::async(
					std::launch::async
					, f
					, prev
					, std::move(src)
				).share()
			);
		}
	}
	for(auto& e : futures) {
		e.wait();
	}
	return LZ4MT_RESULT_OK;
}
#ifndef LZ4MT_H
#define LZ4MT_H

#if defined(__cplusplus)
extern "C" {
#endif

/* Status codes returned by lz4mtCompress() and lz4mtDecompress(). */
typedef enum Lz4MtResult {
	LZ4MT_RESULT_OK = 0,
	LZ4MT_RESULT_INVALID_HEADER /* = 1: archive magic missing or wrong */
} Lz4MtResult;

/* Mode flags. LZ4MT_MODE_SEQUENTIAL selects single-threaded processing;
   DEFAULT and PARALLEL (both 0) select multi-threaded processing. */
typedef enum Lz4MtMode {
	LZ4MT_MODE_DEFAULT    = 0,
	LZ4MT_MODE_PARALLEL   = 0,
	LZ4MT_MODE_SEQUENTIAL = 1
} Lz4MtMode;

/* Reads up to destSize bytes into dest; returns the number of bytes read
   (callers compare it against the requested size). */
typedef int (*Lz4MtReadFunc)(void* readContext, void* dest, int destSize);

/* Returns nonzero once the input has no more data. */
typedef int (*Lz4MtReadEofFunc)(void* readContext);

/* Writes sourceSize bytes from source; the return value is not checked by lz4mt. */
typedef int (*Lz4MtWriteFunc)(void* writeContext, const void* source, int sourceSize);

/* Compresses isize bytes from source into dest; returns the compressed size.
   dest has room for compressBound(isize) bytes. */
typedef int (*Lz4MtCompressFunc)(const char* source, char* dest, int isize);

/* Returns the worst-case compressed size for isize input bytes. */
typedef int (*Lz4MtCompressBoundFunc)(int isize);

/* Decompresses isize bytes from source into dest (capacity maxOutputSize);
   returns the decompressed size. */
typedef int (*Lz4MtDecompressFunc)(const char* source, char* dest, int isize, int maxOutputSize);

/* I/O callbacks and their opaque contexts. */
typedef struct Lz4MtParam {
	void*            readCtx;
	Lz4MtReadFunc    read;
	Lz4MtReadEofFunc readEof;
	void*            writeCtx;
	Lz4MtWriteFunc   write;
} Lz4MtParam;

Lz4MtResult lz4mtCompress(
	const Lz4MtParam* param
	, Lz4MtCompressFunc compress
	, Lz4MtCompressBoundFunc compressBound
	, Lz4MtMode mode
);

Lz4MtResult lz4mtDecompress(
	const Lz4MtParam* param
	, Lz4MtDecompressFunc decompress
	, Lz4MtMode mode
);

#if defined (__cplusplus)
}
#endif

#endif // LZ4MT_H
#include <stdio.h>
#include <string>
#include <iostream>
#include "lz4.h"
#include "lz4hc.h"
#include "lz4mt.h"
namespace {

// Opens `filename` with the stdio `mode` string; returns nullptr on failure.
FILE* openFile(const std::string& filename, const char* mode) {
	FILE* fp = nullptr;
#if defined(_MSC_VER)
	if(::fopen_s(&fp, filename.c_str(), mode) != 0) {
		fp = nullptr;
	}
#else
	fp = ::fopen(filename.c_str(), mode);
#endif
	return fp;
}

// Closes a context created by openFile(); a null context is ignored.
void closeFile(void* ctx) {
	if(auto* fp = static_cast<FILE*>(ctx)) {
		::fclose(fp);
	}
}

// Opens `filename` for binary reading; returns an opaque context or nullptr.
void* openIstream(const std::string& filename) {
	return openFile(filename, "rb");
}

// Opens (creates/truncates) `filename` for binary writing; returns an opaque context or nullptr.
void* openOstream(const std::string& filename) {
	return openFile(filename, "wb");
}

void closeIstream(void* ctx) {
	closeFile(ctx);
}

void closeOstream(void* ctx) {
	closeFile(ctx);
}

// Lz4MtReadFunc: reads up to destSize bytes into dest and returns the count.
// A null context or a non-positive size reads nothing (a negative int would
// otherwise turn into a huge size_t request).
int read(void* ctx, void* dest, int destSize) {
	auto* fp = static_cast<FILE*>(ctx);
	if(!fp || destSize <= 0) {
		return 0;
	}
	return static_cast<int>(::fread(dest, 1, static_cast<size_t>(destSize), fp));
}

// Lz4MtReadEofFunc: returns nonzero when no more data can be read.
// A stream error counts as end-of-input: lz4mtCompress() loops until this
// returns nonzero, and after a read error feof() alone may never become true.
int eof(void* ctx) {
	auto* fp = static_cast<FILE*>(ctx);
	if(!fp) {
		return 1;
	}
	return (::feof(fp) || ::ferror(fp)) ? 1 : 0;
}

// Lz4MtWriteFunc: writes sourceSize bytes and returns the number written.
// A null context or a non-positive size writes nothing.
int write(void* ctx, const void* source, int sourceSize) {
	auto* fp = static_cast<FILE*>(ctx);
	if(!fp || sourceSize <= 0) {
		return 0;
	}
	return static_cast<int>(::fwrite(source, 1, static_cast<size_t>(sourceSize), fp));
}

} // anonymous namespace
int main(int argc, char* argv[]) {
enum class CompMode {
NONE
, DECOMPRESS
, COMPRESS_C0
, COMPRESS_C1
} compMode = CompMode::NONE;
int md = LZ4MT_MODE_DEFAULT;
std::string inpFilename;
std::string outFilename;
for(int iarg = 1; iarg < argc; ++iarg) {
const auto a = std::string(argv[iarg]);
if(a == "--help" || a == "-h") {
std::cerr <<
"usage :\n"
" lz4_mt [switch...] <input> <output>\n"
"switch :\n"
" -c : Compress (lz4) (default)\n"
" -c1 : Compress (lz4hc)\n"
" -d : Decompress\n"
" -s : Single thread mode\n"
" -m : Multi thread mode (default)\n"
" -h : help\n";
exit(EXIT_FAILURE);
} else if(a == "-s") {
md |= LZ4MT_MODE_SEQUENTIAL;
} else if(a == "-m") {
md &= ~LZ4MT_MODE_SEQUENTIAL;
} else if(a == "-d") {
compMode = CompMode::DECOMPRESS;
} else if(a == "-c" || a == "-c0") {
compMode = CompMode::COMPRESS_C0;
} else if(a == "-c1") {
compMode = CompMode::COMPRESS_C1;
} else if(a[0] == '-') {
std::cerr << "ERROR: bad switch [" << a << "]\n";
exit(EXIT_FAILURE);
} else if(inpFilename.empty()) {
inpFilename = a;
} else if(outFilename.empty()) {
outFilename = a;
} else {
std::cerr << "ERROR: Bad argument [" << a << "]\n";
exit(EXIT_FAILURE);
}
}
if(CompMode::NONE == compMode) {
std::cerr << "ERROR: You must specify a switch -c or -d\n";
exit(EXIT_FAILURE);
}
if(inpFilename.empty()) {
std::cerr << "ERROR: No input filename\n";
exit(EXIT_FAILURE);
}
if(outFilename.empty()) {
std::cerr << "ERROR: No output filename\n";
exit(EXIT_FAILURE);
}
auto* is = openIstream(inpFilename);
if(!is) {
std::cerr << "ERROR: Can't open input file ["
<< inpFilename << "]\n";
exit(EXIT_FAILURE);
}
auto* os = openOstream(outFilename);
if(!os) {
std::cerr << "ERROR: Can't open output file ["
<< outFilename << "]\n";
exit(EXIT_FAILURE);
}
Lz4MtParam param = { 0 };
param.readCtx = is;
param.read = read;
param.readEof = eof;
param.writeCtx = os;
param.write = write;
auto mode = static_cast<Lz4MtMode>(md);
auto e = LZ4MT_RESULT_OK;
switch(compMode) {
default:
break;
case CompMode::DECOMPRESS:
e = lz4mtDecompress(
&param
, LZ4_uncompress_unknownOutputSize
, mode
);
break;
case CompMode::COMPRESS_C0:
e = lz4mtCompress(
&param
, LZ4_compress
, LZ4_compressBound
, mode
);
break;
case CompMode::COMPRESS_C1:
e = lz4mtCompress(
&param
, LZ4_compressHC
, LZ4_compressBound
, mode
);
break;
}
closeOstream(os);
closeIstream(is);
switch(e) {
case LZ4MT_RESULT_OK:
break;
case LZ4MT_RESULT_INVALID_HEADER:
std::cerr << "ERROR: Invalid header\n";
exit(EXIT_FAILURE);
break;
default:
std::cerr << "ERROR: ExitCode = "
<< static_cast<int>(e) << "\n";
exit(EXIT_FAILURE);
break;
}
exit(EXIT_SUCCESS);
}
@damian123
Copy link

Thank you for a nice piece of work!
When I turn on /arch:AVX in VS2012, it crashes. SSE2 works, but I'm not sure I see any speedup from it.

@damian123
Copy link

Did you try a 64-bit version of this and see any speed improvements?

@t-mat
Copy link
Author

t-mat commented Apr 4, 2013

hi, thank you for testing.

When I turn on /arch:AVX in VS2012 it crashes.

Because I have no AVX-enabled machine, I can't check /arch:AVX for now :(
Could you provide error messages and stack trace ?

Did you try a 64 bit version of this

Yes, I've checked both x86(Win32)/x64 platform.

and saw any speed improvements?

I did not investigate this problem, but in my experience, it depends.

Yes, basically, x64 is slower than x86.
But for small and sparse file (eg. contains a lot of 0x00s), sometimes x64 is faster.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment