Skip to content

Instantly share code, notes, and snippets.

@mattn
Last active August 29, 2015 14:13
Show Gist options
  • Save mattn/325e861d101012ce1de2 to your computer and use it in GitHub Desktop.
Save mattn/325e861d101012ce1de2 to your computer and use it in GitHub Desktop.
// windows: gcc -o uv_streem.exe uv_streem.c -luv -lws2_32 -lpsapi -liphlpapi
// unix: gcc -o uv_streem uv_streem.c -luv -lrt
#include <ctype.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <uv.h>
// Forward declaration so the callback type below can mention a task
// before the task structure itself is defined.
struct _task_t;
// Signature of a stream stage callback: it receives a pointer to the task
// being delivered to that stage.
typedef void (*strm_cb)(struct _task_t *);
// One stage of a stream pipeline; stages are chained as a singly linked list.
typedef struct _strm_stream {
// Label supplied when the stage is allocated (e.g. "STDIN").
char* name;
// Function called with the task when a task is dispatched to this stage.
strm_cb cb;
// Downstream stage set by strm_connect(); NULL when nothing is connected.
struct _strm_stream* next;
} strm_stream;
// A unit of work handed to a stage callback.
typedef struct _task_t {
// Event loop the task was scheduled on.
uv_loop_t* loop;
// Stage this task is addressed to.
strm_stream* p;
// Payload passed along the pipeline; NULL when there is no payload
// (the initial kick-off, or the stop notification sent by strm_stop()).
uv_buf_t* buf;
} task_t;
// libuv allocation callback: supplies the buffer that a read will fill.
//
//   handle          handle the read belongs to (unused)
//   suggested_size  size hint provided by libuv
//   buf             out: receives the malloc'ed buffer; the read callback
//                   is responsible for freeing buf->base
//
// When the allocation fails the length is reported as 0, so libuv hands
// UV_ENOBUFS to the read callback instead of writing through a NULL base.
static void
alloc_cb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
    (void) handle;
    buf->base = malloc(suggested_size);
    buf->len = buf->base ? suggested_size : 0;
}
static void
after_cb(uv_async_t *async) {
task_t* task = (task_t*) async->data;
strm_stream* p = (strm_stream*) task->p;
p->cb(task);
free(task);
}
// Schedules delivery of `buf` to stage `p` on `loop`.
//
//   loop  event loop to schedule on
//   p     target stage; when NULL the call does nothing
//   buf   payload handed to the stage as-is (not copied); may be NULL
//
// A task and a one-shot async handle are heap-allocated; after_cb() runs
// the stage callback when the async handle fires. If an allocation or the
// handle initialisation fails, the error is reported on stderr, everything
// allocated here is released and nothing is scheduled (`buf` is untouched).
static void
strm_emit(uv_loop_t* loop, strm_stream* p, uv_buf_t* buf) {
    uv_async_t* async;
    task_t* task;
    int rc;

    if (!p) return;

    task = malloc(sizeof *task);
    async = malloc(sizeof *async);
    if (!task || !async) {
        fprintf(stderr, "strm_emit: out of memory\n");
        free(async);
        free(task);
        return;
    }
    task->p = p;
    task->buf = buf;
    task->loop = loop;

    rc = uv_async_init(loop, async, after_cb);
    if (rc < 0) {
        // The handle was never registered with the loop, so it can be
        // freed directly without uv_close().
        fprintf(stderr, "strm_emit: uv_async_init: %s\n", uv_strerror(rc));
        free(async);
        free(task);
        return;
    }
    async->data = task;
    uv_async_send(async);
}
// Tells the stage after `p` that no more data will arrive by scheduling a
// task with a NULL payload for it.
//
//   loop  event loop to schedule on
//   p     stage that has finished; may be the last stage of the pipeline
//
// Delegates to strm_emit(), which ignores a NULL target, so stopping the
// last stage (p->next == NULL) no longer queues a task with no stage to run.
static void
strm_stop(uv_loop_t* loop, strm_stream* p) {
    if (!p) return;
    strm_emit(loop, p->next, NULL);
}
// libuv read callback for stdin.
//
//   q      stream being read; q->data holds the task copied by stdin_read()
//   nread  bytes read, 0 when there is nothing to read right now, or a
//          negative libuv error code (including end of file)
//   buf    buffer from alloc_cb(); this callback owns buf->base
//
// On error or end of file the stream is closed, the per-read task is
// released and the stop notification is sent downstream. Otherwise the
// bytes are copied into a right-sized heap buffer that is emitted to the
// next stage, which takes ownership of it.
static void
stdin_read_cb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf) {
    task_t* task = (task_t*) q->data;
    uv_buf_t* newbuf;
    size_t len;

    if (nread < 0) {
        // buf->base may be NULL here; free(NULL) is a no-op.
        free(buf->base);
        uv_close((uv_handle_t*) q, NULL);
        strm_stop(task->loop, task->p);
        // No further read callbacks arrive after uv_close().
        q->data = NULL;
        free(task);
        return;
    }
    if (nread == 0) {
        // Nothing was read; do not push an empty buffer downstream.
        free(buf->base);
        return;
    }

    len = (size_t) nread;
    // make copy of buffer
    newbuf = malloc(sizeof *newbuf);
    if (newbuf) {
        newbuf->base = malloc(len);
    }
    if (!newbuf || !newbuf->base) {
        fprintf(stderr, "stdin_read_cb: out of memory\n");
        free(newbuf);
        free(buf->base);
        return;
    }
    memcpy(newbuf->base, buf->base, len);
    newbuf->len = len;
    free(buf->base);
    strm_emit(task->loop, task->p->next, newbuf);
}
// Source stage: starts reading stdin on the task's loop.
//
//   task  kick-off task; its fields are copied because after_cb() frees
//         the task as soon as this function returns
//
// stdin is opened as a pipe when uv_guess_handle() reports a named pipe and
// as a TTY otherwise. Data is delivered through stdin_read_cb(). If any
// step fails, the error is reported on stderr, the copied task is released
// and no read is started.
static void
stdin_read(task_t* task) {
    static uv_pipe_t pipe_in;
    static uv_tty_t tty_in;
    uv_stream_t* stream;
    task_t* read_task;
    uv_file fd = fileno(stdin);
    int handle_ready = 0;
    int rc;

    read_task = malloc(sizeof *read_task);
    if (!read_task) {
        fprintf(stderr, "stdin_read: out of memory\n");
        return;
    }
    read_task->loop = task->loop;
    read_task->p = task->p;
    read_task->buf = task->buf;

    if (uv_guess_handle(fd) == UV_NAMED_PIPE) {
        stream = (uv_stream_t*) &pipe_in;
        rc = uv_pipe_init(task->loop, &pipe_in, 0);
        if (rc == 0) {
            handle_ready = 1;
            rc = uv_pipe_open(&pipe_in, fd);
        }
    } else {
        stream = (uv_stream_t*) &tty_in;
        rc = uv_tty_init(task->loop, &tty_in, fd, 1);
        handle_ready = (rc == 0);
    }

    if (rc == 0) {
        stream->data = read_task;
        rc = uv_read_start(stream, alloc_cb, stdin_read_cb);
    }
    if (rc < 0) {
        fprintf(stderr, "stdin_read: %s\n", uv_strerror(rc));
        if (handle_ready) {
            // The handle is registered with the loop and must be closed.
            stream->data = NULL;
            uv_close((uv_handle_t*) stream, NULL);
        }
        free(read_task);
    }
}
// Filter stage: upper-cases the payload in place and passes it on.
//
//   task  task for this stage; a NULL task->buf is the stop notification
//         and is forwarded with strm_stop()
//
// Each byte is converted to unsigned char before toupper(), because passing
// a negative char value to a <ctype.h> function is undefined behaviour.
static void
to_upper(task_t* task) {
    uv_buf_t* buf = task->buf;
    size_t i;

    if (!buf) {
        strm_stop(task->loop, task->p);
        return;
    }
    // make upper-case
    for (i = 0; i < buf->len; i++) {
        buf->base[i] = (char) toupper((unsigned char) buf->base[i]);
    }
    strm_emit(task->loop, task->p->next, buf);
}
// Write-completion callback for stdout_write().
//
//   req     finished write request; req->data is the uv_buf_t that was
//           written
//   status  0 on success, negative libuv error code otherwise
//
// Releases everything tied to the write: the payload bytes, the uv_buf_t
// that describes them and the request itself. A failed write is reported
// on stderr.
static void
stdout_write_cb(uv_write_t* req, int status) {
    uv_buf_t* buf = (uv_buf_t*) req->data;

    if (status < 0) {
        fprintf(stderr, "stdout_write: %s\n", uv_strerror(status));
    }
    if (buf) {
        free(buf->base);
        free(buf);
    }
    free(req);
}
// Sink stage: writes the payload to stdout through a TTY handle.
//
//   task  task for this stage; a NULL task->buf is the stop notification
//         and is forwarded with strm_stop()
//
// The TTY handle is initialised on first use. This stage owns the payload:
// it is released by stdout_write_cb() once the write completes. If the
// write cannot be started the callback is never invoked, so the payload
// and the request are released here instead and the error is reported on
// stderr.
static void
stdout_write(task_t* task) {
    uv_buf_t* buf = task->buf;
    static uv_tty_t tty_out;
    static int initialized = 0;
    uv_write_t* req = NULL;
    int rc = 0;

    if (!buf) {
        strm_stop(task->loop, task->p);
        return;
    }
    if (!initialized) {
        rc = uv_tty_init(task->loop, &tty_out, fileno(stdout), 0);
        if (rc == 0) {
            initialized = 1;
        }
    }
    if (rc == 0) {
        req = malloc(sizeof *req);
        if (!req) {
            rc = UV_ENOMEM;
        }
    }
    if (rc == 0) {
        req->data = buf;
        rc = uv_write(req, (uv_stream_t*) &tty_out, buf, 1, stdout_write_cb);
    }
    if (rc < 0) {
        fprintf(stderr, "stdout_write: %s\n", uv_strerror(rc));
        free(req);
        free(buf->base);
        free(buf);
    }
}
// Allocates a pipeline stage that is not yet connected to anything.
//
//   name  label stored in the stage; the pointer is kept, not copied
//   cb    callback run when a task is dispatched to the stage
//
// Returns the new stage, or NULL when memory cannot be allocated.
static strm_stream*
strm_alloc_stream(char* name, strm_cb cb) {
    strm_stream* strm = malloc(sizeof *strm);

    if (!strm) {
        return NULL;
    }
    strm->name = name;
    strm->cb = cb;
    strm->next = NULL;
    // The original fell off the end without returning the stage.
    return strm;
}
// Chains two stages: `dst` becomes the stage that follows `src`.
static void
strm_connect(strm_stream *src, strm_stream *dst)
{
    src->next = dst;
}
// Runs the pipeline that starts at `top` on libuv's default loop.
static void
strm_loop(strm_stream* top)
{
    uv_loop_t* const default_loop = uv_default_loop();

    // Kick the first stage with an empty payload, then drive the loop.
    strm_emit(default_loop, top, NULL);
    uv_run(default_loop, UV_RUN_DEFAULT);
}
// Builds the STDIN -> TO_UPPER -> STDOUT pipeline and runs it.
int
main(int argc, char* argv[])
{
    strm_stream* stages[3];
    int i;

    // Allocate in pipeline order.
    stages[0] = strm_alloc_stream("STDIN", stdin_read);
    stages[1] = strm_alloc_stream("TO_UPPER", to_upper);
    stages[2] = strm_alloc_stream("STDOUT", stdout_write);

    // Link every stage to the one that follows it.
    for (i = 0; i + 1 < 3; i++) {
        strm_connect(stages[i], stages[i + 1]);
    }

    strm_loop(stages[0]);
    return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment