Created
March 21, 2012 22:10
-
-
Save matteobertozzi/2153712 to your computer and use it in GitHub Desktop.
zcl stream next-gen, zero copy read/write api...
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
 * - stream interface
 * [filter stream]
 * - buffered stream
 * - encoded stream
 * - buffer stream
 * - chunkq stream
 *
 * Copyright 2011-2012 Matteo Bertozzi
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <sys/types.h>
#include <sys/stat.h>

#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
/* Out-parameter used by the zero-copy calls: the stream stores a buffer
 * address through it (see the `*data = ...` assignments in the buffered
 * reader and writer).
 */
typedef void **zdref_t;

/* Opaque stream handle.  Every concrete stream struct in this file starts
 * with a vtable pointer, which is what the dispatch macros rely on.
 */
typedef void z_stream_t;
| typedef struct z_stream_vtable z_stream_vtable_t; | |
| struct z_stream_vtable { | |
| int (*write) (z_stream_t *stream, zdref_t data); | |
| int (*read) (z_stream_t *stream, const zdref_t data); | |
| int (*backup) (z_stream_t *stream, unsigned int n); | |
| int (*push) (z_stream_t *stream, const void *data, unsigned int n); | |
| int (*fetch) (z_stream_t *stream, void *data, unsigned int n); | |
| int (*flush) (z_stream_t *stream); | |
| int (*seek) (z_stream_t *stream, uint64_t offset); | |
| uint64_t (*tell) (z_stream_t *stream); | |
| uint64_t (*length) (z_stream_t *stream); | |
| }; | |
| struct stream { | |
| const z_stream_vtable_t *vtable; | |
| }; | |
/*
 * Virtual dispatch helpers.
 *
 * z_stream_call() casts `x` to `struct stream *`, looks `method` up in its
 * vtable and calls it with `x` as first argument.  Notes:
 *   - every macro argument is parenthesized, so expressions such as
 *     `streams + 1` or `cond ? a : b` bind as the caller expects;
 *   - `x` is evaluated twice: do not pass expressions with side effects;
 *   - the vtable slot is not checked for NULL;
 *   - `, ##__VA_ARGS__` is the GNU comma-swallowing extension.
 */
#define z_stream_call(x, method, ...)                                       \
    (((struct stream *)(x))->vtable->method((x), ##__VA_ARGS__))

#define z_stream_write(stream, data)      z_stream_call(stream, write, data)
#define z_stream_read(stream, data)       z_stream_call(stream, read, data)
#define z_stream_backup(stream, n)        z_stream_call(stream, backup, n)
#define z_stream_push(stream, data, n)    z_stream_call(stream, push, data, n)
#define z_stream_fetch(stream, data, n)   z_stream_call(stream, fetch, data, n)
#define z_stream_flush(stream)            z_stream_call(stream, flush)
#define z_stream_seek(stream, offset)     z_stream_call(stream, seek, offset)
#define z_stream_tell(stream)             z_stream_call(stream, tell)
#define z_stream_length(stream)           z_stream_call(stream, length)

/* 4-byte integer helpers (host byte order).  `v` is a pointer for the read
 * variant and an lvalue for the write variant.
 */
#define z_stream_read_uint32(stream, v)   z_stream_fetch_fully((stream), (v), 4)
#define z_stream_write_uint32(stream, v)  z_stream_push_fully((stream), &(v), 4)
typedef struct buffered_writer z_buffered_writer_t;
typedef struct buffered_reader z_buffered_reader_t;
typedef void z_codec_t;
typedef struct z_disk z_disk_t;

/*
 * Placeholder "identity" codec.
 *
 * Argument order is (codec, dst, dst_size, src, src_size), which is the order
 * every call site in this file already uses.
 *
 *   z_codec_max_length()  worst-case encoded size for `size` input bytes.
 *   z_codec_encode()      copies src to dst, evaluates to the encoded length.
 *   z_codec_decode()      copies src to dst, evaluates to 0 on success and
 *                         non-zero if src does not fit in dst.
 *
 * The size arguments are evaluated more than once.
 */
#define z_codec_max_length(codec, size)     (size)

#define z_codec_encode(codec, dst, dst_size, src, src_size)                 \
    (memcpy((dst), (src), (src_size)), (int)(src_size))

#define z_codec_decode(codec, dst, dst_size, src, src_size)                 \
    (((src_size) > (dst_size)) ? -1 : (memcpy((dst), (src), (src_size)), 0))
| struct buffered_writer { | |
| const z_stream_vtable_t *vtable; | |
| struct stream * stream; | |
| z_codec_t * codec; | |
| unsigned char * blob; | |
| unsigned int size; | |
| unsigned int used; | |
| }; | |
| struct buffered_reader { | |
| const z_stream_vtable_t *vtable; | |
| struct stream * stream; | |
| z_codec_t * codec; | |
| unsigned char * blob; | |
| unsigned int size; | |
| unsigned int used; | |
| }; | |
| struct z_disk { | |
| z_stream_vtable_t *vtable; | |
| uint64_t offset; | |
| int fd; | |
| }; | |
| /* ============================================================================ | |
| * PUBLIC Stream API | |
| */ | |
| int z_stream_push_fully (z_stream_t *stream, | |
| const void *buffer, | |
| unsigned int size) | |
| { | |
| const unsigned char *pbuf = (const unsigned char *)buffer; | |
| int n = 0; | |
| int wr; | |
| while (size > 0) { | |
| if ((wr = z_stream_push(stream, pbuf, size)) <= 0) | |
| break; | |
| n += wr; | |
| pbuf += wr; | |
| size -= wr; | |
| } | |
| return(n); | |
| } | |
| int z_stream_fetch_fully (z_stream_t *stream, | |
| void *buffer, | |
| unsigned int size) | |
| { | |
| unsigned char *pbuf = (unsigned char *)buffer; | |
| int n = 0; | |
| int rd; | |
| while (size > 0) { | |
| if ((rd = z_stream_fetch(stream, pbuf, size)) <= 0) | |
| break; | |
| n += rd; | |
| pbuf += rd; | |
| size -= rd; | |
| } | |
| return(n); | |
| } | |
| /* ============================================================================ | |
| * PRIVATE Buffered/Encoded Writer Stream plug | |
| */ | |
| static int __encoded_block_write (z_buffered_writer_t *writer, | |
| const unsigned char *buffer, | |
| unsigned int size) | |
| { | |
| unsigned char *cbuf; | |
| int csize; | |
| if (writer->codec == NULL) { | |
| /* TODO: OPTIMIZE: check for zero-copy-write on underline stream */ | |
| return(z_stream_push_fully(writer->stream, buffer, size)); | |
| } | |
| csize = z_codec_max_length(writer->codec, size); | |
| if ((cbuf = (unsigned char *) malloc(csize)) == NULL) | |
| return(-1); | |
| csize = z_codec_encode(writer->codec, cbuf, csize, buffer, size); | |
| if (z_stream_write_uint32(writer->stream, size) <= 0) | |
| return(-2); | |
| if (z_stream_write_uint32(writer->stream, csize) <= 0) | |
| return(-2); | |
| if (z_stream_push_fully(writer->stream, cbuf, csize) <= 0) | |
| return(-3); | |
| free(cbuf); | |
| return(writer->size); | |
| } | |
| static int __encoded_write_blocks (z_buffered_writer_t *writer, | |
| const unsigned char *buffer, | |
| unsigned int size) | |
| { | |
| unsigned int blk_size = writer->size; | |
| int n, wr; | |
| n = 0; | |
| while (size >= writer->size) { | |
| if ((wr = __encoded_block_write(writer, buffer, blk_size)) != blk_size) | |
| return(-(n + wr)); | |
| buffer += blk_size; | |
| size -= blk_size; | |
| n += blk_size; | |
| } | |
| return(n); | |
| } | |
| static int __buffered_writer_flush (z_stream_t *obj) { | |
| z_buffered_writer_t *writer = (z_buffered_writer_t *)obj; | |
| return(__encoded_block_write(writer, writer->blob, writer->used)); | |
| } | |
| static int __buffered_writer_write (z_stream_t *obj, zdref_t data) { | |
| z_buffered_writer_t *writer = (z_buffered_writer_t *)obj; | |
| int n; | |
| /* Flush data if buffer is full */ | |
| if (!(n = writer->size - writer->used)) { | |
| __buffered_writer_flush(obj); | |
| writer->used = 0; | |
| n = writer->size; | |
| } | |
| *data = writer->blob + writer->used; | |
| writer->used = writer->size; | |
| return(n); | |
| } | |
| static int __buffered_writer_backup (z_stream_t *obj, unsigned int size) { | |
| ((z_buffered_writer_t *)obj)->used -= size; | |
| return(0); | |
| } | |
| static int __buffered_writer_push (z_stream_t *stream, | |
| const void *buffer, | |
| unsigned int size) | |
| { | |
| z_buffered_writer_t *writer = (z_buffered_writer_t *)stream; | |
| const unsigned char *pblob = (const unsigned char *)buffer; | |
| unsigned int avail = writer->size - writer->used; | |
| int n, wr; | |
| /* Flush directly if input is greater than buffer */ | |
| if (writer->used == 0) { | |
| if ((n = __encoded_write_blocks(writer, pblob, size)) < 0) | |
| return(-n); | |
| pblob += n; | |
| size -= n; | |
| } | |
| if (writer->blob == NULL) { | |
| if ((writer->blob = (unsigned char *) malloc(writer->size)) == NULL) | |
| return(0); | |
| } | |
| /* There's all the space that you need! */ | |
| if (size <= avail) { | |
| memcpy(writer->blob + writer->used, pblob, size); | |
| writer->used += size; | |
| return(size); | |
| } | |
| /* Fill the buffer, better to flush */ | |
| memcpy(writer->blob + writer->used, pblob, avail); | |
| writer->used += avail; | |
| pblob += avail; | |
| size -= avail; | |
| if ((wr = __encoded_block_write(writer, writer->blob, writer->size)) != writer->size) | |
| return(avail - (writer->size - wr)); | |
| /* Flush directly if input is greater than buffer */ | |
| n = avail; | |
| if ((wr = __encoded_write_blocks(writer, pblob, size)) < 0) | |
| return(-wr); | |
| pblob += wr; | |
| size -= wr; | |
| /* Store remaining in buffer */ | |
| writer->used = size; | |
| memcpy(writer->blob, pblob, size); | |
| return(n + size); | |
| } | |
| static int __buffered_writer_seek (z_stream_t *stream, uint64_t offset) { | |
| return(0); /* ??? */ | |
| } | |
| static uint64_t __buffered_writer_tell (z_stream_t *stream) { | |
| return(0); /* ??? */ | |
| } | |
| static z_stream_vtable_t __buffered_writer_vtable = { | |
| .write = __buffered_writer_write, | |
| .read = NULL, | |
| .backup = __buffered_writer_backup, | |
| .fetch = NULL, | |
| .push = __buffered_writer_push, | |
| .flush = __buffered_writer_flush, | |
| .seek = __buffered_writer_seek, | |
| .tell = __buffered_writer_tell, | |
| .length = NULL, | |
| }; | |
| /* ============================================================================ | |
| * PUBLIC Buffered Stream API | |
| */ | |
| int buffered_writer_open (z_buffered_writer_t *stream, | |
| z_stream_t *ustream, | |
| z_codec_t *codec, | |
| unsigned int size) | |
| { | |
| unsigned char *blob; | |
| if ((blob = (unsigned char *) malloc(size)) == NULL) | |
| return(-1); | |
| stream->vtable = &__buffered_writer_vtable; | |
| stream->stream = ustream; | |
| stream->codec = codec; | |
| stream->blob = blob; | |
| stream->size = size; | |
| stream->used = 0; | |
| return(0); | |
| } | |
| void buffered_writer_close (z_buffered_writer_t *stream) { | |
| if (stream->blob != NULL) { | |
| __buffered_writer_flush(stream); | |
| free(stream->blob); | |
| stream->blob = NULL; | |
| } | |
| } | |
| /* ============================================================================ | |
| * PRIVATE Buffered/Encoded Reader Stream plug | |
| */ | |
| static int __encoded_block_read (z_buffered_reader_t *reader) { | |
| unsigned char *cbuf; | |
| uint64_t csize; | |
| uint64_t size; | |
| if (reader->codec == NULL) { | |
| /* TODO: OPTIMIZE: some improvement if we've zero-copy-read? */ | |
| return(z_stream_fetch_fully(reader->stream, reader->blob, reader->size)); | |
| } | |
| /* Read header */ | |
| if (z_stream_read_uint32(reader->stream, &size) <= 0) | |
| return(-1); | |
| if (z_stream_read_uint32(reader->stream, &csize) <= 0) | |
| return(-2); | |
| /* Alloc encoded buffer */ | |
| if ((cbuf = (unsigned char *) malloc(csize)) == NULL) | |
| return(-3); | |
| /* Read encoded data */ | |
| if (z_stream_fetch_fully(reader->stream, cbuf, csize) != (int)csize) { | |
| free(cbuf); | |
| return(-4); | |
| } | |
| /* Prepare new buffer for data */ | |
| if (reader->size != size) { | |
| if (reader->blob != NULL) | |
| free(reader->blob); | |
| if ((reader->blob = (unsigned char *) malloc(size)) == NULL) { | |
| free(cbuf); | |
| return(-5); | |
| } | |
| reader->size = size; | |
| reader->used = 0; | |
| } | |
| /* Transform */ | |
| if (z_codec_decode(reader->codec, reader->blob, size, cbuf, csize)) { | |
| free(cbuf); | |
| return(-6); | |
| } | |
| /* Free encoded buffer */ | |
| free(cbuf); | |
| return(0); | |
| } | |
| static int __buffered_reader_read (z_stream_t *obj, const zdref_t data) { | |
| z_buffered_reader_t *reader = (z_buffered_reader_t *)obj; | |
| int n; | |
| /* Fetch data if buffer is empty */ | |
| if (reader->used == 0) | |
| n = __encoded_block_read(reader); | |
| *data = reader->blob + reader->used; | |
| reader->used = 0; | |
| return(n); | |
| } | |
| static int __buffered_reader_backup (z_stream_t *obj, unsigned int size) { | |
| ((z_buffered_reader_t *)obj)->used -= size; | |
| return(0); | |
| } | |
| static int __buffered_reader_fetch (z_stream_t *obj, | |
| void *buffer, | |
| unsigned int n) | |
| { | |
| z_buffered_reader_t *reader = (z_buffered_reader_t *)obj; | |
| unsigned char *pbuffer = (unsigned char *)buffer; | |
| unsigned int avail = reader->size - reader->used; | |
| if (n <= avail) { | |
| memcpy(buffer, reader->blob + reader->used, n); | |
| reader->used += n; | |
| return(n); | |
| } | |
| /* copy the old buffer to the end */ | |
| if (avail > 0) { | |
| memcpy(pbuffer, reader->blob + reader->used, avail); | |
| pbuffer += avail; | |
| n -= avail; | |
| } | |
| do { | |
| if (__encoded_block_read(reader) < 0) | |
| break; | |
| /* copy to user */ | |
| avail = (n > reader->size) ? reader->size : n; | |
| memcpy(pbuffer, reader->blob, avail); | |
| reader->used = avail; | |
| pbuffer += avail; | |
| n -= avail; | |
| } while (n > 0); | |
| return(0); /* ??? */ | |
| } | |
| static int __buffered_reader_seek (z_stream_t *stream, uint64_t offset) { | |
| /* TODO: CHECK-ME: seek against what? on encoded data doesn't make sense | |
| * you don't know the right position in the stream... | |
| */ | |
| return(0); /* ??? */ | |
| } | |
| static uint64_t __buffered_reader_tell (z_stream_t *stream) { | |
| z_buffered_reader_t *reader = (z_buffered_reader_t *)stream; | |
| /* TODO: CHECK-ME: This doesn't make sense when there's a codec set | |
| * because we can have the "right" position on the underline stream | |
| * (encoded data position) but the 'used' is an offset to an unencoded data | |
| */ | |
| return(z_stream_tell(reader->stream) + reader->used); | |
| } | |
| static z_stream_vtable_t __buffered_reader_vtable = { | |
| .write = NULL, | |
| .read = __buffered_reader_read, | |
| .backup = __buffered_reader_backup, | |
| .fetch = __buffered_reader_fetch, | |
| .push = NULL, | |
| .flush = NULL, | |
| .seek = __buffered_reader_seek, | |
| .tell = __buffered_reader_tell, | |
| .length = NULL, | |
| }; | |
| /* ============================================================================ | |
| * PUBLIC Buffered Reader Stream API | |
| */ | |
| int buffered_reader_open (z_buffered_reader_t *stream, | |
| z_stream_t *ustream, | |
| z_codec_t *codec, | |
| unsigned int size) | |
| { | |
| unsigned char *blob; | |
| if (codec == NULL) { | |
| if ((blob = (unsigned char *) malloc(size)) == NULL) | |
| return(-1); | |
| } else { | |
| /* If codec is specified, buffered size is readed from disk | |
| * and can be different for each block (compression). | |
| */ | |
| blob = NULL; | |
| size = 0; | |
| } | |
| stream->vtable = &__buffered_reader_vtable; | |
| stream->stream = ustream; | |
| stream->codec = codec; | |
| stream->blob = blob; | |
| stream->size = size; | |
| stream->used = 0; | |
| return(0); | |
| } | |
| void buffered_reader_close (z_buffered_reader_t *stream) { | |
| if (stream->blob != NULL) { | |
| free(stream->blob); | |
| stream->blob = NULL; | |
| } | |
| } | |
| /* ============================================================================ | |
| * PRIVATE Disk Stream plug | |
| */ | |
| static int __disk_stream_flush (z_stream_t *stream) { | |
| /* TODO: use fdatasync() on linux? */ | |
| return(fsync(((z_disk_t *)stream)->fd)); | |
| } | |
| static int __disk_stream_push (z_stream_t *stream, | |
| const void *buffer, | |
| unsigned int n) | |
| { | |
| z_disk_t *disk = (z_disk_t *)stream; | |
| ssize_t wr; | |
| int iwr; | |
| if ((wr = pwrite(disk->fd, buffer, n, disk->offset)) < 0) | |
| return(-1); | |
| iwr = (int)(wr & 0xffffffff); | |
| disk->offset += iwr; | |
| return(iwr); | |
| } | |
| static int __disk_stream_fetch (z_stream_t *stream, | |
| void *buffer, | |
| unsigned int n) | |
| { | |
| z_disk_t *disk = (z_disk_t *)stream; | |
| ssize_t rd; | |
| int ird; | |
| if ((rd = pread(disk->fd, buffer, n, disk->offset)) < 0) | |
| return(-1); | |
| ird = (int)(rd & 0xffffffff); | |
| disk->offset += ird; | |
| return(ird); | |
| } | |
| static int __disk_stream_seek (z_stream_t *stream, uint64_t offset) { | |
| /* TODO: check boundary? */ | |
| ((z_disk_t *)stream)->offset = offset; | |
| return(0); | |
| } | |
| static uint64_t __disk_stream_tell (z_stream_t *stream) { | |
| return(((z_disk_t *)stream)->offset); | |
| } | |
| static uint64_t __disk_stream_length (z_stream_t *stream) { | |
| struct stat buf; | |
| if (fstat(((z_disk_t *)stream)->fd, &buf) < 0) | |
| return(0); /* TODO: FIXME: MAX_UINT64? */ | |
| return(buf.st_size); | |
| } | |
| static z_stream_vtable_t __disk_stream_vtable = { | |
| .write = NULL, /* disk doesn't implement zero-copy read/write */ | |
| .read = NULL, /* disk doesn't implement zero-copy read/write */ | |
| .backup = NULL, /* disk doesn't implement zero-copy read/write */ | |
| .fetch = __disk_stream_fetch, | |
| .push = __disk_stream_push, | |
| .flush = __disk_stream_flush, | |
| .seek = __disk_stream_seek, | |
| .tell = __disk_stream_tell, | |
| .length = __disk_stream_length, | |
| }; | |
| /* ============================================================================ | |
| * PUBLIC Disk API | |
| */ | |
| int disk_stream_create (z_disk_t *disk, const char *path, int flags, int mode) { | |
| disk->vtable = &__disk_stream_vtable; | |
| disk->offset = 0; | |
| if ((disk->fd = open(path, flags, mode)) < 0) { | |
| perror("open()"); | |
| return(-1); | |
| } | |
| return(0); | |
| } | |
| int disk_stream_open (z_disk_t *disk, const char *path, int flags) { | |
| disk->vtable = &__disk_stream_vtable; | |
| disk->offset = 0; | |
| if ((disk->fd = open(path, flags)) < 0) { | |
| perror("open()"); | |
| return(-1); | |
| } | |
| return(0); | |
| } | |
| int disk_stream_from_fd (z_disk_t *disk, int fd, uint64_t offset) { | |
| disk->vtable = &__disk_stream_vtable; | |
| disk->offset = offset; | |
| disk->fd = fd; | |
| return(0); | |
| } | |
| void disk_stream_close (z_disk_t *disk) { | |
| close(disk->fd); | |
| } | |
/* ============================================================================
 *  DEMO api usage...
 */
#include <sys/time.h>

#define __USEC_PER_SEC      ((uint64_t)1000000U)

/* Wall-clock time in microseconds as reported by gettimeofday(), or 0 if
 * the call fails (the timeval would otherwise be read uninitialized).
 */
uint64_t z_time_micros (void) {
    struct timeval now;

    if (gettimeofday(&now, NULL) != 0)
        return(0);

    return((uint64_t)now.tv_sec * __USEC_PER_SEC + (uint64_t)now.tv_usec);
}
/*
 * Converts the interval between two microsecond timestamps to seconds.
 *
 *   s  start timestamp, e  end timestamp (both in microseconds).
 *
 * Returns e - s in seconds; the result is negative when `e` is earlier than
 * `s` (the unsigned subtraction would otherwise wrap to a huge value).  The
 * division is done in double before narrowing to float.
 */
float z_timer_secs (uint64_t s, uint64_t e) {
    const double usec_per_sec = 1000000.0;

    if (e >= s)
        return((float)((double)(e - s) / usec_per_sec));

    return((float)(-((double)(s - e) / usec_per_sec)));
}
| int main (int argc, char **argv) { | |
| #if 1 | |
| z_buffered_writer_t wbuffer; | |
| z_buffered_reader_t rbuffer; | |
| z_disk_t disk; | |
| void *buffer; | |
| int i, n; | |
| printf("vtable %lu\n", sizeof(z_stream_vtable_t)); | |
| printf("disk %lu\n", sizeof(z_disk_t)); | |
| printf("bwriter %lu\n", sizeof(z_buffered_writer_t)); | |
| printf("breader %lu\n", sizeof(z_buffered_reader_t)); | |
| /* Write something on disk */ | |
| disk_stream_create(&disk, "test.txt", O_TRUNC | O_CREAT | O_WRONLY, 0644); | |
| buffered_writer_open(&wbuffer, &disk, NULL, 16); | |
| n = z_stream_write(&wbuffer, &buffer); | |
| memset(buffer, 'a', n); | |
| n = z_stream_write(&wbuffer, &buffer); | |
| memset(buffer, 'b', n - 8); | |
| z_stream_backup(&wbuffer, 8); | |
| n = z_stream_write(&wbuffer, &buffer); | |
| memset(buffer, 'c', n); | |
| z_stream_push_fully(&wbuffer, "This is a long test to overflow the buffer", 42); | |
| buffered_writer_close(&wbuffer); | |
| disk_stream_close(&disk); | |
| /* Read something from disk */ | |
| disk_stream_open(&disk, "test.txt", O_RDONLY); | |
| buffered_reader_open(&rbuffer, &disk, NULL, 8); | |
| while ((n = z_stream_read(&rbuffer, &buffer)) > 0) { | |
| for (i = 0; i < n; ++i) putc(((const char *)buffer)[i], stdout); | |
| putc('\n', stdout); | |
| } | |
| buffered_reader_close(&rbuffer); | |
| disk_stream_close(&disk); | |
| #elif 0 | |
| char buffer[16 * 1024]; | |
| unsigned int i; | |
| uint64_t s, e; | |
| off_t offset; | |
| int fd; | |
| memset(buffer, 1, sizeof(buffer)); | |
| if ((fd = open("test.data", O_TRUNC | O_CREAT | O_WRONLY, 0644)) < 0) { | |
| perror("open()"); | |
| return(1); | |
| } | |
| s = z_time_micros(); | |
| offset = 0; | |
| for (i = 0; i < 16 * 1024; ++i) { | |
| pwrite(fd, buffer, sizeof(buffer), offset); | |
| offset += sizeof(buffer); | |
| } | |
| e = z_time_micros(); | |
| printf("Timed %.3fsec\n", z_timer_secs(s, e)); | |
| close(fd); | |
| #else | |
| //char buffer[16 * 1024]; | |
| void *buffer; | |
| z_buffered_writer_t wbuffer; | |
| unsigned int i; | |
| uint64_t s, e; | |
| uint64_t offset; | |
| z_disk_t disk; | |
| int n; | |
| disk_stream_create(&disk, "test.data", O_TRUNC | O_CREAT | O_WRONLY, 0644); | |
| buffered_writer_open(&wbuffer, &disk, NULL, 16 * 1024); | |
| s = z_time_micros(); | |
| offset = 0; | |
| for (i = 0; i < 16 * 1024; ++i) { | |
| n = z_stream_write(&wbuffer, &buffer); | |
| memset(buffer, 'a', n); | |
| //z_stream_push_fully(&wbuffer, buffer, sizeof(buffer)); | |
| offset += sizeof(buffer); | |
| } | |
| e = z_time_micros(); | |
| printf("Timed %.3fsec\n", z_timer_secs(s, e)); | |
| buffered_writer_close(&wbuffer); | |
| disk_stream_close(&disk); | |
| #endif | |
| return(0); | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment