-
-
Save kachsheev/711c45f1289f84b323035dff1d306b9e to your computer and use it in GitHub Desktop.
// Example of overlapped (asynchronous) file I/O using the WinAPI.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <algorithm>
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <iostream>
#include <list>
#include <memory>
#include <mutex>
#include <optional>
#include <thread>
#include <vector>

#include "windows.h"
/// Kind of I/O request a Task asks the worker thread to perform.
enum class Operation
{
    Read,  ///< Read from the task's file into the task's buffer.
    Write  ///< Write request (the worker does not implement it yet).
};
struct Task { | |
Operation operation; | |
HANDLE file; | |
OVERLAPPED overlapped{}; | |
std::atomic<bool> is_ready = false; | |
// Either to read or write | |
std::vector<char> buffer; | |
Task(Operation operation, | |
HANDLE file) : operation(operation), file(file), buffer(1024, 0) | |
{} | |
}; | |
struct WorkerContext { | |
bool continue_work() { | |
return continue_work_fl.load(); | |
} | |
void stop_work() { | |
continue_work_fl.store(false); | |
} | |
void add_task(std::shared_ptr<Task> task) { | |
std::lock_guard<std::mutex> lock(tasks_mutex); | |
tasks.push_back(std::move(task)); | |
} | |
std::optional<std::shared_ptr<Task>> get_task() { | |
if (!tasks_mutex.try_lock()) { | |
return std::nullopt; | |
} | |
std::optional<std::shared_ptr<Task>> result = std::nullopt; | |
if (!tasks.empty()) { | |
result = std::move(tasks.back()); | |
tasks.pop_back(); | |
} | |
tasks_mutex.unlock(); | |
return result; | |
} | |
void lock_main_thread() { | |
std::unique_lock<std::mutex> lock(mt_mutex); | |
while (!mt_count) // Handle spurious wake-ups. | |
mt_condition.wait(lock); | |
--mt_count; | |
} | |
void notify_main_thread() { | |
std::unique_lock<std::mutex> lock(mt_mutex); | |
++mt_count; | |
mt_condition.notify_one(); | |
} | |
std::vector<std::shared_ptr<Task>> to_remove; | |
private: | |
std::atomic<bool> continue_work_fl = true; | |
std::mutex tasks_mutex; | |
std::vector<std::shared_ptr<Task>> tasks; | |
// Where is semaphores :( | |
std::mutex mt_mutex; | |
std::condition_variable mt_condition; | |
size_t mt_count; | |
}; | |
struct RoutineCallbackData { | |
std::shared_ptr<Task> task; | |
WorkerContext& context; | |
}; | |
void finish_routine(DWORD dwErrorCode, | |
DWORD dwNumberOfBytesTransfered, | |
LPOVERLAPPED lpOverlapped) { | |
std::cout << "Some routine finished"; | |
RoutineCallbackData& callback_data = *static_cast<RoutineCallbackData*>(lpOverlapped->hEvent); | |
assert(lpOverlapped == &callback_data.task->overlapped); | |
callback_data.task->is_ready = true; | |
callback_data.context.to_remove.push_back(callback_data.task); | |
callback_data.context.notify_main_thread(); | |
} | |
/// Worker thread body: issues overlapped reads for queued tasks and runs
/// their completion routines.
///
/// Loops until context.stop_work() is called. Each iteration
///   1. drains the task queue and starts a ReadFileEx per Read task,
///   2. enters an alertable wait of about 1 ms so that queued completion
///      routines (finish_routine) can run on this thread,
///   3. releases the bookkeeping of every task reported in
///      context.to_remove.
///
/// NOTE(review): requests still pending when the loop exits are not
/// cancelled; callers are expected to stop the worker only after all
/// tasks have completed.
void thread_worker(WorkerContext& context) {
    std::vector<HANDLE> handles;
    // std::list keeps element addresses stable; OVERLAPPED::hEvent stores a
    // pointer to the element for the lifetime of the request.
    std::list<RoutineCallbackData> callback_datas;

    while (context.continue_work()) {
        while (auto new_task = context.get_task()) {
            const std::shared_ptr<Task>& task_ptr = *new_task;
            if (!task_ptr) {
                continue;  // Nothing to do for an empty pointer.
            }
            Task& task = *task_ptr;

            switch (task.operation) {
            case Operation::Read:
            {
                task.is_ready = false;
                task.overlapped.Offset = 0;
                task.overlapped.OffsetHigh = 0;

                callback_datas.push_back({ task_ptr, context });
                // ReadFileEx ignores hEvent, so it carries the callback data.
                task.overlapped.hEvent = &callback_datas.back();

                const BOOL queued = ReadFileEx(task.file,
                                               task.buffer.data(),
                                               static_cast<DWORD>(task.buffer.size()),
                                               &task.overlapped,
                                               finish_routine);
                if (queued) {
                    handles.push_back(task.file);
                } else {
                    // The completion routine will never run for a request
                    // that failed to start: undo the bookkeeping and wake
                    // the main thread so it does not wait forever.
                    const DWORD error = GetLastError();
                    std::cerr << "ReadFileEx failed, error " << error << std::endl;
                    callback_datas.pop_back();
                    task.overlapped.hEvent = nullptr;
                    task.is_ready = true;
                    context.notify_main_thread();
                }
                break;
            }
            case Operation::Write:
            {
                // TODO
                break;
            }
            }
        }

        if (handles.empty()) {
            // WaitForMultipleObjectsEx rejects a zero count and would return
            // immediately, turning this loop into a busy spin.
            SleepEx(1, TRUE);
        } else {
            // The API accepts at most MAXIMUM_WAIT_OBJECTS handles.
            const DWORD wait_count = handles.size() > MAXIMUM_WAIT_OBJECTS
                ? static_cast<DWORD>(MAXIMUM_WAIT_OBJECTS)
                : static_cast<DWORD>(handles.size());
            WaitForMultipleObjectsEx(wait_count, handles.data(), FALSE, 1, TRUE);
        }

        // Completion routines only run inside the alertable wait above, so
        // to_remove is not modified while it is processed here.
        for (const auto& finished : context.to_remove) {
            // Drop exactly one occurrence of the handle for this task.
            const auto handle_pos = std::find(handles.begin(), handles.end(), finished->file);
            if (handle_pos != handles.end()) {
                handles.erase(handle_pos);
            }
            callback_datas.remove_if(
                [&finished](const RoutineCallbackData& d) { return d.task == finished; });
        }
        // Everything reported so far has been handled; without this the list
        // would grow and be re-processed on every iteration.
        context.to_remove.clear();
    }
}
int main() { | |
WorkerContext context; | |
std::thread worker(thread_worker, std::ref(context)); | |
HANDLE file1 = CreateFileW( | |
L"test1.txt", | |
GENERIC_READ, | |
FILE_SHARE_READ, | |
nullptr, | |
OPEN_EXISTING, | |
FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, | |
nullptr | |
); | |
std::shared_ptr<Task> task1 = std::make_shared<Task>(Operation::Read, file1); | |
context.add_task(task1); | |
HANDLE file2 = CreateFileW( | |
L"test2.txt", | |
GENERIC_READ, | |
FILE_SHARE_READ, | |
nullptr, | |
OPEN_EXISTING, | |
FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, | |
nullptr | |
); | |
std::shared_ptr<Task> task2 = std::make_shared<Task>(Operation::Read, file2); | |
context.add_task(task2); | |
bool is_f1_closed = false; | |
bool is_f2_closed = false; | |
while (!is_f1_closed || !is_f2_closed) { | |
std::cout << "Waiting in main thread" << std::endl; | |
context.lock_main_thread(); | |
if (task1->is_ready.load() && !is_f1_closed) { | |
std::cout << "Task 1 is ready: " << std::endl; | |
std::cout << task1->buffer.data() << std::endl; | |
CloseHandle(file1); | |
is_f1_closed = true; | |
} | |
if (task2->is_ready.load() && !is_f2_closed) { | |
std::cout << "Task 2 is ready: " << std::endl; | |
std::cout << task2->buffer.data() << std::endl; | |
CloseHandle(file2); | |
is_f2_closed = true; | |
} | |
} | |
context.stop_work(); | |
worker.join(); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment