Skip to content

Instantly share code, notes, and snippets.

@ashgti
Last active June 23, 2025 16:09
Show Gist options
  • Save ashgti/003ada30ee59aed154bac111581f97d5 to your computer and use it in GitHub Desktop.
Save ashgti/003ada30ee59aed154bac111581f97d5 to your computer and use it in GitHub Desktop.
// Emulate 'select' behavior on Windows for Pipe handles, both anonymous and named pipes.
// See `SelectHelper::Select` below.
// clang-format off
#include <wtypes.h>
#include <errno.h>
#include <fcntl.h>
#include <io.h>
#include <winbase.h>
#include <winternl.h>
#include <winsock2.h>
#include <ws2tcpip.h>
// clang-format on
#include <cassert>
#include <cerrno>
#include <chrono>
#include <compare>
#include <cstddef>
#include <cstdio>
#include <future>
#include <map>
#include <mutex>
#include <optional>
#include "gunit/gunit.h"
// Information-class value that pipe_write_available() passes (cast to
// FILE_INFORMATION_CLASS) to NtQueryInformationFile in order to receive a
// FILE_PIPE_LOCAL_INFORMATION record.
enum { FilePipeLocalInformation = 24 };
typedef NTSTATUS(NTAPI *_NtQueryInformationFile_fn)(
HANDLE FileHandle, PIO_STATUS_BLOCK IoStatusBlock, PVOID FileInformation,
ULONG Length, FILE_INFORMATION_CLASS FileInformationClass);
static _NtQueryInformationFile_fn pNtQueryInformationFile;
// Buffer layout handed to NtQueryInformationFile when it is called with the
// FilePipeLocalInformation class. Within this file only WriteQuotaAvailable
// is read back; the remaining fields exist to keep the layout intact.
struct _FILE_PIPE_LOCAL_INFORMATION {
  ULONG NamedPipeType;
  ULONG NamedPipeConfiguration;
  ULONG MaximumInstances;
  ULONG CurrentInstances;
  ULONG InboundQuota;
  ULONG ReadDataAvailable;
  ULONG OutboundQuota;
  ULONG WriteQuotaAvailable;
  ULONG NamedPipeState;
  ULONG NamedPipeEnd;
};
using FILE_PIPE_LOCAL_INFORMATION = _FILE_PIPE_LOCAL_INFORMATION;
using PFILE_PIPE_LOCAL_INFORMATION = _FILE_PIPE_LOCAL_INFORMATION *;
namespace {
void init_winsock() {
static std::once_flag once_flag;
std::call_once(once_flag, []() {
auto wVersion = WINSOCK_VERSION;
WSADATA wsaData;
int err = ::WSAStartup(wVersion, &wsaData);
if (err == 0) {
if (wsaData.wVersion < wVersion) {
WSACleanup();
FAIL() << "WSASock version is not expected.";
}
} else {
FAIL() << "WSASock version is not expected. " << ::WSAGetLastError();
}
});
}
// Returns the WriteQuotaAvailable value that NtQueryInformationFile reports
// for `hPipe`. When the function cannot be resolved from ntdll.dll, or the
// query itself fails, 1 is returned so that callers treat the pipe as
// writable.
DWORD pipe_write_available(const HANDLE &hPipe) {
  static std::once_flag resolve_once;
  std::call_once(resolve_once, [] {
    // Look up 'NtQueryInformationFile' in ntdll.dll. This should be present
    // in Windows 2000 and newer. NOTE: it is not an official API, so its
    // parameters may change in a future release. However, it has been stable
    // for years and is used by multiple libraries and runtimes.
    if (HMODULE nt_module = LoadLibraryW(L"ntdll.dll")) {
      pNtQueryInformationFile = reinterpret_cast<_NtQueryInformationFile_fn>(
          GetProcAddress(nt_module, "NtQueryInformationFile"));
    }
  });

  if (pNtQueryInformationFile == nullptr) return 1;

  IO_STATUS_BLOCK status_block;
  FILE_PIPE_LOCAL_INFORMATION pipe_info;
  const NTSTATUS query_status = pNtQueryInformationFile(
      hPipe, &status_block, &pipe_info, sizeof(pipe_info),
      static_cast<FILE_INFORMATION_CLASS>(FilePipeLocalInformation));
  return NT_SUCCESS(query_status) ? pipe_info.WriteQuotaAvailable : 1;
}
using WaitableHandle = HANDLE;
class IOObject {
public:
IOObject() : IOObject(-1) {}
explicit IOObject(int fd) : m_fd(fd) {
if (fd != -1) m_handle = (HANDLE)_get_osfhandle(fd);
}
explicit IOObject(FILE *file) : IOObject(fileno(file)) { m_file = file; }
explicit IOObject(SOCKET socket) : m_socket(socket) {}
virtual ~IOObject() = default;
bool IsValid() const {
return m_fd != -1 || m_file != nullptr || m_socket != INVALID_SOCKET;
}
WaitableHandle GetWaitableHandle() const {
if (m_socket != INVALID_SOCKET) {
WSAEVENT event = WSACreateEvent();
WSAEventSelect(m_socket, event, FD_ACCEPT | FD_READ | FD_WRITE);
return event;
}
return m_handle;
}
bool HasReadableBytes() const {
if (!IsValid()) return false;
if (m_socket != INVALID_SOCKET) {
WSAPOLLFD poll_fd = {0};
poll_fd.fd = m_socket;
poll_fd.events = POLLIN;
return WSAPoll(&poll_fd, 1, 0) > 0;
}
DWORD available_bytes = 0;
return !PeekNamedPipe(m_handle, NULL, 0, NULL, &available_bytes, NULL) ||
available_bytes > 0;
}
bool HasWritableBytes() const {
if (!IsValid()) return false;
if (m_socket != INVALID_SOCKET) {
// FD_WRITE is edge-triggered, not level-triggered, so it will only
// be signaled if the socket becomes writable after a send() fails
// with WSAEWOULDBLOCK. We can work around this by performing a
// zero-byte send(). If the socket is writable, the send() will
// succeed and we can immediately post a packet, and if it isn't, it
// will fail with WSAEWOULDBLOCK and WSAEventSelect() will report
// the next time it becomes available.
return send(m_socket, "", 0, 0) == 0;
}
if (m_handle == INVALID_HANDLE_VALUE) return false;
return pipe_write_available(m_handle) > 0;
}
std::strong_ordering operator<=>(const IOObject &) const = default;
private:
int m_fd = -1;
FILE *m_file = nullptr;
SOCKET m_socket = INVALID_SOCKET;
HANDLE m_handle = INVALID_HANDLE_VALUE;
};
class SelectHelper {
public:
// Defaults to infinite wait for select unless you call SetDeadline()
SelectHelper() = default;
// Call SetDeadline() before calling SelectHelper::Select() to set the timeout
// based on the current time + the timeout. This allows multiple calls to
// SelectHelper::Select() without having to worry about the absolute timeout
// as this class manages to set the relative timeout correctly.
void SetDeadline(const std::chrono::microseconds &timeout) {
m_end_time = std::chrono::steady_clock::now() + timeout;
}
// Call the FDSet*() functions before calling SelectHelper::Select() to set
// the file descriptors that we will watch for when calling select. This will
// cause FD_SET() to be called prior to calling select using the "fd"
// provided.
void FDSetRead(IOObject io) { m_handles[io].read_set = true; }
void FDSetWrite(IOObject io) { m_handles[io].write_set = true; }
void FDSetError(IOObject io) { m_handles[io].error_set = true; }
// Call the FDIsSet*() functions after calling SelectHelper::Select() to
// check which file descriptors are ready for read/write/error. This will
// contain the result of FD_ISSET after calling select for a given file
// descriptor.
bool FDIsSetRead(IOObject io) const { return m_handles.at(io).read_is_set; }
bool FDIsSetWrite(IOObject io) const { return m_handles.at(io).write_is_set; }
bool FDIsSetError(IOObject io) const { return m_handles.at(io).error_is_set; }
// Emulate 'select' behavior on Windows.
int Select() {
if (m_handles.size() > MAXIMUM_WAIT_OBJECTS) {
return EINVAL;
}
HANDLE handles[MAXIMUM_WAIT_OBJECTS];
DWORD count = 0;
for (auto &pair : m_handles) {
if (!pair.second.read_set && !pair.second.write_set &&
!pair.second.error_set)
continue;
pair.second.PrepareForSelect();
handles[count] = pair.first.GetWaitableHandle();
if (handles[count] == INVALID_HANDLE_VALUE) return EBADF;
count++;
}
if (count == 0) return EINVAL;
int ready_count = 0;
do {
DWORD millis = INFINITE;
if (m_end_time) {
std::chrono::milliseconds remaining =
std::chrono::duration_cast<std::chrono::milliseconds>(
m_end_time.value() - std::chrono::steady_clock::now());
if (remaining.count() <= 0) return ETIMEDOUT;
millis = remaining.count();
}
DWORD wait_result = WaitForMultipleObjects(count, handles, FALSE, millis);
if (wait_result == WAIT_TIMEOUT ||
(wait_result >= WAIT_ABANDONED_0 &&
wait_result <= WAIT_ABANDONED_0 + count - 1))
return ETIMEDOUT;
else if (wait_result == WAIT_FAILED)
return GetLastError();
for (auto &pair : m_handles) {
// Ensure we're inspecting the right handle.
if ((pair.second.read_set || pair.second.write_set ||
pair.second.error_set) &&
WAIT_OBJECT_0 !=
WaitForSingleObject(pair.first.GetWaitableHandle(), 0))
continue;
if (pair.second.read_set) {
// Check if there is data to read, otherwise keep waiting.
if (pair.first.HasReadableBytes()) {
pair.second.read_is_set = true;
ready_count++;
}
} else if (pair.second.write_set) {
// Check if the pipe is full, otherwise keep waiting.
if (pair.first.HasWritableBytes()) {
pair.second.write_is_set = true;
ready_count++;
}
} else if (pair.second.error_set) {
pair.second.error_is_set = true;
ready_count++;
}
}
// If no fds have a status to report yet, yield to allow for IO.
if (!ready_count) Sleep(1);
} while (ready_count == 0);
return 0;
}
protected:
struct FDInfo {
FDInfo()
: read_set(false),
write_set(false),
error_set(false),
read_is_set(false),
write_is_set(false),
error_is_set(false) {}
void PrepareForSelect() {
read_is_set = false;
write_is_set = false;
error_is_set = false;
}
bool read_set : 1, write_set : 1, error_set : 1, read_is_set : 1,
write_is_set : 1, error_is_set : 1;
};
std::map<IOObject, FDInfo> m_handles;
std::optional<std::chrono::steady_clock::time_point> m_end_time;
};
} // namespace
// Indices into the descriptor pair filled in by _pipe(): the tests read from
// fds[READ] and write to fds[WRITE].
enum { READ = 0, WRITE = 1 };
// Fixture that opens a binary-mode pipe with a kBufferSize-byte buffer before
// each test and closes both ends afterwards. `read_fd` / `write_fd` wrap the
// two descriptors for use with SelectHelper.
class PipeTest : public testing::Test {
 protected:
  constexpr static size_t kBufferSize = 1024;
  // Initialised to -1 so TearDown() never closes garbage descriptors when
  // _pipe() fails in SetUp().
  int fds[2] = {-1, -1};
  IOObject read_fd;
  IOObject write_fd;

  void SetUp() override {
    ASSERT_EQ(_pipe(fds, kBufferSize, _O_BINARY), 0);
    read_fd = IOObject(fds[READ]);
    write_fd = IOObject(fds[WRITE]);
  }

  void TearDown() override {
    // Close the read end first, then the write end; skip ends never opened.
    for (int &fd : fds) {
      if (fd == -1) continue;
      _close(fd);
      fd = -1;
    }
  }
};
// With no deadline, Select() returns once a background task has written to
// the pipe, and the read end is reported as readable.
TEST_F(PipeTest, Read) {
  using namespace std::chrono_literals;
  SelectHelper helper;
  helper.FDSetRead(read_fd);
  // Delay the write so Select() is already waiting when the data arrives.
  std::future<void> writer = std::async(std::launch::async, [this]() {
    std::this_thread::sleep_for(100ms);
    _write(fds[WRITE], "hello", 5);
  });
  EXPECT_EQ(helper.Select(), 0);
  EXPECT_TRUE(helper.FDIsSetRead(read_fd));
}
// Data written 100 ms in must be reported as readable well before the 500 ms
// deadline expires.
TEST_F(PipeTest, ReadWithDeadline) {
  using namespace std::chrono_literals;
  SelectHelper helper;
  helper.SetDeadline(500ms);
  helper.FDSetRead(read_fd);
  std::future<void> writer = std::async(std::launch::async, [this]() {
    std::this_thread::sleep_for(100ms);
    _write(fds[WRITE], "hello", 5);
  });
  EXPECT_EQ(helper.Select(), 0);
  EXPECT_TRUE(helper.FDIsSetRead(read_fd));
}
// Nothing is ever written, so Select() must give up with ETIMEDOUT and leave
// the read flag clear.
TEST_F(PipeTest, ReadTimeout) {
  using namespace std::chrono_literals;
  SelectHelper helper;
  helper.SetDeadline(100ms);
  helper.FDSetRead(read_fd);
  EXPECT_EQ(helper.Select(), ETIMEDOUT);
  EXPECT_FALSE(helper.FDIsSetRead(read_fd));
}
// A freshly created pipe has nothing written to it yet, so its write end
// must be reported as writable.
TEST_F(PipeTest, WriteEmptyBuffer) {
  using namespace std::chrono_literals;
  SelectHelper helper;
  helper.SetDeadline(500ms);
  helper.FDSetWrite(write_fd);
  EXPECT_EQ(helper.Select(), 0);
  EXPECT_TRUE(helper.FDIsSetWrite(write_fd));
}
// Fills the pipe, then checks that Select() reports the write end as
// writable once a background task has drained it.
TEST_F(PipeTest, WriteWithFullBuffer) {
  const std::string chunk(kBufferSize, 'a');
  // Fill up the buffer. The result is asserted because the rest of the test
  // is meaningless (and would pass vacuously) if the pipe is not full.
  ASSERT_EQ(_write(fds[WRITE], chunk.data(),
                   static_cast<unsigned int>(chunk.size())),
            static_cast<int>(chunk.size()));
  SelectHelper helper;
  helper.SetDeadline(std::chrono::milliseconds(500));
  helper.FDSetWrite(write_fd);
  auto reader = std::async(std::launch::async, [&]() {
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    char buf[kBufferSize * 2];
    // Read the buffer to empty the pipe.
    _read(fds[READ], buf, sizeof(buf));
  });
  EXPECT_EQ(helper.Select(), 0);
  EXPECT_TRUE(helper.FDIsSetWrite(write_fd));
}
// With the pipe full and nobody reading, Select() must time out and leave
// the write flag clear.
TEST_F(PipeTest, WriteTimeout) {
  SelectHelper helper;
  const std::string chunk(kBufferSize, 'a');
  // Fill up the buffer. Assert the write so a failed fill is reported as
  // such rather than as a confusing Select() result.
  ASSERT_EQ(_write(fds[WRITE], chunk.data(),
                   static_cast<unsigned int>(chunk.size())),
            static_cast<int>(chunk.size()));
  helper.SetDeadline(std::chrono::milliseconds(100));
  helper.FDSetWrite(write_fd);
  EXPECT_EQ(helper.Select(), ETIMEDOUT);
  EXPECT_FALSE(helper.FDIsSetWrite(write_fd));
}
class SocketTest : public testing::Test {
protected:
SOCKET listener_socket = INVALID_SOCKET;
SOCKET server_socket = INVALID_SOCKET;
SOCKET client_socket = INVALID_SOCKET;
IOObject listener_io;
IOObject server_io;
IOObject client_io;
SOCKADDR_STORAGE resolve_addr;
int resolve_addr_len;
void SetUp() override {
init_winsock();
listener_socket = socket(AF_INET6, SOCK_STREAM, 0);
ASSERT_NE(listener_socket, INVALID_SOCKET);
listener_io = IOObject(listener_socket);
struct sockaddr_in6 loopback = {0};
loopback.sin6_family = AF_INET6;
loopback.sin6_port = 0;
loopback.sin6_addr = in6addr_loopback;
ASSERT_EQ(bind(listener_socket, (LPSOCKADDR)&loopback, sizeof(loopback)),
0);
ASSERT_EQ(listen(listener_socket, 5), 0);
resolve_addr_len = sizeof(resolve_addr);
ASSERT_EQ(getsockname(listener_socket, (LPSOCKADDR)&resolve_addr,
&resolve_addr_len),
0);
}
void TearDown() override {
if (listener_socket != INVALID_SOCKET) {
closesocket(listener_socket);
listener_socket = INVALID_SOCKET;
}
if (server_socket != INVALID_SOCKET) {
closesocket(server_socket);
server_socket = INVALID_SOCKET;
}
if (client_socket != INVALID_SOCKET) {
closesocket(client_socket);
client_socket = INVALID_SOCKET;
}
}
void AcceptClient() {
SOCKADDR_STORAGE addr;
socklen_t addr_len = sizeof(addr);
server_socket = accept(listener_socket, (LPSOCKADDR)&addr, &addr_len);
if (server_socket == INVALID_SOCKET) {
FAIL() << "accept() failed: " << WSAGetLastError();
}
server_io = IOObject(server_socket);
}
void ClientConnect() {
client_socket = socket(AF_INET6, SOCK_STREAM, 0);
ASSERT_NE(client_socket, INVALID_SOCKET);
EXPECT_EQ(
connect(client_socket, (LPSOCKADDR)&resolve_addr, resolve_addr_len), 0);
client_io = IOObject(client_socket);
}
};
// A pending connection must make the listening socket readable before the
// deadline expires.
TEST_F(SocketTest, Accept) {
  using namespace std::chrono_literals;
  SelectHelper helper;
  helper.FDSetRead(listener_io);
  helper.SetDeadline(500ms);
  // Connect from a background task shortly after Select() starts waiting.
  std::future<void> connector = std::async(std::launch::async, [this]() {
    std::this_thread::sleep_for(10ms);
    ClientConnect();
  });
  EXPECT_EQ(helper.Select(), 0);
  EXPECT_TRUE(helper.FDIsSetRead(listener_io));
}
// Nobody connects, so waiting on the listener must end with ETIMEDOUT and
// the read flag must stay clear.
TEST_F(SocketTest, AcceptTimeout) {
  using namespace std::chrono_literals;
  SelectHelper helper;
  helper.FDSetRead(listener_io);
  helper.SetDeadline(100ms);
  EXPECT_EQ(helper.Select(), ETIMEDOUT);
  EXPECT_FALSE(helper.FDIsSetRead(listener_io));
}
// Data sent by the client must make the accepted socket readable (no
// deadline set).
TEST_F(SocketTest, Recv) {
  std::future<void> connector = std::async(std::launch::async, [&]() {
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
    ClientConnect();
  });
  AcceptClient();
  // ClientConnect() assigns client_socket on the worker thread; wait for it
  // to finish so the read below does not race with that write.
  connector.wait();
  ASSERT_EQ(send(client_socket, "hello", 5, 0), 5);
  SelectHelper helper;
  helper.FDSetRead(server_io);
  EXPECT_EQ(helper.Select(), 0);
  EXPECT_TRUE(helper.FDIsSetRead(server_io));
}
// Data sent by the client must make the accepted socket readable before the
// 500 ms deadline expires.
TEST_F(SocketTest, RecvWithDeadline) {
  std::future<void> connector = std::async(std::launch::async, [&]() {
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
    ClientConnect();
  });
  AcceptClient();
  // ClientConnect() assigns client_socket on the worker thread; wait for it
  // to finish so the read below does not race with that write.
  connector.wait();
  ASSERT_EQ(send(client_socket, "hello", 5, 0), 5);
  SelectHelper helper;
  helper.FDSetRead(server_io);
  helper.SetDeadline(std::chrono::milliseconds(500));
  EXPECT_EQ(helper.Select(), 0);
  EXPECT_TRUE(helper.FDIsSetRead(server_io));
}
// The client connects but never sends, so waiting for data on the accepted
// socket must end with ETIMEDOUT and the read flag must stay clear.
TEST_F(SocketTest, RecvTimeout) {
  using namespace std::chrono_literals;
  std::future<void> connector = std::async(std::launch::async, [this]() {
    std::this_thread::sleep_for(10ms);
    ClientConnect();
  });
  AcceptClient();
  SelectHelper helper;
  helper.FDSetRead(server_io);
  helper.SetDeadline(100ms);
  EXPECT_EQ(helper.Select(), ETIMEDOUT);
  EXPECT_FALSE(helper.FDIsSetRead(server_io));
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment