Skip to content

Instantly share code, notes, and snippets.

@matteobertozzi
Created March 21, 2012 22:10
Show Gist options
  • Select an option

  • Save matteobertozzi/2153712 to your computer and use it in GitHub Desktop.

Select an option

Save matteobertozzi/2153712 to your computer and use it in GitHub Desktop.
zcl stream next-gen, zero copy read/write api...
/*
* - stream interface
* [filter stream]
* - buffered stream
* - encoded stream
* - buffer stream
* - chunkq stream
*
* Copyright 2011-2012 Matteo Bertozzi
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <sys/types.h>
#include <sys/stat.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>
#include <fcntl.h>
/* Out-parameter of the zero-copy calls: the address of a caller-owned pointer
 * that the stream points at a location inside its own buffer.
 */
typedef void **zdref_t;
/* Opaque stream handle; each implementation casts it back to its own struct. */
typedef void z_stream_t;
/* Table of operations shared by every stream implementation.
 * The vtables in this file leave unsupported slots set to NULL and the
 * dispatch macros do not check for that, so only call what a stream provides.
 */
typedef struct z_stream_vtable z_stream_vtable_t;
struct z_stream_vtable {
/* Zero-copy write: stores in *data a pointer into the stream's buffer and
 * returns how many bytes the caller may write there. */
int (*write) (z_stream_t *stream, zdref_t data);
/* Zero-copy read: stores in *data a pointer to buffered bytes and returns
 * the stream's byte count for them. */
int (*read) (z_stream_t *stream, const zdref_t data);
/* Lowers the stream's 'used' counter by n (presumably to hand back the unused
 * tail of the last zero-copy region - confirm with the author). */
int (*backup) (z_stream_t *stream, unsigned int n);
/* Copying write of up to n bytes; the *_fully helpers stop on a result <= 0. */
int (*push) (z_stream_t *stream, const void *data, unsigned int n);
/* Copying read of up to n bytes; the *_fully helpers stop on a result <= 0. */
int (*fetch) (z_stream_t *stream, void *data, unsigned int n);
/* Sends pending data to the layer below. */
int (*flush) (z_stream_t *stream);
/* Sets the stream position. */
int (*seek) (z_stream_t *stream, uint64_t offset);
/* Reports the stream position. */
uint64_t (*tell) (z_stream_t *stream);
/* Reports the total length of the stream. */
uint64_t (*length) (z_stream_t *stream);
};
/* Common prefix of every stream struct: z_stream_call() casts any stream
 * handle to this type to reach the vtable, so 'vtable' has to be the first
 * member of each concrete stream.
 */
struct stream {
const z_stream_vtable_t *vtable;
};
/* Calls 'method' from the vtable of stream 'x', passing 'x' as first argument.
 * NOTE: 'x' is expanded twice, so it must not have side effects, and the
 * selected slot must not be NULL (no check is made).
 * '##__VA_ARGS__' is the GNU extension that drops the comma when no extra
 * arguments are given.
 */
#define z_stream_call(x, method, ...) \
    (((struct stream *)(x))->vtable->method((x), ##__VA_ARGS__))

/* Thin wrappers, one per vtable slot */
#define z_stream_write(stream, data) z_stream_call(stream, write, data)
#define z_stream_read(stream, data) z_stream_call(stream, read, data)
#define z_stream_backup(stream, n) z_stream_call(stream, backup, n)
#define z_stream_push(stream, data, n) z_stream_call(stream, push, data, n)
#define z_stream_fetch(stream, data, n) z_stream_call(stream, fetch, data, n)
#define z_stream_flush(stream) z_stream_call(stream, flush)
#define z_stream_seek(stream, offset) z_stream_call(stream, seek, offset)
#define z_stream_tell(stream) z_stream_call(stream, tell)
#define z_stream_length(stream) z_stream_call(stream, length)

/* 4-byte helpers. 'v' is a pointer for the read variant and an lvalue whose
 * first 4 bytes are sent (in host byte order) for the write variant.
 */
#define z_stream_read_uint32(stream, v) z_stream_fetch_fully(stream, (v), 4)
#define z_stream_write_uint32(stream, v) z_stream_push_fully(stream, &(v), 4)
/* Public names of the stream types defined below */
typedef struct buffered_writer z_buffered_writer_t;
typedef struct buffered_reader z_buffered_reader_t;
/* Codec handle; opaque here, only compared against NULL and handed to the
 * z_codec_* macros. */
typedef void z_codec_t;
typedef struct z_disk z_disk_t;
/* Placeholder codec: an identity transform standing in for a real encoder.
 * Both call sites in this file pass the destination buffer first and the
 * source second, so the parameters are named in that order.
 */

/* Upper bound of the encoded size for 'size' input bytes */
#define z_codec_max_length(codec, size) (size)

/* Copies src into dst and evaluates to the number of encoded bytes.
 * dst must hold at least z_codec_max_length(codec, src_size) bytes.
 */
#define z_codec_encode(codec, dst, dst_size, src, src_size) \
    ((void)memcpy((dst), (src), (src_size)), (int)(src_size))

/* Copies src into dst; evaluates to 0 on success and to -1 when the data
 * does not fit into dst (nothing is copied in that case).
 */
#define z_codec_decode(codec, dst, dst_size, src, src_size) \
    (((src_size) <= (dst_size)) \
        ? ((void)memcpy((dst), (src), (src_size)), 0) \
        : -1)
/* Buffered (optionally encoded) writer layered on top of another stream. */
struct buffered_writer {
/* Must stay first: see struct stream */
const z_stream_vtable_t *vtable;
/* Stream that receives the flushed blocks */
struct stream * stream;
/* NULL means blocks are pushed as they are, without a header */
z_codec_t * codec;
/* Staging buffer of 'size' bytes */
unsigned char * blob;
/* Capacity of 'blob'; also the block size used when writing directly */
unsigned int size;
/* Number of bytes of 'blob' currently claimed by pending data */
unsigned int used;
};
/* Buffered (optionally decoded) reader layered on top of another stream. */
struct buffered_reader {
/* Must stay first: see struct stream */
const z_stream_vtable_t *vtable;
/* Stream the blocks are fetched from */
struct stream * stream;
/* NULL means the data is read as it is, without a block header */
z_codec_t * codec;
/* Buffer holding the current block */
unsigned char * blob;
/* Size of 'blob'; with a codec it follows the size of the last block read */
unsigned int size;
/* Offset into 'blob': fetch advances it by the bytes it copies out, while
 * the zero-copy read resets it to 0 after every call.
 * NOTE(review): the two calls treat this field differently - confirm the
 * intended invariant before mixing them on the same reader. */
unsigned int used;
};
/* File-descriptor backed stream using positional I/O. */
struct z_disk {
/* Must stay first: see struct stream (not const-qualified here, unlike the
 * buffered streams) */
z_stream_vtable_t *vtable;
/* File position used by the next pread()/pwrite(); advanced by the number of
 * bytes transferred */
uint64_t offset;
/* Descriptor returned by open() or supplied by the caller */
int fd;
};
/* ============================================================================
* PUBLIC Stream API
*/
/* Keeps pushing until all 'size' bytes of 'buffer' are accepted by 'stream'
 * or a push reports a value <= 0.
 * Returns the number of bytes accepted, which may be less than 'size'.
 */
int z_stream_push_fully (z_stream_t *stream,
                         const void *buffer,
                         unsigned int size)
{
    const unsigned char *cursor = (const unsigned char *)buffer;
    int total = 0;

    for (;;) {
        int step;

        if (size == 0)
            break;

        step = z_stream_push(stream, cursor, size);
        if (step <= 0)
            break;

        total += step;
        cursor += step;
        size -= step;
    }

    return(total);
}
/* Keeps fetching until 'size' bytes are stored in 'buffer' or a fetch reports
 * a value <= 0.
 * Returns the number of bytes stored, which may be less than 'size'.
 */
int z_stream_fetch_fully (z_stream_t *stream,
                          void *buffer,
                          unsigned int size)
{
    unsigned char *cursor = (unsigned char *)buffer;
    int total = 0;

    for (;;) {
        int step;

        if (size == 0)
            break;

        step = z_stream_fetch(stream, cursor, size);
        if (step <= 0)
            break;

        total += step;
        cursor += step;
        size -= step;
    }

    return(total);
}
/* ============================================================================
* PRIVATE Buffered/Encoded Writer Stream plug
*/
/* Sends one block to the underlying stream.
 *
 * Without a codec the bytes are pushed as they are and the result of
 * z_stream_push_fully() is returned (bytes accepted, possibly short).
 *
 * With a codec the block goes out as:
 *   [4 bytes: plain size][4 bytes: encoded size][encoded payload]
 * and the return value is 'size' on success, or a negative code:
 *   -1 temporary buffer allocation failed
 *   -2 header could not be written
 *   -3 payload could not be written
 *   -4 the codec reported a negative encoded size
 * The temporary buffer is released on every path.
 */
static int __encoded_block_write (z_buffered_writer_t *writer,
                                  const unsigned char *buffer,
                                  unsigned int size)
{
    unsigned char *cbuf;
    uint32_t hdr_size;
    uint32_t hdr_csize;
    int result;
    int csize;

    if (writer->codec == NULL) {
        /* TODO: OPTIMIZE: check for zero-copy-write on underlying stream */
        return(z_stream_push_fully(writer->stream, buffer, size));
    }

    /* malloc(0) may legitimately return NULL: always ask for at least 1 byte */
    csize = z_codec_max_length(writer->codec, size);
    if ((cbuf = (unsigned char *) malloc((csize > 0) ? csize : 1)) == NULL)
        return(-1);

    csize = z_codec_encode(writer->codec, cbuf, csize, buffer, size);
    hdr_size = size;
    hdr_csize = (uint32_t)csize;

    if (csize < 0) {
        result = -4;
    } else if (z_stream_write_uint32(writer->stream, hdr_size) != 4) {
        result = -2;
    } else if (z_stream_write_uint32(writer->stream, hdr_csize) != 4) {
        result = -2;
    } else if (csize > 0 &&
               z_stream_push_fully(writer->stream, cbuf, csize) != csize) {
        result = -3;
    } else {
        /* Report the bytes consumed from the caller, like the raw path does */
        result = (int)size;
    }

    free(cbuf);
    return(result);
}
/* Writes every whole block (writer->size bytes each) found in 'buffer'
 * straight to the underlying stream, leaving the tail that does not fill a
 * block untouched.
 *
 * Returns the number of bytes written (a multiple of the block size) when all
 * blocks went through. When a block fails the value is -(n + wr), where n is
 * the byte count of the blocks already written and wr is what the failing
 * block write returned; callers rely on that value, so it is kept as it is.
 * A block size of 0 writes nothing and returns 0 instead of looping forever.
 */
static int __encoded_write_blocks (z_buffered_writer_t *writer,
                                   const unsigned char *buffer,
                                   unsigned int size)
{
    const unsigned int blk_size = writer->size;
    int n = 0;
    int wr;

    if (blk_size == 0)
        return(0);

    while (size >= blk_size) {
        wr = __encoded_block_write(writer, buffer, blk_size);
        if (wr < 0 || (unsigned int)wr != blk_size)
            return(-(n + wr));

        buffer += blk_size;
        size -= blk_size;
        n += blk_size;
    }

    return(n);
}
/* Sends the pending bytes of the staging buffer to the underlying stream.
 *
 * Returns what the block write returned (0 when nothing was pending).
 * On success the buffer is marked empty so the same bytes are not sent again
 * by a later flush. When a raw (no codec) write was only partially accepted,
 * the unwritten tail is moved to the front of the buffer and stays pending.
 */
static int __buffered_writer_flush (z_stream_t *obj) {
    z_buffered_writer_t *writer = (z_buffered_writer_t *)obj;
    int wr;

    if (writer->used == 0)
        return(0);

    wr = __encoded_block_write(writer, writer->blob, writer->used);
    if (wr >= 0 && (unsigned int)wr >= writer->used) {
        writer->used = 0;
    } else if (wr > 0 && writer->codec == NULL) {
        writer->used -= (unsigned int)wr;
        memmove(writer->blob, writer->blob + wr, writer->used);
    }

    return(wr);
}
/* Zero-copy write: hands out the free part of the staging buffer.
 *
 * Stores in *data the address of the free region and returns its length.
 * The whole region is accounted as used right away; the caller gives back
 * what it did not fill through backup().
 * When the buffer is full it is flushed first; if that flush fails the
 * pending data is kept, *data is set to NULL and -1 is returned.
 */
static int __buffered_writer_write (z_stream_t *obj, zdref_t data) {
    z_buffered_writer_t *writer = (z_buffered_writer_t *)obj;
    unsigned int avail;

    if (writer->used >= writer->size) {
        unsigned int pending = writer->used;
        int wr = __buffered_writer_flush(obj);

        if (wr < 0 || (unsigned int)wr < pending) {
            *data = NULL;
            return(-1);
        }
        writer->used = 0;
    }

    avail = writer->size - writer->used;
    *data = writer->blob + writer->used;
    writer->used = writer->size;
    return((int)avail);
}
/* Gives back 'size' bytes of the region claimed by the last zero-copy write.
 * Returns 0 on success, -1 (leaving the writer untouched) when 'size' is
 * larger than the number of bytes currently marked as used.
 */
static int __buffered_writer_backup (z_stream_t *obj, unsigned int size) {
    z_buffered_writer_t *writer = (z_buffered_writer_t *)obj;

    /* 'used' is unsigned: subtracting too much would wrap around */
    if (size > writer->used)
        return(-1);

    writer->used -= size;
    return(0);
}
/* Copying write: appends 'size' bytes to the stream.
 *
 * Pending buffered data is completed and sent first so byte order is kept;
 * whole blocks are then written directly from the caller's memory and the
 * remaining tail is kept in the staging buffer.
 *
 * Returns the number of bytes of 'buffer' taken in charge (written or
 * buffered), so z_stream_push_fully() advances by the right amount.
 * Returns 0 when 'size' is 0 or the staging buffer cannot be allocated, and
 * -1 when nothing could be taken (write error, or a zero-sized buffer).
 */
static int __buffered_writer_push (z_stream_t *stream,
                                   const void *buffer,
                                   unsigned int size)
{
    z_buffered_writer_t *writer = (z_buffered_writer_t *)stream;
    const unsigned char *pblob = (const unsigned char *)buffer;
    unsigned int total = 0;

    if (size == 0)
        return(0);

    /* No room to stage data and no usable block size */
    if (writer->size == 0)
        return(-1);

    if (writer->blob == NULL) {
        if ((writer->blob = (unsigned char *) malloc(writer->size)) == NULL)
            return(0);
        /* A fresh buffer cannot contain pending data */
        writer->used = 0;
    }

    while (size > 0) {
        unsigned int avail;
        unsigned int chunk;
        int wr;

        /* Nothing pending: whole blocks can skip the staging buffer */
        if (writer->used == 0 && size >= writer->size) {
            unsigned int whole = size - (size % writer->size);

            wr = __encoded_write_blocks(writer, pblob, size);
            /* The helper's failure value does not say how much reached the
             * stream, so nothing is counted for a failed run. */
            if (wr < 0 || (unsigned int)wr != whole)
                break;

            pblob += whole;
            size -= whole;
            total += whole;
            continue;
        }

        avail = (writer->used < writer->size) ? writer->size - writer->used : 0;
        if (avail == 0) {
            /* Buffer is full and more data is coming: send it out */
            wr = __encoded_block_write(writer, writer->blob, writer->size);
            if (wr < 0 || (unsigned int)wr != writer->size) {
                if (wr > 0 && writer->codec == NULL) {
                    /* Raw partial write: keep only the unwritten tail */
                    writer->used = writer->size - (unsigned int)wr;
                    memmove(writer->blob, writer->blob + wr, writer->used);
                }
                break;
            }
            writer->used = 0;
            continue;
        }

        /* Stage as much as fits; a buffer that ends up exactly full is sent
         * by the next call that needs room. */
        chunk = (size < avail) ? size : avail;
        memcpy(writer->blob + writer->used, pblob, chunk);
        writer->used += chunk;
        pblob += chunk;
        size -= chunk;
        total += chunk;
    }

    return((size == 0 || total > 0) ? (int)total : -1);
}
/* Placeholder: seeking is not implemented for the buffered writer.
 * The request is ignored and 0 is returned.
 */
static int __buffered_writer_seek (z_stream_t *stream, uint64_t offset) {
    (void)stream;
    (void)offset;
    return(0);
}
/* Placeholder: the buffered writer does not track a position yet.
 * Always returns 0.
 */
static uint64_t __buffered_writer_tell (z_stream_t *stream) {
    (void)stream;
    return(0);
}
/* Operations of the buffered writer. The read-side slots (read, fetch) and
 * length are NULL: calling them through z_stream_call() on a writer would
 * dereference a NULL function pointer.
 */
static z_stream_vtable_t __buffered_writer_vtable = {
.write = __buffered_writer_write,
.read = NULL,
.backup = __buffered_writer_backup,
.fetch = NULL,
.push = __buffered_writer_push,
.flush = __buffered_writer_flush,
.seek = __buffered_writer_seek,
.tell = __buffered_writer_tell,
.length = NULL,
};
/* ============================================================================
* PUBLIC Buffered Stream API
*/
/* Initializes 'stream' as a buffered writer on top of 'ustream'.
 * 'codec' may be NULL (raw output); 'size' is the staging buffer capacity.
 * Returns 0 on success, -1 when the buffer cannot be allocated (in which
 * case 'stream' is left untouched).
 */
int buffered_writer_open (z_buffered_writer_t *stream,
                          z_stream_t *ustream,
                          z_codec_t *codec,
                          unsigned int size)
{
    unsigned char *storage = (unsigned char *) malloc(size);

    if (storage == NULL)
        return(-1);

    stream->used = 0;
    stream->size = size;
    stream->blob = storage;
    stream->codec = codec;
    stream->stream = ustream;
    stream->vtable = &__buffered_writer_vtable;
    return(0);
}
/* Flushes what is still pending and releases the staging buffer.
 * Safe to call more than once: a writer without buffer is left alone.
 * The underlying stream is not closed.
 */
void buffered_writer_close (z_buffered_writer_t *stream) {
    if (stream->blob == NULL)
        return;

    /* Do not emit an empty block when nothing is pending */
    if (stream->used > 0)
        __buffered_writer_flush(stream);

    free(stream->blob);
    stream->blob = NULL;
    /* The counter referred to the buffer that was just released */
    stream->used = 0;
}
/* ============================================================================
* PRIVATE Buffered/Encoded Reader Stream plug
*/
/* Loads the next block into reader->blob.
 *
 * Without a codec up to reader->size bytes are fetched as they are and the
 * number of bytes actually read is returned (0 at end of stream).
 *
 * With a codec the block layout is
 *   [4 bytes: plain size][4 bytes: encoded size][encoded payload]
 * The buffer is resized to the plain size when needed, reader->used is reset
 * and the plain size is returned. Negative codes on failure:
 *   -1/-2 header could not be read completely
 *   -3    temporary buffer allocation failed
 *   -4    payload could not be read completely
 *   -5    block buffer allocation failed (the previous buffer is kept)
 *   -6    the codec rejected the payload
 */
static int __encoded_block_read (z_buffered_reader_t *reader) {
    unsigned char *cbuf;
    /* The header fields are 4 bytes wide: read them into 4-byte variables */
    uint32_t csize = 0;
    uint32_t size = 0;

    if (reader->codec == NULL) {
        /* TODO: OPTIMIZE: some improvement if we've zero-copy-read? */
        return(z_stream_fetch_fully(reader->stream, reader->blob, reader->size));
    }

    /* Read header */
    if (z_stream_read_uint32(reader->stream, &size) != 4)
        return(-1);
    if (z_stream_read_uint32(reader->stream, &csize) != 4)
        return(-2);

    /* Alloc encoded buffer (malloc(0) may return NULL, so ask for >= 1) */
    if ((cbuf = (unsigned char *) malloc((csize > 0) ? csize : 1)) == NULL)
        return(-3);

    /* Read encoded data */
    if (csize > 0 &&
        z_stream_fetch_fully(reader->stream, cbuf, csize) != (int)csize)
    {
        free(cbuf);
        return(-4);
    }

    /* Prepare new buffer for data */
    if (reader->size != size || reader->blob == NULL) {
        unsigned char *blob = (unsigned char *) malloc((size > 0) ? size : 1);

        if (blob == NULL) {
            free(cbuf);
            return(-5);
        }
        free(reader->blob);
        reader->blob = blob;
        reader->size = size;
    }

    /* Transform */
    if (z_codec_decode(reader->codec, reader->blob, size, cbuf, csize)) {
        free(cbuf);
        return(-6);
    }

    /* New content: nothing of it has been consumed yet */
    reader->used = 0;

    /* Free encoded buffer */
    free(cbuf);
    return((int)size);
}
/* Zero-copy read: exposes buffered bytes without copying them.
 *
 * Stores in *data the address of the bytes and returns how many there are
 * (<= 0 when no block could be loaded).
 * When 'used' is 0 or the buffer is exhausted a new block is loaded; when a
 * previous fetch left part of the block unread, that remainder is handed
 * out instead. Either way everything handed out counts as consumed
 * afterwards ('used' goes back to 0).
 */
static int __buffered_reader_read (z_stream_t *obj, const zdref_t data) {
    z_buffered_reader_t *reader = (z_buffered_reader_t *)obj;
    int n;

    if (reader->used == 0 || reader->used >= reader->size) {
        reader->used = 0;
        n = __encoded_block_read(reader);
    } else {
        n = (int)(reader->size - reader->used);
    }

    *data = reader->blob + reader->used;
    reader->used = 0;
    return(n);
}
/* Moves the read offset back by 'size' bytes so they are delivered again.
 * Returns 0 on success, -1 (leaving the reader untouched) when 'size' is
 * larger than the current offset. Note that the zero-copy read leaves the
 * offset at 0, so there is nothing to back up right after it.
 */
static int __buffered_reader_backup (z_stream_t *obj, unsigned int size) {
    z_buffered_reader_t *reader = (z_buffered_reader_t *)obj;

    /* 'used' is unsigned: subtracting too much would wrap around */
    if (size > reader->used)
        return(-1);

    reader->used -= size;
    return(0);
}
/* Copying read: stores up to 'n' bytes into 'buffer'.
 *
 * Bytes still unread in the current block ([used, size) of the buffer) are
 * delivered first, then new blocks are loaded until the request is satisfied
 * or no more data can be read.
 * Returns the number of bytes stored; 0 at end of stream and -1 when a block
 * could not be loaded and nothing was stored.
 *
 * NOTE(review): a reader opened without codec starts with used == 0 and a
 * buffer that was never filled; this function takes that state as a full
 * buffer. Confirm the intended initial state with buffered_reader_open().
 */
static int __buffered_reader_fetch (z_stream_t *obj,
                                    void *buffer,
                                    unsigned int n)
{
    z_buffered_reader_t *reader = (z_buffered_reader_t *)obj;
    unsigned char *pbuffer = (unsigned char *)buffer;
    unsigned int total = 0;
    unsigned int avail;
    int rd = 0;

    if (n == 0)
        return(0);

    avail = (reader->used < reader->size) ? reader->size - reader->used : 0;

    /* The current block is enough */
    if (n <= avail) {
        memcpy(pbuffer, reader->blob + reader->used, n);
        reader->used += n;
        return((int)n);
    }

    /* Drain what is left of the current block */
    if (avail > 0) {
        memcpy(pbuffer, reader->blob + reader->used, avail);
        pbuffer += avail;
        n -= avail;
        total += avail;
    }
    reader->used = reader->size;

    while (n > 0) {
        unsigned int valid;
        unsigned int left;

        rd = __encoded_block_read(reader);
        /* Error, or end of a raw stream (with a codec the end of the stream
         * shows up as a failed header read) */
        if (rd < 0 || (rd == 0 && reader->codec == NULL)) {
            reader->used = reader->size;
            break;
        }

        /* Raw blocks may be short; decoded blocks always fill the buffer */
        valid = (reader->codec == NULL) ? (unsigned int)rd : reader->size;

        /* copy to user */
        avail = (n < valid) ? n : valid;
        if (avail > 0)
            memcpy(pbuffer, reader->blob, avail);
        pbuffer += avail;
        n -= avail;
        total += avail;

        /* Unread bytes must sit at the end of the buffer, because the next
         * call computes them as size - used. */
        left = valid - avail;
        if (left > 0 && valid < reader->size)
            memmove(reader->blob + (reader->size - left),
                    reader->blob + avail, left);
        reader->used = reader->size - left;
    }

    if (total > 0)
        return((int)total);
    return((rd < 0) ? -1 : 0);
}
/* Placeholder: seeking is not implemented for the buffered reader.
 * TODO: CHECK-ME: seek against what? With encoded data the position inside
 * the underlying stream is not known.
 * The request is ignored and 0 is returned.
 */
static int __buffered_reader_seek (z_stream_t *stream, uint64_t offset) {
    (void)stream;
    (void)offset;
    return(0);
}
/* Returns the position of the underlying stream plus the reader's 'used'
 * offset.
 * TODO: CHECK-ME: with a codec this mixes a position in the encoded stream
 * with an offset into decoded data, so the sum is not meaningful.
 */
static uint64_t __buffered_reader_tell (z_stream_t *stream) {
    z_buffered_reader_t *reader = (z_buffered_reader_t *)stream;
    uint64_t base = z_stream_tell(reader->stream);

    return(base + reader->used);
}
/* Operations of the buffered reader. The write-side slots (write, push,
 * flush) and length are NULL: calling them through z_stream_call() on a
 * reader would dereference a NULL function pointer.
 */
static z_stream_vtable_t __buffered_reader_vtable = {
.write = NULL,
.read = __buffered_reader_read,
.backup = __buffered_reader_backup,
.fetch = __buffered_reader_fetch,
.push = NULL,
.flush = NULL,
.seek = __buffered_reader_seek,
.tell = __buffered_reader_tell,
.length = NULL,
};
/* ============================================================================
* PUBLIC Buffered Reader Stream API
*/
/* Initializes 'stream' as a buffered reader on top of 'ustream'.
 * Without a codec a buffer of 'size' bytes is allocated up front. With a
 * codec 'size' is ignored: the buffer starts empty because the size of every
 * block is read from the stream and may change from block to block.
 * Returns 0 on success, -1 when the buffer cannot be allocated (in which
 * case 'stream' is left untouched).
 */
int buffered_reader_open (z_buffered_reader_t *stream,
                          z_stream_t *ustream,
                          z_codec_t *codec,
                          unsigned int size)
{
    unsigned char *storage = NULL;

    if (codec != NULL) {
        size = 0;
    } else {
        storage = (unsigned char *) malloc(size);
        if (storage == NULL)
            return(-1);
    }

    stream->used = 0;
    stream->size = size;
    stream->blob = storage;
    stream->codec = codec;
    stream->stream = ustream;
    stream->vtable = &__buffered_reader_vtable;
    return(0);
}
/* Releases the block buffer of the reader; the underlying stream is not
 * closed. Calling it again is harmless.
 */
void buffered_reader_close (z_buffered_reader_t *stream) {
    /* free(NULL) is a no-op, so no guard is needed */
    free(stream->blob);
    stream->blob = NULL;
}
/* ============================================================================
* PRIVATE Disk Stream plug
*/
/* Asks the system to commit the file to storage; returns what fsync() returns. */
static int __disk_stream_flush (z_stream_t *stream) {
    z_disk_t *disk = (z_disk_t *)stream;

    /* TODO: use fdatasync() on linux? */
    return(fsync(disk->fd));
}
/* Writes up to 'n' bytes at the current stream offset with pwrite().
 * Returns the number of bytes written and advances the offset by it, or -1
 * when pwrite() fails (offset unchanged).
 */
static int __disk_stream_push (z_stream_t *stream,
                               const void *buffer,
                               unsigned int n)
{
    z_disk_t *disk = (z_disk_t *)stream;
    ssize_t written = pwrite(disk->fd, buffer, n, disk->offset);
    int count;

    if (written < 0)
        return(-1);

    count = (int)(written & 0xffffffff);
    disk->offset += count;
    return(count);
}
/* Reads up to 'n' bytes from the current stream offset with pread().
 * Returns the number of bytes read and advances the offset by it, or -1 when
 * pread() fails (offset unchanged).
 */
static int __disk_stream_fetch (z_stream_t *stream,
                                void *buffer,
                                unsigned int n)
{
    z_disk_t *disk = (z_disk_t *)stream;
    ssize_t got = pread(disk->fd, buffer, n, disk->offset);
    int count;

    if (got < 0)
        return(-1);

    count = (int)(got & 0xffffffff);
    disk->offset += count;
    return(count);
}
/* Sets the offset used by the next push/fetch. No bounds are checked and no
 * system call is made. Always returns 0.
 */
static int __disk_stream_seek (z_stream_t *stream, uint64_t offset) {
    z_disk_t *disk = (z_disk_t *)stream;

    /* TODO: check boundary? */
    disk->offset = offset;
    return(0);
}
/* Returns the offset the next push/fetch will use. */
static uint64_t __disk_stream_tell (z_stream_t *stream) {
    const z_disk_t *disk = (const z_disk_t *)stream;

    return(disk->offset);
}
/* Returns the size of the file as reported by fstat(), or 0 when fstat()
 * fails.
 */
static uint64_t __disk_stream_length (z_stream_t *stream) {
    z_disk_t *disk = (z_disk_t *)stream;
    struct stat info;

    if (fstat(disk->fd, &info) < 0) {
        /* TODO: FIXME: MAX_UINT64? */
        return(0);
    }

    return(info.st_size);
}
/* Operations of the disk stream. Only the copying calls are available: the
 * zero-copy slots (write, read, backup) are NULL and must not be called
 * through z_stream_call().
 */
static z_stream_vtable_t __disk_stream_vtable = {
.write = NULL, /* disk doesn't implement zero-copy read/write */
.read = NULL, /* disk doesn't implement zero-copy read/write */
.backup = NULL, /* disk doesn't implement zero-copy read/write */
.fetch = __disk_stream_fetch,
.push = __disk_stream_push,
.flush = __disk_stream_flush,
.seek = __disk_stream_seek,
.tell = __disk_stream_tell,
.length = __disk_stream_length,
};
/* ============================================================================
* PUBLIC Disk API
*/
/* Opens 'path' with open(path, flags, mode) and binds 'disk' to it, starting
 * at offset 0. Returns 0 on success; on failure the error is reported with
 * perror() and -1 is returned.
 */
int disk_stream_create (z_disk_t *disk, const char *path, int flags, int mode) {
    int fd = open(path, flags, mode);

    disk->vtable = &__disk_stream_vtable;
    disk->offset = 0;
    disk->fd = fd;

    if (fd < 0) {
        perror("open()");
        return(-1);
    }
    return(0);
}
/* Opens the existing file 'path' with open(path, flags) and binds 'disk' to
 * it, starting at offset 0. Returns 0 on success; on failure the error is
 * reported with perror() and -1 is returned.
 */
int disk_stream_open (z_disk_t *disk, const char *path, int flags) {
    int fd = open(path, flags);

    disk->vtable = &__disk_stream_vtable;
    disk->offset = 0;
    disk->fd = fd;

    if (fd < 0) {
        perror("open()");
        return(-1);
    }
    return(0);
}
/* Binds 'disk' to the already open descriptor 'fd', starting at 'offset'.
 * The descriptor is not validated. Always returns 0.
 */
int disk_stream_from_fd (z_disk_t *disk, int fd, uint64_t offset) {
    disk->fd = fd;
    disk->offset = offset;
    disk->vtable = &__disk_stream_vtable;
    return(0);
}
/* Closes the descriptor bound to 'disk'; the result of close() is ignored. */
void disk_stream_close (z_disk_t *disk) {
    int fd = disk->fd;

    close(fd);
}
/* ============================================================================
* DEMO api usage...
*/
#include <sys/time.h>
#define __USEC_PER_SEC ((uint64_t)1000000U)

/* Returns the wall-clock time from gettimeofday() expressed in microseconds. */
uint64_t z_time_micros (void) {
    struct timeval tv;
    uint64_t micros;

    gettimeofday(&tv, NULL);
    micros = tv.tv_sec * __USEC_PER_SEC;
    micros += tv.tv_usec;
    return(micros);
}

/* Converts the interval between two microsecond timestamps ('s' start,
 * 'e' end) into seconds.
 */
float z_timer_secs (uint64_t s, uint64_t e) {
    float elapsed = (float)(e - s);

    return(elapsed / __USEC_PER_SEC);
}
/* Demo driver. The active branch writes "test.txt" through the buffered
 * writer (zero-copy writes, a backup and a copying push), then reads it back
 * in blocks of 8 bytes and prints one block per line. The two disabled
 * branches are timing experiments (raw pwrite() vs. buffered writer).
 * Returns 0 on success, 1 when a file or a stream cannot be opened.
 */
int main (int argc, char **argv) {
#if 1
    z_buffered_writer_t wbuffer;
    z_buffered_reader_t rbuffer;
    z_disk_t disk;
    void *buffer = NULL;
    int i, n;

    (void)argc;
    (void)argv;

    /* sizeof yields size_t: cast to match the %lu conversion */
    printf("vtable %lu\n", (unsigned long)sizeof(z_stream_vtable_t));
    printf("disk %lu\n", (unsigned long)sizeof(z_disk_t));
    printf("bwriter %lu\n", (unsigned long)sizeof(z_buffered_writer_t));
    printf("breader %lu\n", (unsigned long)sizeof(z_buffered_reader_t));

    /* Write something on disk */
    if (disk_stream_create(&disk, "test.txt", O_TRUNC | O_CREAT | O_WRONLY, 0644) < 0)
        return(1);
    if (buffered_writer_open(&wbuffer, &disk, NULL, 16) < 0) {
        disk_stream_close(&disk);
        return(1);
    }

    if ((n = z_stream_write(&wbuffer, &buffer)) > 0)
        memset(buffer, 'a', n);

    /* Fill all but the last 8 bytes and give those back */
    if ((n = z_stream_write(&wbuffer, &buffer)) >= 8) {
        memset(buffer, 'b', n - 8);
        z_stream_backup(&wbuffer, 8);
    }

    if ((n = z_stream_write(&wbuffer, &buffer)) > 0)
        memset(buffer, 'c', n);

    z_stream_push_fully(&wbuffer, "This is a long test to overflow the buffer", 42);
    buffered_writer_close(&wbuffer);
    disk_stream_close(&disk);

    /* Read something from disk */
    if (disk_stream_open(&disk, "test.txt", O_RDONLY) < 0)
        return(1);
    if (buffered_reader_open(&rbuffer, &disk, NULL, 8) < 0) {
        disk_stream_close(&disk);
        return(1);
    }

    while ((n = z_stream_read(&rbuffer, &buffer)) > 0) {
        for (i = 0; i < n; ++i) putc(((const char *)buffer)[i], stdout);
        putc('\n', stdout);
    }
    buffered_reader_close(&rbuffer);
    disk_stream_close(&disk);
#elif 0
    char buffer[16 * 1024];
    unsigned int i;
    uint64_t s, e;
    off_t offset;
    int fd;

    (void)argc;
    (void)argv;

    memset(buffer, 1, sizeof(buffer));
    if ((fd = open("test.data", O_TRUNC | O_CREAT | O_WRONLY, 0644)) < 0) {
        perror("open()");
        return(1);
    }

    s = z_time_micros();
    offset = 0;
    for (i = 0; i < 16 * 1024; ++i) {
        if (pwrite(fd, buffer, sizeof(buffer), offset) < 0) {
            perror("pwrite()");
            break;
        }
        offset += sizeof(buffer);
    }
    e = z_time_micros();
    printf("Timed %.3fsec\n", z_timer_secs(s, e));
    close(fd);
#else
    //char buffer[16 * 1024];
    void *buffer = NULL;
    z_buffered_writer_t wbuffer;
    unsigned int i;
    uint64_t s, e;
    uint64_t offset;
    z_disk_t disk;
    int n;

    (void)argc;
    (void)argv;

    if (disk_stream_create(&disk, "test.data", O_TRUNC | O_CREAT | O_WRONLY, 0644) < 0)
        return(1);
    if (buffered_writer_open(&wbuffer, &disk, NULL, 16 * 1024) < 0) {
        disk_stream_close(&disk);
        return(1);
    }

    s = z_time_micros();
    offset = 0;
    for (i = 0; i < 16 * 1024; ++i) {
        if ((n = z_stream_write(&wbuffer, &buffer)) <= 0)
            break;
        memset(buffer, 'a', n);
        //z_stream_push_fully(&wbuffer, buffer, sizeof(buffer));
        /* count the bytes handed out, not the size of the pointer */
        offset += n;
    }
    e = z_time_micros();
    (void)offset;
    printf("Timed %.3fsec\n", z_timer_secs(s, e));
    buffered_writer_close(&wbuffer);
    disk_stream_close(&disk);
#endif
    return(0);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment