Created
May 9, 2014 19:07
-
-
Save markpapadakis/26e5d598632da1f1f108 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| class BufFileReader | |
| : public ISerializer | |
| { | |
| private: | |
| int fd; | |
| uint64_t chunkOffset; | |
| uint64_t chunkSize; | |
| uint64_t chunkEnd; | |
| uint64_t offset; | |
| struct | |
| { | |
| uint8_t *data; | |
| uint32_t size; | |
| uint32_t len; | |
| uint16_t alignment; | |
| _ALWAYS_inline_ void *DataWithOffset(const uint32_t o) | |
| { | |
| return data + o; | |
| } | |
| void Free(void) | |
| { | |
| if (data) | |
| { | |
| ::free(data); | |
| data = NULL; | |
| size = 0; | |
| } | |
| len = 0; | |
| } | |
| _ALWAYS_inline_ void SetLength(const uint32_t l) | |
| { | |
| len = l; | |
| } | |
| void InsertSpace(const uint32_t offset, const uint32_t l) | |
| { | |
| runtime_assert(offset + len + l <= size, "Unexpected"); | |
| len+=l; | |
| const auto dest = offset + l; | |
| (void)memmove(data + dest, data + offset, len - dest); | |
| } | |
| _ALWAYS_inline_ void *Data(void) | |
| { | |
| return data; | |
| } | |
| _ALWAYS_inline_ void SetLengthAndTerm(const uint32_t l) | |
| { | |
| len = l; | |
| } | |
| _ALWAYS_inline_ void SetZeroLength(void) | |
| { | |
| len = 0; | |
| } | |
| void EnsureSize(const uint32_t s, const bool retain = true) | |
| { | |
| if (likely(s <= size)) | |
| return; | |
| if (unlikely(alignment)) | |
| { | |
| if (size == 0) | |
| { | |
| if (data) | |
| ::free(data); | |
| if (unlikely(posix_memalign((void **)&data, alignment, s))) | |
| { | |
| IMPLERROUT("posix_memalign() failed\n"); | |
| Abort(); | |
| } | |
| size = s; | |
| } | |
| else | |
| { | |
| void *n; | |
| if (unlikely(posix_memalign(&n, alignment, s))) | |
| { | |
| IMPLERROUT("posix_memalign() failed\n"); | |
| Abort(); | |
| } | |
| if (retain) | |
| (void)memcpy(n, data, len); | |
| else | |
| len = 0; | |
| ::free(data); | |
| data = (uint8_t *)n; | |
| size = s; | |
| } | |
| } | |
| else | |
| { | |
| if (retain) | |
| { | |
| data = (uint8_t *)realloc(data, s); | |
| } | |
| else | |
| { | |
| if (data) | |
| ::free(data); | |
| data= (uint8_t *)malloc(s); | |
| len = 0; | |
| } | |
| runtime_assert(data, "realloc() failed"); | |
| } | |
| size = s; | |
| } | |
| } buf; | |
| uint64_t fileSize; | |
| uint32_t blockSize, capacityThreshold; | |
| void (*cb)(const uint64_t, const uint32_t size, void *); | |
| void *cbCtx; | |
| uint32_t alignment; | |
| private: | |
| void EnsureAccessImpl(const uint32_t size); | |
| _ALWAYS_inline_ void EnsureAccess(const uint32_t size) | |
| { | |
| EnsureAccessImpl(size); | |
| } | |
| // For unserializing varints, where we don't really know how much length a varint encoded repr. | |
| // we are going to make certain that at least 1 byte can be accessed | |
| _ALWAYS_inline_ void EnsureAccessUpto(const uint32_t size) | |
| { | |
| EnsureAccess(size); | |
| } | |
| public: | |
| _ALWAYS_inline_ void *Data(void) | |
| { | |
| return buf.DataWithOffset(offset - chunkOffset); | |
| } | |
| _ALWAYS_inline_ uint64_t GetFileSize(void) const | |
| { | |
| return fileSize; | |
| } | |
| _ALWAYS_inline_ uint64_t GetOffset(void) const | |
| { | |
| return offset; | |
| } | |
| BufFileReader(int fd, const uint64_t fSize) | |
| { | |
| this->fd = fd; | |
| buf.data = NULL; | |
| buf.size = 0; | |
| buf.alignment = 0; | |
| buf.len = 0; | |
| chunkOffset = 0; | |
| chunkSize = 0; | |
| chunkEnd = 0; | |
| offset = 0; | |
| fileSize = fSize; | |
| blockSize = 8192; | |
| capacityThreshold = 65536; | |
| cb = NULL; | |
| alignment = 0; | |
| } | |
| BufFileReader(void) | |
| { | |
| buf.data = NULL; | |
| buf.size = 0; | |
| buf.alignment = 0; | |
| buf.len = 0; | |
| fd = -1; | |
| chunkOffset = 0; | |
| chunkSize = 0; | |
| chunkEnd = 0; | |
| offset = 0; | |
| fileSize = 0; | |
| blockSize = 8192; | |
| capacityThreshold = 65536; | |
| cb = NULL; | |
| // Use Reset() to prepare | |
| alignment = 0; | |
| } | |
| ~BufFileReader(void) | |
| { | |
| buf.Free(); | |
| } | |
| void SetAlignment(const uint32_t a) | |
| { | |
| // ## O_DIRECT access | |
| // O_DIRECT flag may impose alignment restrictions on the length and address of user-space buffers and the file offset of I/Os. | |
| // In Linux alignment restrictions vary by filesystem and kernel version and might be absent entirely. | |
| // However there is currently no FS-indepedent interface for an application to discover these restrictions for a given file or FS. | |
| // Some filesystems provide their own interfaces for doing so, e.g XFS_IOC_DIOINFO operation in xfsctl(3) | |
| // | |
| // Under Linux 2.4, transfer sizes and the alignment of the user buffer and the file offset must all be multipled of the logical block size | |
| // of the filesystem. | |
| // | |
| // Under Linux 2.6, alignment to 512-byte boundaries suffices. | |
| // There is a LOT more to it, please consult the man page | |
| // | |
| // The gist of it is, you need to use SetAlignment() and SetMemAlignment() to a multiple of 512 under 2.6+ | |
| // alignment affects block aligment too. You, also, probably need to avoid it anyway | |
| // http://unix.stackexchange.com/questions/6467/use-of-o-direct-on-linux | |
| // | |
| runtime_assert(IsPowerOfTwo(a), "Not power of two alignment"); | |
| runtime_assert((blockSize%a) == 0, "alignment must be a multiple of block size"); | |
| alignment = a; | |
| } | |
| void SetMemAlignment(const uint64_t a) | |
| { | |
| runtime_assert(IsPowerOfTwo(a), "Not power of two alignment"); | |
| runtime_assert(buf.data == NULL, "Already allocated"); | |
| buf.alignment = a; | |
| } | |
| void SetCallback(void (*proc)(const uint64_t, const uint32_t, void *), void *procCtx) | |
| { | |
| cb = proc; | |
| cbCtx = procCtx; | |
| } | |
| void Reset(int fdescriptor, const uint64_t fs) | |
| { | |
| fd = fdescriptor; | |
| fileSize= fs; | |
| chunkOffset = 0; | |
| chunkSize = 0; | |
| chunkEnd = 0; | |
| offset = 0; | |
| blockSize = 8192; | |
| capacityThreshold= 65536; | |
| if (buf.size > 64 * 1024 * 1024) | |
| buf.Free(); | |
| alignment = 0; | |
| buf.alignment = 0; | |
| buf.len = 0; | |
| } | |
| void ResetChunk(void) | |
| { | |
| chunkOffset = 0; | |
| chunkSize = 0; | |
| chunkEnd = 0; | |
| offset = 0; | |
| } | |
| void Reset(int fdescriptor, const uint64_t fs, const uint64_t newOffset) | |
| { | |
| Reset(fdescriptor, fs); | |
| SetOffset(newOffset); | |
| } | |
| void SetBlockSize(const uint32_t s) | |
| { | |
| blockSize = ALIGNUP_TO_POWEROF2BOUNDARY(s, 512); | |
| if (blockSize > capacityThreshold) | |
| capacityThreshold = ALIGNUP_TO_POWEROF2BOUNDARY(blockSize, 65536); | |
| if (alignment) | |
| { | |
| runtime_assert((blockSize%alignment) == 0, "alignment must be a multiple of block size"); | |
| } | |
| } | |
| uint32_t BlockSize(void) const | |
| { | |
| return blockSize; | |
| } | |
| uint32_t CapacityThreshold(void) const | |
| { | |
| return capacityThreshold; | |
| } | |
| void SetCapacityThreshold(const uint32_t s) | |
| { | |
| capacityThreshold = ALIGNUP_TO_POWEROF2BOUNDARY(s, 8192); | |
| } | |
| _ALWAYS_inline_ void SeekTo(const uint64_t o) | |
| { | |
| offset = o; | |
| } | |
| void ResetBufferWithThreshold(const uint32_t n) | |
| { | |
| if (unlikely(buf.size > n)) | |
| buf.Free(); | |
| } | |
| void Serialize(const void *p, const uint32_t size) | |
| { | |
| ASSERT_NOT_REACHED(); | |
| } | |
| void Serialize(const uint32_t v) | |
| { | |
| ASSERT_NOT_REACHED(); | |
| } | |
| void Serialize(const double v) | |
| { | |
| ASSERT_NOT_REACHED(); | |
| } | |
| void Serialize(const uint8_t v) | |
| { | |
| ASSERT_NOT_REACHED(); | |
| } | |
| void SerializePackedUInt32WithLenPrefix(const uint32_t n) | |
| { | |
| ASSERT_NOT_REACHED(); | |
| } | |
| void SerializeVarUInt32(const uint32_t n) | |
| { | |
| ASSERT_NOT_REACHED(); | |
| } | |
| _ALWAYS_inline_ void Unserialize(void *const p, const uint32_t size) | |
| { | |
| EnsureAccess(size); | |
| (void)memcpy(p, Data(), size); | |
| offset+=size; | |
| } | |
| void Unserialize(IOBuffer *const out, const uint32_t size) | |
| { | |
| EnsureAccess(size); | |
| out->Serialize(Data(), size); | |
| offset+=size; | |
| } | |
| template<typename T> | |
| inline T Unserialize(void) | |
| { | |
| EnsureAccess(sizeof(T)); | |
| const T r = *(T *)Data(); | |
| offset+=sizeof(T); | |
| return r; | |
| } | |
| template<typename T> | |
| _ALWAYS_inline_ void Unserialize(T *dst) | |
| { | |
| Unserialize(dst, sizeof(T)); | |
| } | |
| inline uint32_t UnserializePackedUInt32WithLenPrefix(void) | |
| { | |
| uint8_t l; | |
| EnsureAccessUpto(16); | |
| const uint32_t r = Compression::UnpackUInt32WithLenPrefix((uint8_t *)Data(), l); | |
| runtime_assert(l < 16, "Unexpected Compression::UnpackUInt32WithLenPrefix() length displ"); | |
| offset+=l; | |
| return r; | |
| } | |
| inline uint32_t UnserializeVarUInt32(void) | |
| { | |
| EnsureAccessUpto(5); | |
| const uint8_t *b = (uint8_t *)Data(), *const base = b; | |
| const uint32_t r = Compression::UnpackUInt32(b); | |
| offset+=b - base; | |
| return r; | |
| } | |
| void *SeekAndRead(const uint64_t o, const uint32_t s) | |
| { | |
| SeekTo(o); | |
| EnsureAccess(s); | |
| void *const p = Data(); | |
| offset+=s; | |
| return p; | |
| } | |
| _ALWAYS_inline_ void *DataAt(const uint64_t o, const uint32_t s) | |
| { | |
| return SeekAndRead(o, s); | |
| } | |
| _ALWAYS_inline_ void *Peek(const uint32_t size) | |
| { | |
| EnsureAccess(size); | |
| return Data(); | |
| } | |
| _ALWAYS_inline_ void AdvanceOffset(const uint64_t s) | |
| { | |
| offset+=s; | |
| } | |
| _ALWAYS_inline_ void SetOffset(const uint64_t o) | |
| { | |
| offset = o; | |
| } | |
| void *operator new(const size_t size, IMemoryAllocator *allocator) | |
| { | |
| return allocator->Alloc(size); | |
| } | |
| void *operator new(const size_t size) | |
| { | |
| return ::operator new(size); | |
| } | |
| _ALWAYS_inline_ bool IsAtEnd(void) const | |
| { | |
| return offset == fileSize; | |
| } | |
| }; | |
| void BufFileReader::EnsureAccessImpl(const uint32_t size) | |
| { | |
| if (unlikely(offset < chunkOffset)) | |
| { | |
| const uint64_t end = offset + size; | |
| if (end >= chunkOffset) | |
| { | |
| // Can we just extend to the left? | |
| // We want to try to read >= blockSize, which better than reading < blockSize | |
| uint64_t leftOffset = unlikely(alignment) ? ALIGNDOWN_TO_POWEROF2BOUNDARY(offset, alignment) : offset; | |
| uint64_t extra = chunkOffset - leftOffset; | |
| const uint64_t computed = extra + chunkSize; | |
| if (unlikely(computed > capacityThreshold)) | |
| goto resetChunk; | |
| if (end > chunkEnd) | |
| goto resetChunk; | |
| const uint32_t rounded = ALIGNUP_TO_POWEROF2BOUNDARY(extra, blockSize); | |
| if (const uint32_t d = rounded - extra) | |
| { | |
| if (leftOffset < blockSize) | |
| { | |
| if (computed + leftOffset < capacityThreshold) | |
| { | |
| leftOffset = 0; | |
| extra = chunkOffset; | |
| } | |
| } | |
| else if (rounded < capacityThreshold) | |
| { | |
| leftOffset -= d; | |
| extra = rounded; | |
| } | |
| } | |
| buf.EnsureSize(chunkSize + extra); | |
| buf.SetLength(chunkSize); | |
| buf.InsertSpace(0, extra); | |
| // Due to alignment, we can certain read INTO existing buffer data, but that's OK | |
| (void)_pread64(fd, buf.Data(), unlikely(alignment) ? ALIGNUP_TO_POWEROF2BOUNDARY(extra, alignment) : extra, leftOffset, cb, cbCtx); | |
| chunkSize += extra; | |
| chunkOffset = leftOffset; | |
| buf.SetLengthAndTerm(chunkSize); | |
| return; | |
| } | |
| } | |
| else if (likely(offset < chunkEnd)) | |
| { | |
| const uint64_t end = offset + size; | |
| if (likely(end <= chunkEnd)) | |
| return; | |
| // Can we extend to the right? | |
| uint64_t rightOffset = unlikely(alignment) ? ALIGNUP_TO_POWEROF2BOUNDARY(offset + size, alignment) : offset + size; | |
| uint64_t extra = rightOffset - chunkEnd; | |
| const uint64_t computed = extra + chunkSize; | |
| if (computed > capacityThreshold) | |
| goto resetChunk; | |
| if (extra < blockSize) | |
| { | |
| const uint64_t n = Min<uint64_t>(fileSize, ALIGNUP_TO_POWEROF2BOUNDARY(rightOffset, blockSize)); | |
| const uint64_t rem = n - rightOffset; | |
| if (extra + rem < capacityThreshold) | |
| { | |
| extra+=rem; | |
| rightOffset+=rem; | |
| } | |
| } | |
| buf.EnsureSize(chunkSize + extra); | |
| (void)_pread64(fd, buf.DataWithOffset(chunkSize), extra, chunkEnd, cb, cbCtx); | |
| chunkSize += extra; | |
| chunkEnd = chunkOffset + chunkSize; | |
| buf.SetLengthAndTerm(chunkSize); | |
| return; | |
| } | |
| resetChunk: | |
| range64_t res; | |
| if (unlikely(alignment)) | |
| { | |
| // Block size is aligned already | |
| const uint64_t base = ALIGNDOWN_TO_POWEROF2BOUNDARY(offset, alignment), end = Min<uint64_t>(fileSize, base + ALIGNUP_TO_POWEROF2BOUNDARY(size, blockSize)); | |
| const uint64_t upto = Min<uint64_t>(fileSize, offset + size); | |
| if (upto <= end) | |
| res.Set(base, end - base); | |
| else | |
| res.Set(base, ALIGNUP_TO_POWEROF2BOUNDARY(upto, blockSize) - base); | |
| } | |
| else | |
| { | |
| const uint64_t end = Min<uint64_t>(fileSize, offset + ALIGNUP_TO_POWEROF2BOUNDARY(size, blockSize)); | |
| res.Set(offset, end - offset); | |
| } | |
| if (likely(alignment == 0)) // TODO: support this for aligned reads too | |
| { | |
| const auto toNextBlock = ALIGNUP_TO_POWEROF2BOUNDARY(res.len, blockSize); | |
| if (const auto d = toNextBlock - res.len) | |
| { | |
| if (toNextBlock < capacityThreshold) | |
| { | |
| res.len = toNextBlock; | |
| const auto end = res.End(); | |
| if (end > fileSize) | |
| { | |
| const auto d = end - fileSize; | |
| int64_t left = res.offset - d; | |
| if (left < 0) | |
| { | |
| res.len += left; | |
| res.offset = 0; | |
| } | |
| else | |
| { | |
| res.offset = left; | |
| } | |
| } | |
| } | |
| } | |
| } | |
| chunkOffset = res.offset; | |
| chunkSize = res.len; | |
| chunkEnd = res.End(); | |
| buf.SetZeroLength(); | |
| buf.EnsureSize(chunkSize, false); | |
| (void)_pread64(fd, buf.Data(), chunkSize, chunkOffset, cb, cbCtx); | |
| buf.SetLengthAndTerm(chunkSize); | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment