Skip to content

Instantly share code, notes, and snippets.

@alexeiz
Last active December 26, 2015 01:49
Show Gist options
  • Save alexeiz/7074336 to your computer and use it in GitHub Desktop.
Save alexeiz/7074336 to your computer and use it in GitHub Desktop.
Multiplexing IO from a pipe.
#include <iostream>
#include <fstream>
#include <thread>
#include <stdexcept>
#include <memory>
#include <vector>
#include <cerrno>
#include <cstring>
#include <unistd.h>
#include <fcntl.h>
// #define USE_POLL
#ifdef USE_POLL
#include <poll.h>
#else
#include <sys/epoll.h>
#endif
using namespace std;
/// Tiny helper enabling the Perl-style idiom `condition or die("message");`.
///
/// The object only keeps the pointer handed to its constructor (the text is
/// not copied), so the message must outlive the `die` object -- string
/// literals are the intended use.
///
/// Because `or` resolves to an overloaded `operator||`, there is no
/// short-circuiting: the `die` operand is always constructed, even when the
/// condition on the left is true.
class die
{
    char const * message_;

public:
    /// @param what  Text reported through the thrown exception.
    explicit die(char const * what)
        : message_{what}
    {}

    /// @return The message pointer supplied at construction.
    char const * what() const
    {
        return message_;
    }

    /// Throws `std::runtime_error` carrying the message of `failure` when
    /// `ok` is false; does nothing otherwise.
    friend void operator||(bool ok, die const & failure)
    {
        if (ok)
            return;
        throw std::runtime_error(failure.message_);
    }
};
#ifdef USE_POLL
/// Waits for a single descriptor to become readable using poll(2).
///
/// Calling the object blocks (no timeout) until the descriptor reports
/// POLLIN or POLLHUP. Any other outcome is reported by throwing
/// `std::runtime_error` through `die`.
class poller
{
public:
    /// @param filedes  Descriptor to watch for input.
    poller(int filedes)
        : poll_fd_{}  // value-initialise: zeroes every field, incl. revents
    {
        poll_fd_.fd = filedes;
        poll_fd_.events = POLLIN;
    }

    /// Blocks until the watched descriptor is readable or has been hung up.
    void operator()()
    {
        int ndesc;
        do
        {
            ndesc = ::poll(&poll_fd_, 1, -1);
        }
        while (ndesc == -1 && errno == EINTR);  // a signal is not a failure
        ndesc == 1 or die("poll");

        // The parentheses matter: `revents & POLLIN != 0` parses as
        // `revents & (POLLIN != 0)`. POLLHUP is accepted as well because the
        // writer may close its end between the caller's last read() and this
        // poll(); the caller's next read() then returns 0 (end of stream).
        (poll_fd_.revents & (POLLIN | POLLHUP)) != 0
            or die("poll received unexpected events");
    }

private:
    pollfd poll_fd_;
};
#else
/// Waits for a single descriptor to become readable using epoll(7).
///
/// The descriptor is registered edge-triggered (EPOLLIN | EPOLLET), so the
/// caller is expected to read until EAGAIN before waiting again. The object
/// owns the epoll descriptor: it is closed in the destructor, and copying is
/// disabled so that it cannot be closed twice.
///
/// Failures are reported by throwing `std::runtime_error` through `die`.
class poller
{
public:
    /// @param filedes  Descriptor to watch for input.
    poller(int filedes)
        : epoll_des_{-1}
        , received_(1)  // one slot for the single registered descriptor
    {
        epoll_des_ = epoll_create(1);
        epoll_des_ != -1 or die("epoll_create");

        epoll_event add_event{};
        add_event.data.fd = filedes;
        add_event.events = EPOLLIN | EPOLLET;
        bool const registered =
            epoll_ctl(epoll_des_, EPOLL_CTL_ADD, filedes, &add_event) != -1;
        // The destructor does not run when a constructor throws, so release
        // the epoll descriptor by hand before reporting the failure.
        if (!registered)
            ::close(epoll_des_);
        registered or die("epoll_ctl EPOLL_CTL_ADD");
    }

    ~poller()
    {
        ::close(epoll_des_);
    }

    poller(poller const &) = delete;
    poller & operator=(poller const &) = delete;

    /// Blocks (no timeout) until epoll reports an event for the descriptor.
    void operator()()
    {
        int number_fds;
        do
        {
            number_fds = epoll_wait(epoll_des_,
                                    received_.data(),
                                    static_cast<int>(received_.size()),
                                    -1);
        }
        while (number_fds == -1 && errno == EINTR);  // a signal is not a failure
        number_fds != -1 or die("epoll_wait");
    }

private:
    int epoll_des_;
    std::vector<epoll_event> received_;
};
#endif
/// Switches the descriptor `fildes` to non-blocking mode.
///
/// Reads the current file status flags with F_GETFL and writes them back
/// with O_NONBLOCK added. Throws `std::runtime_error` (through `die`) when
/// either fcntl call fails.
void unblock(int fildes)
{
    int const current_flags = fcntl(fildes, F_GETFL, 0);
    (current_flags != -1) or die("fcntl F_GETFL");

    int const set_result = fcntl(fildes, F_SETFL, current_flags | O_NONBLOCK);
    (set_result != -1) or die("fcntl F_SETFL");
}
/// Writes all `len` bytes of `data` to `fd`, resuming after partial writes
/// and retrying when the call is interrupted by a signal.
/// @return true when everything was written, false on any other write error.
static bool write_fully(int fd, char const * data, size_t len)
{
    while (len > 0)
    {
        ssize_t const written = write(fd, data, len);
        if (written == -1)
        {
            if (errno == EINTR)
                continue;
            return false;
        }
        data += written;
        len -= static_cast<size_t>(written);
    }
    return true;
}

/// Copies the contents of `file` to the descriptor `pout` in 256-byte
/// chunks, then closes `pout`.
///
/// @param file  Path of the file to read.
/// @param pout  Descriptor to write to; this function closes it on every
///              path, including when the file cannot be opened (in which
///              case nothing is written).
///
/// Copying stops early if a write fails. No exception is thrown from here;
/// failures are visible to the consumer only as a shorter (or empty) stream.
void read_file(char const * file, int pout)
{
    std::ifstream fin(file);
    size_t const buf_size = 256;
    char buf[buf_size];
    for (;;)
    {
        fin.read(buf, buf_size);
        // gcount() is the number of bytes the last read() actually stored:
        // buf_size for full chunks, fewer for the tail, and 0 at end of file
        // or when the stream is in a failed state (e.g. open failed).
        std::streamsize const got = fin.gcount();
        if (got <= 0)
            break;
        if (!write_fully(pout, buf, static_cast<size_t>(got)))
            break;
    }
    close(pout);
}
int main(int argc, char * argv[])
{
if (argc < 2)
{
cerr << "options: " << argv[0] << " file\n";
exit(1);
}
int pipes[2];
pipe(pipes) == 0 or die("pipe");
int & pin = pipes[0];
int & pout = pipes[1];
unblock(pin);
thread reader(read_file, argv[1], pout);
reader.detach();
size_t const buf_size = 256;
char buf[buf_size + 1];
poller poll(pin);
for (;;)
{
size_t bytes = read(pin, buf, buf_size);
if (bytes == 0)
break;
if (bytes == -1)
{
errno == EAGAIN or die("read");
poll();
continue;
}
buf[bytes] = '\0';
cout << buf;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment