コード
/**
* @file tcp_server.cpp
* @brief Implementation of the TcpServer class.
*/
#include <arpa/inet.h>
#include <netinet/in.h>
#include <stdio.h>
#include <sys/socket.h>
#include <unistd.h>

#include <atomic>
#include <cerrno>
#include <chrono>
#include <condition_variable>
#include <cstring>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
/**
 * @class TcpServer
 * @brief TCP line-echo server that serves client connections on a pool of worker threads.
 *
 * The constructor creates, binds and listens on the server socket. Run() accepts
 * connections and hands them to the worker threads through a queue. Each
 * connection is served by reading one line and writing it back. Stop() shuts
 * the server down and may be called any number of times.
 */
class TcpServer {
public:
    /**
     * @brief Creates the listening socket on all IPv4 interfaces and starts listening.
     *
     * Failures are reported with perror(). The server is then left without a
     * listening socket: Port() returns 0 and Run() returns immediately.
     *
     * @param max_threads Number of worker threads started by Run(). Values below 1
     *                    are raised to 1 so accepted connections are always served.
     * @param port        TCP port in host byte order. 0 lets the kernel choose a free
     *                    port, which can be read back with Port().
     */
    explicit TcpServer(int max_threads, in_port_t port = 8080)
        : max_threads_(max_threads > 0 ? max_threads : 1) {
        const int fd = socket(AF_INET, SOCK_STREAM, 0);
        if (fd < 0) {
            perror("socket");
            return;
        }
        const int opt = 1;
        if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
            perror("setsockopt");
            close(fd);
            return;
        }
        sockaddr_in addr{};  // value-initialised so the sin_zero padding is cleared
        addr.sin_family = AF_INET;
        addr.sin_port = htons(port);
        addr.sin_addr.s_addr = htonl(INADDR_ANY);
        if (bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) < 0) {
            perror("bind");
            close(fd);
            return;
        }
        if (listen(fd, kListenBacklog) < 0) {
            perror("listen");
            close(fd);
            return;
        }
        // Read the port back so that a request for port 0 reports the real one.
        sockaddr_in bound{};
        socklen_t bound_len = sizeof(bound);
        if (getsockname(fd, reinterpret_cast<sockaddr*>(&bound), &bound_len) == 0) {
            port = ntohs(bound.sin_port);
        }
        port_ = port;
        // Published last: a non-negative server_fd_ always names a listening socket.
        server_fd_ = fd;
    }

    TcpServer(const TcpServer&) = delete;
    TcpServer& operator=(const TcpServer&) = delete;

    /**
     * @brief Returns the port the listening socket was bound to at construction.
     * @return The port in host byte order, or 0 if the socket could not be set up.
     */
    in_port_t Port() const { return port_; }

    /**
     * @brief Starts the worker threads and accepts client connections until Stop() is called.
     *
     * Blocks the calling thread. Returns immediately when there is no listening
     * socket, i.e. construction failed or Stop() has already been called.
     */
    void Run() {
        {
            // Same mutex as Stop(), so workers_ is never touched by both at once.
            std::lock_guard<std::mutex> lock(queue_mutex_);
            if (server_fd_.load() < 0) {
                return;
            }
            running_ = true;
            for (int i = 0; i < max_threads_; ++i) {
                workers_.emplace_back(&TcpServer::Worker, this);
            }
        }
        while (running_) {
            const int listen_fd = server_fd_.load();
            if (listen_fd < 0) {
                break;
            }
            const int client_fd = accept(listen_fd, nullptr, nullptr);
            if (client_fd < 0) {
                const int err = errno;
                if (!running_) {
                    break;  // woken up by Stop()
                }
                if (err == EINTR || err == ECONNABORTED) {
                    continue;  // nothing went wrong with the listening socket
                }
                perror("accept");
                if (err == EBADF || err == EINVAL || err == ENOTSOCK) {
                    break;  // the listening socket is unusable; retrying would spin
                }
                continue;
            }
            bool queued = false;
            {
                std::lock_guard<std::mutex> lock(queue_mutex_);
                // Stop() clears running_ under this mutex and then drains the
                // queue, so nothing may be queued once running_ is false.
                if (running_) {
                    client_queue_.push(client_fd);
                    queued = true;
                }
            }
            if (!queued) {
                close(client_fd);
                break;
            }
            queue_cond_.notify_one();
        }
    }

    /**
     * @brief Stops accepting, closes the listening socket and all queued client
     *        connections, and joins the worker threads.
     *
     * Safe to call more than once and before Run().
     *
     * @note Waits for connections that are currently being handled; a handler
     *       blocked on a silent client delays the return of this call.
     */
    void Stop() {
        std::vector<std::thread> workers;
        std::queue<int> pending;
        int listen_fd = -1;
        {
            // running_ is cleared under the mutex so a worker cannot test the
            // wait predicate and then miss the notification below.
            std::lock_guard<std::mutex> lock(queue_mutex_);
            running_ = false;
            listen_fd = server_fd_.exchange(-1);
            workers.swap(workers_);
            pending.swap(client_queue_);
        }
        queue_cond_.notify_all();  // wake every idle worker
        if (listen_fd >= 0) {
            // close() alone does not reliably wake a thread blocked in accept();
            // shutting the socket down first does.
            shutdown(listen_fd, SHUT_RDWR);
            close(listen_fd);
        }
        while (!pending.empty()) {
            close(pending.front());
            pending.pop();
        }
        for (auto& worker : workers) {
            if (worker.joinable()) {
                worker.join();
            }
        }
    }

    /**
     * @brief Stops the server if it is still running.
     */
    ~TcpServer() {
        Stop();
    }

private:
    /**
     * @brief Worker loop: takes queued connections and serves them until the server stops.
     */
    void Worker() {
        for (;;) {
            int client_fd = -1;
            {
                std::unique_lock<std::mutex> lock(queue_mutex_);
                queue_cond_.wait(lock, [this] { return !running_ || !client_queue_.empty(); });
                if (!running_) {
                    return;  // Stop() closes whatever is still queued
                }
                client_fd = client_queue_.front();
                client_queue_.pop();
            }
            HandleClient(client_fd);
        }
    }

    /**
     * @brief Reads one line from the client, echoes it back and closes the connection.
     *
     * At most sizeof(buffer) - 1 bytes are read, and reading stops after the
     * first newline. If the peer closes first, the partial line is echoed. On a
     * receive error nothing is echoed.
     *
     * @param client_fd Connected client socket. It is closed before returning.
     */
    void HandleClient(int client_fd) {
        char buffer[1024];
        std::size_t length = 0;
        bool line_complete = false;
        while (!line_complete && length < sizeof(buffer) - 1) {
            const ssize_t received =
                recv(client_fd, buffer + length, sizeof(buffer) - 1 - length, 0);
            if (received < 0) {
                if (errno == EINTR) {
                    continue;
                }
                perror("recv");
                close(client_fd);
                return;
            }
            if (received == 0) {
                break;  // peer closed the connection
            }
            const char* const chunk = buffer + length;
            const void* const newline =
                std::memchr(chunk, '\n', static_cast<std::size_t>(received));
            if (newline != nullptr) {
                // Keep everything up to and including the newline, drop the rest.
                length = static_cast<std::size_t>(static_cast<const char*>(newline) - buffer) + 1;
                line_complete = true;
            } else {
                length += static_cast<std::size_t>(received);
            }
        }
        std::size_t sent = 0;
        while (sent < length) {
            const ssize_t written = send(client_fd, buffer + sent, length - sent, kSendFlags);
            if (written < 0) {
                if (errno == EINTR) {
                    continue;
                }
                perror("send");
                break;
            }
            sent += static_cast<std::size_t>(written);
        }
        close(client_fd);  // the descriptor is closed exactly once, here
    }

    /// Backlog passed to listen().
    static constexpr int kListenBacklog = 3;
#ifdef MSG_NOSIGNAL
    /// Report a vanished peer as an error from send() instead of raising SIGPIPE.
    static constexpr int kSendFlags = MSG_NOSIGNAL;
#else
    static constexpr int kSendFlags = 0;
#endif

    std::atomic<int> server_fd_{-1};       ///< Listening socket, or -1 when there is none.
    std::atomic<bool> running_{false};     ///< True between Run() and Stop().
    const int max_threads_;                ///< Number of workers started by Run().
    in_port_t port_ = 0;                   ///< Bound port, or 0 if setup failed.
    std::vector<std::thread> workers_;     ///< Guarded by queue_mutex_.
    std::queue<int> client_queue_;         ///< Accepted sockets awaiting a worker; guarded by queue_mutex_.
    std::mutex queue_mutex_;
    std::condition_variable queue_cond_;
};
/**
* @brief Main function that creates and runs a TcpServer object.
*/
int main() {
TcpServer server(4); // 最大4スレッド
std::thread server_thread(&TcpServer::Run, &server);
// サーバーを10秒後に停止
std::this_thread::sleep_for(std::chrono::seconds(10));
server.Stop();
server_thread.join();
return 0;
}
ビルド方法
g++ -std=c++17 -pthread -o tcp_server tcp_server.cpp