Last active
December 26, 2015 01:49
-
-
Save alexeiz/7074336 to your computer and use it in GitHub Desktop.
Multiplexing IO from a pipe.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <fcntl.h>
#include <unistd.h>

#include <cerrno>
#include <cstring>
#include <fstream>
#include <iostream>
#include <memory>
#include <stdexcept>
#include <thread>
#include <vector>
// #define USE_POLL | |
#ifdef USE_POLL | |
#include <poll.h> | |
#else | |
#include <sys/epoll.h> | |
#endif | |
using namespace std; | |
// Right-hand operand of the `condition or die("message")` idiom used
// throughout this file: when the condition on the left is false, a
// std::runtime_error carrying the message is thrown.
class die
{
    // Message text. Only the pointer is stored; the characters are not copied.
    char const * what_;

public:
    // Remembers the message to report should the check fail.
    explicit die(char const * what)
        : what_{what}
    {}

    // Returns the message given at construction.
    char const * what() const { return what_; }

    // `or` is the alternative spelling of `||`, so this overload is what makes
    // `expected or die("...")` compile. Being an overloaded operator it does
    // not short-circuit: the die object is always constructed.
    friend void operator||(bool expected, die const & obj)
    {
        if (expected)
            return;
        throw std::runtime_error(obj.what());
    }
};
#ifdef USE_POLL | |
class poller | |
{ | |
public: | |
poller(int filedes) | |
{ | |
memset(&poll_fd_, 0, sizeof(poll_fd_)); | |
poll_fd_.fd = filedes; | |
poll_fd_.events = POLLIN; | |
} | |
void operator()() | |
{ | |
int ndesc = poll(&poll_fd_, 1, -1); | |
ndesc == 1 or die("poll"); | |
poll_fd_.revents & POLLIN != 0 | |
or die("poll received unexpected events"); | |
} | |
private: | |
pollfd poll_fd_; | |
}; | |
#else | |
class poller | |
{ | |
public: | |
poller(int filedes) | |
: epoll_des_{0} | |
, received_() | |
{ | |
epoll_des_ = epoll_create(1); | |
epoll_des_ != -1 or die("epoll_create"); | |
epoll_event add_event{}; | |
add_event.data.fd = filedes; | |
add_event.events = EPOLLIN | EPOLLET; | |
epoll_ctl(epoll_des_, EPOLL_CTL_ADD, filedes, &add_event) != -1 | |
or die("epoll_ctr EPOLL_CTL_ADD"); | |
// add one slot for each event | |
received_.emplace_back(); | |
} | |
void operator()() | |
{ | |
int number_fds = epoll_wait(epoll_des_, received_.data(), received_.size(), -1); | |
number_fds != -1 or die("epoll_wait"); | |
} | |
private: | |
int epoll_des_; | |
vector<epoll_event> received_; | |
}; | |
#endif | |
void unblock(int fildes) | |
{ | |
int flags = fcntl(fildes, F_GETFL, 0); | |
flags != -1 or die("fcntl F_GETFL"); | |
fcntl(fildes, F_SETFL, flags |= O_NONBLOCK) != -1 or die("fcntl F_SETFL"); | |
} | |
namespace
{
// Writes all `len` bytes of `data` to `fd`, continuing after short writes and
// retrying when interrupted by a signal. Returns false as soon as write()
// reports an error or makes no progress.
bool write_fully(int fd, char const * data, size_t len)
{
    while (len > 0)
    {
        ssize_t const written = write(fd, data, len);
        if (written == -1)
        {
            if (errno == EINTR)
                continue;
            return false;
        }
        if (written == 0)
            return false;   // no progress; give up instead of spinning
        data += written;
        len -= static_cast<size_t>(written);
    }
    return true;
}
} // namespace

// Copies the contents of `file` to the descriptor `pout` in 256-byte chunks
// and then closes `pout`, which is how the consumer learns that the data has
// ended.
//
// Nothing is thrown: the function stops quietly if the file cannot be opened
// or a write fails, and `pout` is closed in every case. It is used as a
// thread entry point in this file.
void read_file(char const * file, int pout)
{
    std::ifstream fin(file);
    size_t const buf_size = 256;
    char buf[buf_size];

    bool ok = true;
    while (ok && fin.read(buf, buf_size))
        ok = write_fully(pout, buf, buf_size);

    // The read that hits end-of-file fails, but may still have delivered a
    // final partial chunk; gcount() tells how many bytes it holds.
    if (ok && fin.gcount() > 0)
        write_fully(pout, buf, static_cast<size_t>(fin.gcount()));

    close(pout);
}
int main(int argc, char * argv[]) | |
{ | |
if (argc < 2) | |
{ | |
cerr << "options: " << argv[0] << " file\n"; | |
exit(1); | |
} | |
int pipes[2]; | |
pipe(pipes) == 0 or die("pipe"); | |
int & pin = pipes[0]; | |
int & pout = pipes[1]; | |
unblock(pin); | |
thread reader(read_file, argv[1], pout); | |
reader.detach(); | |
size_t const buf_size = 256; | |
char buf[buf_size + 1]; | |
poller poll(pin); | |
for (;;) | |
{ | |
size_t bytes = read(pin, buf, buf_size); | |
if (bytes == 0) | |
break; | |
if (bytes == -1) | |
{ | |
errno == EAGAIN or die("read"); | |
poll(); | |
continue; | |
} | |
buf[bytes] = '\0'; | |
cout << buf; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment