Last active
June 23, 2025 16:09
-
-
Save ashgti/003ada30ee59aed154bac111581f97d5 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Emulate 'select' behavior on Windows for pipe handles (both anonymous and
// named pipes) and for sockets.
// See `SelectHelper::Select` below.
// clang-format off
#include <wtypes.h>
#include <errno.h>
#include <fcntl.h>
#include <io.h>
#include <winbase.h>
#include <winternl.h>
#include <winsock2.h>
#include <ws2tcpip.h>
// clang-format on
#include <cassert>
#include <cerrno>
#include <chrono>
#include <compare>
#include <cstddef>
#include <cstdio>
#include <future>
#include <map>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <thread>
#include "gunit/gunit.h"
// Information-class value passed (after a cast to FILE_INFORMATION_CLASS) to
// NtQueryInformationFile together with a FILE_PIPE_LOCAL_INFORMATION buffer.
enum { FilePipeLocalInformation = 24 };
typedef NTSTATUS(NTAPI *_NtQueryInformationFile_fn)( | |
HANDLE FileHandle, PIO_STATUS_BLOCK IoStatusBlock, PVOID FileInformation, | |
ULONG Length, FILE_INFORMATION_CLASS FileInformationClass); | |
static _NtQueryInformationFile_fn pNtQueryInformationFile; | |
typedef struct _FILE_PIPE_LOCAL_INFORMATION { | |
ULONG NamedPipeType; | |
ULONG NamedPipeConfiguration; | |
ULONG MaximumInstances; | |
ULONG CurrentInstances; | |
ULONG InboundQuota; | |
ULONG ReadDataAvailable; | |
ULONG OutboundQuota; | |
ULONG WriteQuotaAvailable; | |
ULONG NamedPipeState; | |
ULONG NamedPipeEnd; | |
} FILE_PIPE_LOCAL_INFORMATION, *PFILE_PIPE_LOCAL_INFORMATION; | |
namespace { | |
void init_winsock() { | |
static std::once_flag once_flag; | |
std::call_once(once_flag, []() { | |
auto wVersion = WINSOCK_VERSION; | |
WSADATA wsaData; | |
int err = ::WSAStartup(wVersion, &wsaData); | |
if (err == 0) { | |
if (wsaData.wVersion < wVersion) { | |
WSACleanup(); | |
FAIL() << "WSASock version is not expected."; | |
} | |
} else { | |
FAIL() << "WSASock version is not expected. " << ::WSAGetLastError(); | |
} | |
}); | |
} | |
// Returns the WriteQuotaAvailable value that NtQueryInformationFile reports
// for `hPipe`.
//
// Returns 1 when NtQueryInformationFile could not be resolved from ntdll.dll
// or when the query itself does not succeed.
DWORD pipe_write_available(const HANDLE &hPipe) {
  static std::once_flag resolve_once;
  std::call_once(resolve_once, []() {
    // 'NtQueryInformationFile' is exported by ntdll.dll and should be present
    // on Windows 2000 and newer. NOTE: It is not an official API, so its
    // parameters may change in a future release. However, it has been stable
    // for years at this point and is used by multiple libraries and runtimes.
    if (HMODULE ntdll = LoadLibraryW(L"ntdll.dll")) {
      pNtQueryInformationFile = reinterpret_cast<_NtQueryInformationFile_fn>(
          GetProcAddress(ntdll, "NtQueryInformationFile"));
    }
  });

  if (pNtQueryInformationFile == nullptr) return 1;

  IO_STATUS_BLOCK io_status;
  FILE_PIPE_LOCAL_INFORMATION pipe_info;
  const NTSTATUS status = pNtQueryInformationFile(
      hPipe, &io_status, &pipe_info, sizeof(pipe_info),
      static_cast<FILE_INFORMATION_CLASS>(FilePipeLocalInformation));
  return NT_SUCCESS(status) ? pipe_info.WriteQuotaAvailable : 1;
}
using WaitableHandle = HANDLE; | |
class IOObject { | |
public: | |
IOObject() : IOObject(-1) {} | |
explicit IOObject(int fd) : m_fd(fd) { | |
if (fd != -1) m_handle = (HANDLE)_get_osfhandle(fd); | |
} | |
explicit IOObject(FILE *file) : IOObject(fileno(file)) { m_file = file; } | |
explicit IOObject(SOCKET socket) : m_socket(socket) {} | |
virtual ~IOObject() = default; | |
bool IsValid() const { | |
return m_fd != -1 || m_file != nullptr || m_socket != INVALID_SOCKET; | |
} | |
WaitableHandle GetWaitableHandle() const { | |
if (m_socket != INVALID_SOCKET) { | |
WSAEVENT event = WSACreateEvent(); | |
WSAEventSelect(m_socket, event, FD_ACCEPT | FD_READ | FD_WRITE); | |
return event; | |
} | |
return m_handle; | |
} | |
bool HasReadableBytes() const { | |
if (!IsValid()) return false; | |
if (m_socket != INVALID_SOCKET) { | |
WSAPOLLFD poll_fd = {0}; | |
poll_fd.fd = m_socket; | |
poll_fd.events = POLLIN; | |
return WSAPoll(&poll_fd, 1, 0) > 0; | |
} | |
DWORD available_bytes = 0; | |
return !PeekNamedPipe(m_handle, NULL, 0, NULL, &available_bytes, NULL) || | |
available_bytes > 0; | |
} | |
bool HasWritableBytes() const { | |
if (!IsValid()) return false; | |
if (m_socket != INVALID_SOCKET) { | |
// FD_WRITE is edge-triggered, not level-triggered, so it will only | |
// be signaled if the socket becomes writable after a send() fails | |
// with WSAEWOULDBLOCK. We can work around this by performing a | |
// zero-byte send(). If the socket is writable, the send() will | |
// succeed and we can immediately post a packet, and if it isn't, it | |
// will fail with WSAEWOULDBLOCK and WSAEventSelect() will report | |
// the next time it becomes available. | |
return send(m_socket, "", 0, 0) == 0; | |
} | |
if (m_handle == INVALID_HANDLE_VALUE) return false; | |
return pipe_write_available(m_handle) > 0; | |
} | |
std::strong_ordering operator<=>(const IOObject &) const = default; | |
private: | |
int m_fd = -1; | |
FILE *m_file = nullptr; | |
SOCKET m_socket = INVALID_SOCKET; | |
HANDLE m_handle = INVALID_HANDLE_VALUE; | |
}; | |
class SelectHelper { | |
public: | |
// Defaults to infinite wait for select unless you call SetDeadline() | |
SelectHelper() = default; | |
// Call SetDeadline() before calling SelectHelper::Select() to set the timeout | |
// based on the current time + the timeout. This allows multiple calls to | |
// SelectHelper::Select() without having to worry about the absolute timeout | |
// as this class manages to set the relative timeout correctly. | |
void SetDeadline(const std::chrono::microseconds &timeout) { | |
m_end_time = std::chrono::steady_clock::now() + timeout; | |
} | |
// Call the FDSet*() functions before calling SelectHelper::Select() to set | |
// the file descriptors that we will watch for when calling select. This will | |
// cause FD_SET() to be called prior to calling select using the "fd" | |
// provided. | |
void FDSetRead(IOObject io) { m_handles[io].read_set = true; } | |
void FDSetWrite(IOObject io) { m_handles[io].write_set = true; } | |
void FDSetError(IOObject io) { m_handles[io].error_set = true; } | |
// Call the FDIsSet*() functions after calling SelectHelper::Select() to | |
// check which file descriptors are ready for read/write/error. This will | |
// contain the result of FD_ISSET after calling select for a given file | |
// descriptor. | |
bool FDIsSetRead(IOObject io) const { return m_handles.at(io).read_is_set; } | |
bool FDIsSetWrite(IOObject io) const { return m_handles.at(io).write_is_set; } | |
bool FDIsSetError(IOObject io) const { return m_handles.at(io).error_is_set; } | |
// Emulate 'select' behavior on Windows. | |
int Select() { | |
if (m_handles.size() > MAXIMUM_WAIT_OBJECTS) { | |
return EINVAL; | |
} | |
HANDLE handles[MAXIMUM_WAIT_OBJECTS]; | |
DWORD count = 0; | |
for (auto &pair : m_handles) { | |
if (!pair.second.read_set && !pair.second.write_set && | |
!pair.second.error_set) | |
continue; | |
pair.second.PrepareForSelect(); | |
handles[count] = pair.first.GetWaitableHandle(); | |
if (handles[count] == INVALID_HANDLE_VALUE) return EBADF; | |
count++; | |
} | |
if (count == 0) return EINVAL; | |
int ready_count = 0; | |
do { | |
DWORD millis = INFINITE; | |
if (m_end_time) { | |
std::chrono::milliseconds remaining = | |
std::chrono::duration_cast<std::chrono::milliseconds>( | |
m_end_time.value() - std::chrono::steady_clock::now()); | |
if (remaining.count() <= 0) return ETIMEDOUT; | |
millis = remaining.count(); | |
} | |
DWORD wait_result = WaitForMultipleObjects(count, handles, FALSE, millis); | |
if (wait_result == WAIT_TIMEOUT || | |
(wait_result >= WAIT_ABANDONED_0 && | |
wait_result <= WAIT_ABANDONED_0 + count - 1)) | |
return ETIMEDOUT; | |
else if (wait_result == WAIT_FAILED) | |
return GetLastError(); | |
for (auto &pair : m_handles) { | |
// Ensure we're inspecting the right handle. | |
if ((pair.second.read_set || pair.second.write_set || | |
pair.second.error_set) && | |
WAIT_OBJECT_0 != | |
WaitForSingleObject(pair.first.GetWaitableHandle(), 0)) | |
continue; | |
if (pair.second.read_set) { | |
// Check if there is data to read, otherwise keep waiting. | |
if (pair.first.HasReadableBytes()) { | |
pair.second.read_is_set = true; | |
ready_count++; | |
} | |
} else if (pair.second.write_set) { | |
// Check if the pipe is full, otherwise keep waiting. | |
if (pair.first.HasWritableBytes()) { | |
pair.second.write_is_set = true; | |
ready_count++; | |
} | |
} else if (pair.second.error_set) { | |
pair.second.error_is_set = true; | |
ready_count++; | |
} | |
} | |
// If no fds have a status to report yet, yield to allow for IO. | |
if (!ready_count) Sleep(1); | |
} while (ready_count == 0); | |
return 0; | |
} | |
protected: | |
struct FDInfo { | |
FDInfo() | |
: read_set(false), | |
write_set(false), | |
error_set(false), | |
read_is_set(false), | |
write_is_set(false), | |
error_is_set(false) {} | |
void PrepareForSelect() { | |
read_is_set = false; | |
write_is_set = false; | |
error_is_set = false; | |
} | |
bool read_set : 1, write_set : 1, error_set : 1, read_is_set : 1, | |
write_is_set : 1, error_is_set : 1; | |
}; | |
std::map<IOObject, FDInfo> m_handles; | |
std::optional<std::chrono::steady_clock::time_point> m_end_time; | |
}; | |
} // namespace | |
enum { READ, WRITE }; | |
class PipeTest : public testing::Test { | |
protected: | |
constexpr static size_t kBufferSize = 1024; | |
int fds[2]; | |
IOObject read_fd; | |
IOObject write_fd; | |
void SetUp() override { | |
ASSERT_EQ(_pipe(fds, kBufferSize, _O_BINARY), 0); | |
read_fd = IOObject(fds[READ]); | |
write_fd = IOObject(fds[WRITE]); | |
} | |
void TearDown() override { | |
_close(fds[READ]); | |
_close(fds[WRITE]); | |
} | |
}; | |
// Select() without a deadline returns once another thread writes to the pipe,
// and reports the read end as readable.
TEST_F(PipeTest, Read) {
  SelectHelper helper;
  helper.FDSetRead(read_fd);

  // Writes to the pipe from a second thread after a short delay.
  const auto delayed_write = [&]() {
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    _write(fds[WRITE], "hello", 5);
  };
  std::future<void> writer = std::async(std::launch::async, delayed_write);

  EXPECT_EQ(helper.Select(), 0);
  EXPECT_TRUE(helper.FDIsSetRead(read_fd));
}
// Same as Read, but with a 500 ms deadline that is longer than the writer's
// 100 ms delay.
TEST_F(PipeTest, ReadWithDeadline) {
  SelectHelper helper;
  helper.SetDeadline(std::chrono::milliseconds(500));
  helper.FDSetRead(read_fd);

  // Writes to the pipe from a second thread after a short delay.
  const auto delayed_write = [&]() {
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    _write(fds[WRITE], "hello", 5);
  };
  std::future<void> writer = std::async(std::launch::async, delayed_write);

  EXPECT_EQ(helper.Select(), 0);
  EXPECT_TRUE(helper.FDIsSetRead(read_fd));
}
// Nothing is ever written to the pipe, so Select() gives up at the deadline
// and the read end is not reported as readable.
TEST_F(PipeTest, ReadTimeout) {
  SelectHelper helper;
  helper.SetDeadline(std::chrono::milliseconds(100));
  helper.FDSetRead(read_fd);

  const int result = helper.Select();
  EXPECT_EQ(result, ETIMEDOUT);
  EXPECT_FALSE(helper.FDIsSetRead(read_fd));
}
// Nothing has been written to the pipe yet, so the write end is expected to
// be reported as writable before the deadline.
TEST_F(PipeTest, WriteEmptyBuffer) {
  SelectHelper helper;
  helper.SetDeadline(std::chrono::milliseconds(500));
  helper.FDSetWrite(write_fd);

  const int result = helper.Select();
  EXPECT_EQ(result, 0);
  EXPECT_TRUE(helper.FDIsSetWrite(write_fd));
}
// Starts with a full pipe; a second thread drains it after 100 ms, after
// which Select() must report the write end as writable.
TEST_F(PipeTest, WriteWithFullBuffer) {
  const std::string chunk(kBufferSize, 'a');
  // Fill up the buffer. The rest of the test is meaningless if this write did
  // not go through completely, so treat that as a fatal failure.
  ASSERT_EQ(_write(fds[WRITE], chunk.data(),
                   static_cast<unsigned int>(chunk.size())),
            static_cast<int>(chunk.size()));

  SelectHelper helper;
  helper.SetDeadline(std::chrono::milliseconds(500));
  helper.FDSetWrite(write_fd);

  auto reader = std::async(std::launch::async, [&]() {
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    char buf[kBufferSize * 2];
    // Read the buffer to empty the pipe.
    _read(fds[READ], &buf, sizeof(buf));
  });

  EXPECT_EQ(helper.Select(), 0);
  EXPECT_TRUE(helper.FDIsSetWrite(write_fd));
}
// The pipe is filled and never drained, so Select() must time out without
// reporting the write end as writable.
TEST_F(PipeTest, WriteTimeout) {
  SelectHelper helper;
  const std::string chunk(kBufferSize, 'a');
  // Fill up the buffer. A short or failed write would leave room in the pipe
  // and invalidate the test, so treat that as a fatal failure.
  ASSERT_EQ(_write(fds[WRITE], chunk.data(),
                   static_cast<unsigned int>(chunk.size())),
            static_cast<int>(chunk.size()));

  helper.SetDeadline(std::chrono::milliseconds(100));
  helper.FDSetWrite(write_fd);

  EXPECT_EQ(helper.Select(), ETIMEDOUT);
  EXPECT_FALSE(helper.FDIsSetWrite(write_fd));
}
class SocketTest : public testing::Test { | |
protected: | |
SOCKET listener_socket = INVALID_SOCKET; | |
SOCKET server_socket = INVALID_SOCKET; | |
SOCKET client_socket = INVALID_SOCKET; | |
IOObject listener_io; | |
IOObject server_io; | |
IOObject client_io; | |
SOCKADDR_STORAGE resolve_addr; | |
int resolve_addr_len; | |
void SetUp() override { | |
init_winsock(); | |
listener_socket = socket(AF_INET6, SOCK_STREAM, 0); | |
ASSERT_NE(listener_socket, INVALID_SOCKET); | |
listener_io = IOObject(listener_socket); | |
struct sockaddr_in6 loopback = {0}; | |
loopback.sin6_family = AF_INET6; | |
loopback.sin6_port = 0; | |
loopback.sin6_addr = in6addr_loopback; | |
ASSERT_EQ(bind(listener_socket, (LPSOCKADDR)&loopback, sizeof(loopback)), | |
0); | |
ASSERT_EQ(listen(listener_socket, 5), 0); | |
resolve_addr_len = sizeof(resolve_addr); | |
ASSERT_EQ(getsockname(listener_socket, (LPSOCKADDR)&resolve_addr, | |
&resolve_addr_len), | |
0); | |
} | |
void TearDown() override { | |
if (listener_socket != INVALID_SOCKET) { | |
closesocket(listener_socket); | |
listener_socket = INVALID_SOCKET; | |
} | |
if (server_socket != INVALID_SOCKET) { | |
closesocket(server_socket); | |
server_socket = INVALID_SOCKET; | |
} | |
if (client_socket != INVALID_SOCKET) { | |
closesocket(client_socket); | |
client_socket = INVALID_SOCKET; | |
} | |
} | |
void AcceptClient() { | |
SOCKADDR_STORAGE addr; | |
socklen_t addr_len = sizeof(addr); | |
server_socket = accept(listener_socket, (LPSOCKADDR)&addr, &addr_len); | |
if (server_socket == INVALID_SOCKET) { | |
FAIL() << "accept() failed: " << WSAGetLastError(); | |
} | |
server_io = IOObject(server_socket); | |
} | |
void ClientConnect() { | |
client_socket = socket(AF_INET6, SOCK_STREAM, 0); | |
ASSERT_NE(client_socket, INVALID_SOCKET); | |
EXPECT_EQ( | |
connect(client_socket, (LPSOCKADDR)&resolve_addr, resolve_addr_len), 0); | |
client_io = IOObject(client_socket); | |
} | |
}; | |
// A client connecting from another thread makes the listening socket
// readable before the deadline.
TEST_F(SocketTest, Accept) {
  SelectHelper helper;
  helper.FDSetRead(listener_io);
  helper.SetDeadline(std::chrono::milliseconds(500));

  // Connects to the listener from a second thread after a short delay.
  const auto delayed_connect = [&]() {
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
    ClientConnect();
  };
  std::future<void> connector =
      std::async(std::launch::async, delayed_connect);

  EXPECT_EQ(helper.Select(), 0);
  EXPECT_TRUE(helper.FDIsSetRead(listener_io));
}
// No client ever connects, so Select() times out and the listener is not
// reported as readable.
TEST_F(SocketTest, AcceptTimeout) {
  SelectHelper helper;
  helper.FDSetRead(listener_io);
  helper.SetDeadline(std::chrono::milliseconds(100));

  const int result = helper.Select();
  EXPECT_EQ(result, ETIMEDOUT);
  EXPECT_FALSE(helper.FDIsSetRead(listener_io));
}
// Data sent by the client makes the accepted server-side socket readable;
// Select() is called without a deadline.
TEST_F(SocketTest, Recv) {
  std::future<void> connector = std::async(std::launch::async, [&]() {
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
    ClientConnect();
  });
  AcceptClient();
  // client_socket is assigned and connected on the connector thread. Wait for
  // that thread to finish before using the socket here; accept() returning
  // does not by itself order this thread after ClientConnect().
  connector.wait();
  ASSERT_EQ(send(client_socket, "hello", 5, 0), 5);

  SelectHelper helper;
  helper.FDSetRead(server_io);
  EXPECT_EQ(helper.Select(), 0);
  EXPECT_TRUE(helper.FDIsSetRead(server_io));
}
// Same as Recv, but Select() runs with a 500 ms deadline.
TEST_F(SocketTest, RecvWithDeadline) {
  std::future<void> connector = std::async(std::launch::async, [&]() {
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
    ClientConnect();
  });
  AcceptClient();
  // client_socket is assigned and connected on the connector thread. Wait for
  // that thread to finish before using the socket here; accept() returning
  // does not by itself order this thread after ClientConnect().
  connector.wait();
  ASSERT_EQ(send(client_socket, "hello", 5, 0), 5);

  SelectHelper helper;
  helper.FDSetRead(server_io);
  helper.SetDeadline(std::chrono::milliseconds(500));
  EXPECT_EQ(helper.Select(), 0);
  EXPECT_TRUE(helper.FDIsSetRead(server_io));
}
// The client connects but never sends anything, so Select() times out and
// the server-side socket is not reported as readable.
TEST_F(SocketTest, RecvTimeout) {
  // Connects to the listener from a second thread after a short delay.
  const auto delayed_connect = [&]() {
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
    ClientConnect();
  };
  std::future<void> connector =
      std::async(std::launch::async, delayed_connect);
  AcceptClient();

  SelectHelper helper;
  helper.FDSetRead(server_io);
  helper.SetDeadline(std::chrono::milliseconds(100));

  const int result = helper.Select();
  EXPECT_EQ(result, ETIMEDOUT);
  EXPECT_FALSE(helper.FDIsSetRead(server_io));
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment