Skip to content

Instantly share code, notes, and snippets.

@markpapadakis
Created May 9, 2014 19:07
Show Gist options
  • Select an option

  • Save markpapadakis/26e5d598632da1f1f108 to your computer and use it in GitHub Desktop.

Select an option

Save markpapadakis/26e5d598632da1f1f108 to your computer and use it in GitHub Desktop.
class BufFileReader
: public ISerializer
{
private:
int fd;
uint64_t chunkOffset;
uint64_t chunkSize;
uint64_t chunkEnd;
uint64_t offset;
struct
{
uint8_t *data;
uint32_t size;
uint32_t len;
uint16_t alignment;
_ALWAYS_inline_ void *DataWithOffset(const uint32_t o)
{
return data + o;
}
void Free(void)
{
if (data)
{
::free(data);
data = NULL;
size = 0;
}
len = 0;
}
_ALWAYS_inline_ void SetLength(const uint32_t l)
{
len = l;
}
void InsertSpace(const uint32_t offset, const uint32_t l)
{
runtime_assert(offset + len + l <= size, "Unexpected");
len+=l;
const auto dest = offset + l;
(void)memmove(data + dest, data + offset, len - dest);
}
_ALWAYS_inline_ void *Data(void)
{
return data;
}
_ALWAYS_inline_ void SetLengthAndTerm(const uint32_t l)
{
len = l;
}
_ALWAYS_inline_ void SetZeroLength(void)
{
len = 0;
}
void EnsureSize(const uint32_t s, const bool retain = true)
{
if (likely(s <= size))
return;
if (unlikely(alignment))
{
if (size == 0)
{
if (data)
::free(data);
if (unlikely(posix_memalign((void **)&data, alignment, s)))
{
IMPLERROUT("posix_memalign() failed\n");
Abort();
}
size = s;
}
else
{
void *n;
if (unlikely(posix_memalign(&n, alignment, s)))
{
IMPLERROUT("posix_memalign() failed\n");
Abort();
}
if (retain)
(void)memcpy(n, data, len);
else
len = 0;
::free(data);
data = (uint8_t *)n;
size = s;
}
}
else
{
if (retain)
{
data = (uint8_t *)realloc(data, s);
}
else
{
if (data)
::free(data);
data= (uint8_t *)malloc(s);
len = 0;
}
runtime_assert(data, "realloc() failed");
}
size = s;
}
} buf;
uint64_t fileSize;
uint32_t blockSize, capacityThreshold;
void (*cb)(const uint64_t, const uint32_t size, void *);
void *cbCtx;
uint32_t alignment;
private:
void EnsureAccessImpl(const uint32_t size);
_ALWAYS_inline_ void EnsureAccess(const uint32_t size)
{
EnsureAccessImpl(size);
}
// For unserializing varints, where we don't really know how much length a varint encoded repr.
// we are going to make certain that at least 1 byte can be accessed
_ALWAYS_inline_ void EnsureAccessUpto(const uint32_t size)
{
EnsureAccess(size);
}
public:
_ALWAYS_inline_ void *Data(void)
{
return buf.DataWithOffset(offset - chunkOffset);
}
_ALWAYS_inline_ uint64_t GetFileSize(void) const
{
return fileSize;
}
_ALWAYS_inline_ uint64_t GetOffset(void) const
{
return offset;
}
BufFileReader(int fd, const uint64_t fSize)
{
this->fd = fd;
buf.data = NULL;
buf.size = 0;
buf.alignment = 0;
buf.len = 0;
chunkOffset = 0;
chunkSize = 0;
chunkEnd = 0;
offset = 0;
fileSize = fSize;
blockSize = 8192;
capacityThreshold = 65536;
cb = NULL;
alignment = 0;
}
BufFileReader(void)
{
buf.data = NULL;
buf.size = 0;
buf.alignment = 0;
buf.len = 0;
fd = -1;
chunkOffset = 0;
chunkSize = 0;
chunkEnd = 0;
offset = 0;
fileSize = 0;
blockSize = 8192;
capacityThreshold = 65536;
cb = NULL;
// Use Reset() to prepare
alignment = 0;
}
~BufFileReader(void)
{
buf.Free();
}
void SetAlignment(const uint32_t a)
{
// ## O_DIRECT access
// O_DIRECT flag may impose alignment restrictions on the length and address of user-space buffers and the file offset of I/Os.
// In Linux alignment restrictions vary by filesystem and kernel version and might be absent entirely.
// However there is currently no FS-indepedent interface for an application to discover these restrictions for a given file or FS.
// Some filesystems provide their own interfaces for doing so, e.g XFS_IOC_DIOINFO operation in xfsctl(3)
//
// Under Linux 2.4, transfer sizes and the alignment of the user buffer and the file offset must all be multipled of the logical block size
// of the filesystem.
//
// Under Linux 2.6, alignment to 512-byte boundaries suffices.
// There is a LOT more to it, please consult the man page
//
// The gist of it is, you need to use SetAlignment() and SetMemAlignment() to a multiple of 512 under 2.6+
// alignment affects block aligment too. You, also, probably need to avoid it anyway
// http://unix.stackexchange.com/questions/6467/use-of-o-direct-on-linux
//
runtime_assert(IsPowerOfTwo(a), "Not power of two alignment");
runtime_assert((blockSize%a) == 0, "alignment must be a multiple of block size");
alignment = a;
}
void SetMemAlignment(const uint64_t a)
{
runtime_assert(IsPowerOfTwo(a), "Not power of two alignment");
runtime_assert(buf.data == NULL, "Already allocated");
buf.alignment = a;
}
void SetCallback(void (*proc)(const uint64_t, const uint32_t, void *), void *procCtx)
{
cb = proc;
cbCtx = procCtx;
}
void Reset(int fdescriptor, const uint64_t fs)
{
fd = fdescriptor;
fileSize= fs;
chunkOffset = 0;
chunkSize = 0;
chunkEnd = 0;
offset = 0;
blockSize = 8192;
capacityThreshold= 65536;
if (buf.size > 64 * 1024 * 1024)
buf.Free();
alignment = 0;
buf.alignment = 0;
buf.len = 0;
}
void ResetChunk(void)
{
chunkOffset = 0;
chunkSize = 0;
chunkEnd = 0;
offset = 0;
}
void Reset(int fdescriptor, const uint64_t fs, const uint64_t newOffset)
{
Reset(fdescriptor, fs);
SetOffset(newOffset);
}
void SetBlockSize(const uint32_t s)
{
blockSize = ALIGNUP_TO_POWEROF2BOUNDARY(s, 512);
if (blockSize > capacityThreshold)
capacityThreshold = ALIGNUP_TO_POWEROF2BOUNDARY(blockSize, 65536);
if (alignment)
{
runtime_assert((blockSize%alignment) == 0, "alignment must be a multiple of block size");
}
}
uint32_t BlockSize(void) const
{
return blockSize;
}
uint32_t CapacityThreshold(void) const
{
return capacityThreshold;
}
void SetCapacityThreshold(const uint32_t s)
{
capacityThreshold = ALIGNUP_TO_POWEROF2BOUNDARY(s, 8192);
}
_ALWAYS_inline_ void SeekTo(const uint64_t o)
{
offset = o;
}
void ResetBufferWithThreshold(const uint32_t n)
{
if (unlikely(buf.size > n))
buf.Free();
}
void Serialize(const void *p, const uint32_t size)
{
ASSERT_NOT_REACHED();
}
void Serialize(const uint32_t v)
{
ASSERT_NOT_REACHED();
}
void Serialize(const double v)
{
ASSERT_NOT_REACHED();
}
void Serialize(const uint8_t v)
{
ASSERT_NOT_REACHED();
}
void SerializePackedUInt32WithLenPrefix(const uint32_t n)
{
ASSERT_NOT_REACHED();
}
void SerializeVarUInt32(const uint32_t n)
{
ASSERT_NOT_REACHED();
}
_ALWAYS_inline_ void Unserialize(void *const p, const uint32_t size)
{
EnsureAccess(size);
(void)memcpy(p, Data(), size);
offset+=size;
}
void Unserialize(IOBuffer *const out, const uint32_t size)
{
EnsureAccess(size);
out->Serialize(Data(), size);
offset+=size;
}
template<typename T>
inline T Unserialize(void)
{
EnsureAccess(sizeof(T));
const T r = *(T *)Data();
offset+=sizeof(T);
return r;
}
template<typename T>
_ALWAYS_inline_ void Unserialize(T *dst)
{
Unserialize(dst, sizeof(T));
}
inline uint32_t UnserializePackedUInt32WithLenPrefix(void)
{
uint8_t l;
EnsureAccessUpto(16);
const uint32_t r = Compression::UnpackUInt32WithLenPrefix((uint8_t *)Data(), l);
runtime_assert(l < 16, "Unexpected Compression::UnpackUInt32WithLenPrefix() length displ");
offset+=l;
return r;
}
inline uint32_t UnserializeVarUInt32(void)
{
EnsureAccessUpto(5);
const uint8_t *b = (uint8_t *)Data(), *const base = b;
const uint32_t r = Compression::UnpackUInt32(b);
offset+=b - base;
return r;
}
void *SeekAndRead(const uint64_t o, const uint32_t s)
{
SeekTo(o);
EnsureAccess(s);
void *const p = Data();
offset+=s;
return p;
}
_ALWAYS_inline_ void *DataAt(const uint64_t o, const uint32_t s)
{
return SeekAndRead(o, s);
}
_ALWAYS_inline_ void *Peek(const uint32_t size)
{
EnsureAccess(size);
return Data();
}
_ALWAYS_inline_ void AdvanceOffset(const uint64_t s)
{
offset+=s;
}
_ALWAYS_inline_ void SetOffset(const uint64_t o)
{
offset = o;
}
void *operator new(const size_t size, IMemoryAllocator *allocator)
{
return allocator->Alloc(size);
}
void *operator new(const size_t size)
{
return ::operator new(size);
}
_ALWAYS_inline_ bool IsAtEnd(void) const
{
return offset == fileSize;
}
};
// Makes the `size` bytes starting at the current `offset` available in `buf`, reading from `fd`
// with _pread64() when they are not buffered yet. Three cases are handled:
//  1. `offset` lies before the buffered chunk but the request reaches into it: the missing bytes
//     are read and prepended ("extend to the left").
//  2. `offset` lies inside the chunk: nothing to do if the request ends within it, otherwise the
//     missing bytes are read and appended ("extend to the right").
//  3. Anything else, or an extension that would push the chunk past capacityThreshold: the chunk
//     is discarded and a fresh range is read (label resetChunk).
// The result of every _pread64() call is discarded here.
void BufFileReader::EnsureAccessImpl(const uint32_t size)
{
if (unlikely(offset < chunkOffset))
{
const uint64_t end = offset + size;
// Only worth extending when the request touches the current chunk; otherwise control falls
// through to resetChunk below.
if (end >= chunkOffset)
{
// Can we just extend to the left?
// We want to try to read >= blockSize, which is better than reading < blockSize
// leftOffset: where the prepended read starts (offset, rounded down to `alignment` if one is set)
uint64_t leftOffset = unlikely(alignment) ? ALIGNDOWN_TO_POWEROF2BOUNDARY(offset, alignment) : offset;
// extra: number of bytes missing in front of the chunk
uint64_t extra = chunkOffset - leftOffset;
const uint64_t computed = extra + chunkSize;
// The grown chunk would exceed the capacity threshold: start over
if (unlikely(computed > capacityThreshold))
goto resetChunk;
// The request also runs past the right edge of the chunk: start over
if (end > chunkEnd)
goto resetChunk;
// Try to widen the prepended read to a multiple of blockSize (d is the widening amount)
const uint32_t rounded = ALIGNUP_TO_POWEROF2BOUNDARY(extra, blockSize);
if (const uint32_t d = rounded - extra)
{
if (leftOffset < blockSize)
{
// Less than one block away from the start of the file: read from offset 0 if that fits
if (computed + leftOffset < capacityThreshold)
{
leftOffset = 0;
extra = chunkOffset;
}
}
// NOTE(review): this compares only the rounded prefix against capacityThreshold, not
// prefix + chunkSize as the check on `computed` above does -- confirm that is intended
else if (rounded < capacityThreshold)
{
leftOffset -= d;
extra = rounded;
}
}
// Make room: grow the buffer, record the current payload length, then shift the payload
// right by `extra` bytes so the new data can be read in front of it
buf.EnsureSize(chunkSize + extra);
buf.SetLength(chunkSize);
buf.InsertSpace(0, extra);
// Due to alignment, we can certainly read INTO existing buffer data, but that's OK
// (the read length is `extra` rounded up to `alignment`, which may exceed the gap just opened)
(void)_pread64(fd, buf.Data(), unlikely(alignment) ? ALIGNUP_TO_POWEROF2BOUNDARY(extra, alignment) : extra, leftOffset, cb, cbCtx);
// The chunk now starts at leftOffset; chunkEnd is not modified in this branch
chunkSize += extra;
chunkOffset = leftOffset;
buf.SetLengthAndTerm(chunkSize);
return;
}
}
else if (likely(offset < chunkEnd))
{
const uint64_t end = offset + size;
// Fast path: the whole request is already buffered
if (likely(end <= chunkEnd))
return;
// Can we extend to the right?
// rightOffset: end of the request (rounded up to `alignment` if one is set)
uint64_t rightOffset = unlikely(alignment) ? ALIGNUP_TO_POWEROF2BOUNDARY(offset + size, alignment) : offset + size;
// extra: number of bytes missing after the chunk
uint64_t extra = rightOffset - chunkEnd;
const uint64_t computed = extra + chunkSize;
// The grown chunk would exceed the capacity threshold: start over
if (computed > capacityThreshold)
goto resetChunk;
if (extra < blockSize)
{
// Small append: try to read up to the next blockSize boundary, capped at fileSize
// NOTE(review): n and rightOffset are unsigned, so n - rightOffset wraps around when
// rightOffset is beyond fileSize -- confirm callers never request past the end of the file
const uint64_t n = Min<uint64_t>(fileSize, ALIGNUP_TO_POWEROF2BOUNDARY(rightOffset, blockSize));
const uint64_t rem = n - rightOffset;
if (extra + rem < capacityThreshold)
{
extra+=rem;
rightOffset+=rem;
}
}
// Grow the buffer (keeping its contents) and read the missing bytes right after the payload
buf.EnsureSize(chunkSize + extra);
(void)_pread64(fd, buf.DataWithOffset(chunkSize), extra, chunkEnd, cb, cbCtx);
chunkSize += extra;
chunkEnd = chunkOffset + chunkSize;
buf.SetLengthAndTerm(chunkSize);
return;
}
// Reached by the gotos above, and by fall-through when the request does not touch the chunk
resetChunk:
// res: the (file offset, length) range that will become the new chunk
range64_t res;
if (unlikely(alignment))
{
// Block size is aligned already
// base: offset rounded down to `alignment`
// end:  base + size rounded up to blockSize, capped at fileSize
// upto: end of the request, capped at fileSize
const uint64_t base = ALIGNDOWN_TO_POWEROF2BOUNDARY(offset, alignment), end = Min<uint64_t>(fileSize, base + ALIGNUP_TO_POWEROF2BOUNDARY(size, blockSize));
const uint64_t upto = Min<uint64_t>(fileSize, offset + size);
if (upto <= end)
res.Set(base, end - base);
else
// Rounding the start down left the tail of the request uncovered: extend to `upto`,
// rounded up to blockSize
res.Set(base, ALIGNUP_TO_POWEROF2BOUNDARY(upto, blockSize) - base);
}
else
{
// Read from `offset`, with size rounded up to blockSize, capped at fileSize
const uint64_t end = Min<uint64_t>(fileSize, offset + ALIGNUP_TO_POWEROF2BOUNDARY(size, blockSize));
res.Set(offset, end - offset);
}
if (likely(alignment == 0)) // TODO: support this for aligned reads too
{
// If the length is not a multiple of blockSize, try to make it one
const auto toNextBlock = ALIGNUP_TO_POWEROF2BOUNDARY(res.len, blockSize);
// This outer `d` is only used as the condition; the inner `d` below shadows it
if (const auto d = toNextBlock - res.len)
{
if (toNextBlock < capacityThreshold)
{
res.len = toNextBlock;
const auto end = res.End();
if (end > fileSize)
{
// The widened range overshoots the file by d bytes: slide it left by that much
const auto d = end - fileSize;
int64_t left = res.offset - d;
if (left < 0)
{
// It cannot slide that far: start at 0 and shrink the length by the shortfall
res.len += left;
res.offset = 0;
}
else
{
res.offset = left;
}
}
}
}
}
// Commit the new chunk; the previous buffer contents are not retained
chunkOffset = res.offset;
chunkSize = res.len;
chunkEnd = res.End();
buf.SetZeroLength();
buf.EnsureSize(chunkSize, false);
(void)_pread64(fd, buf.Data(), chunkSize, chunkOffset, cb, cbCtx);
buf.SetLengthAndTerm(chunkSize);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment