Created
July 19, 2023 16:25
-
-
Save ClosetGeek-Git/22fcc6572bbb6e7ecbea02b311404d1c to your computer and use it in GitHub Desktop.
Swoole reactor implementation that uses zmq_poll from libzmq to handle native ZMQSocket objects created via php-zmq alongside Swoole network::Socket instances.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include "zmq.h" | |
#include "swoole.h" | |
#include "php_swoole_cxx.h" | |
#include "swoole_socket.h" | |
#include "swoole_reactor.h" | |
namespace swoole { | |
using network::Socket; | |
typedef struct _php_zmq_context { | |
void *z_ctx; | |
int io_threads; | |
zend_bool is_persistent; | |
zend_bool use_shared_ctx; | |
zend_long socket_count; | |
int pid; | |
} php_zmq_context; | |
typedef struct _php_zmq_socket { | |
void *z_socket; | |
int socket_type; | |
php_zmq_context *ctx; | |
HashTable connect; | |
HashTable bind; | |
zend_bool is_persistent; | |
int pid; | |
} php_zmq_socket; | |
typedef struct _php_zmq_socket_object { | |
php_zmq_socket *socket; | |
char *persistent_id; | |
zval context_obj; | |
zend_object zo; | |
} php_zmq_socket_object; | |
/**
 * Recover the enclosing php_zmq_socket_object from its embedded zend_object.
 *
 * @param obj Pointer to the `zo` member of a php_zmq_socket_object.
 * @return Pointer to the wrapper that contains `obj`, computed by stepping
 *         back by the offset of `zo` within the wrapper.
 */
php_zmq_socket_object *php_zmq_socket_fetch_object(zend_object *obj) {
    char *wrapper_base = reinterpret_cast<char *>(obj) - XtOffsetOf(php_zmq_socket_object, zo);
    return reinterpret_cast<php_zmq_socket_object *>(wrapper_base);
}
struct EventObject { | |
zval zsocket; | |
zend_fcall_info_cache fci_cache_read; | |
zend_fcall_info_cache fci_cache_write; | |
}; | |
class ReactorZMQpoll : public ReactorImpl { | |
private: | |
struct zmq_pollitem_t *events_ = nullptr; | |
Socket **fds_; | |
uint32_t max_fd_num; | |
bool exists(int fd); | |
public: | |
ReactorZMQpoll(Reactor *_reactor, int max_events); | |
~ReactorZMQpoll(); | |
bool ready() override; | |
int add(Socket *socket, int events) override; | |
int set(Socket *socket, int events) override; | |
int del(Socket *socket) override; | |
int wait(struct timeval *) override; | |
}; | |
/**
 * Factory for the zmq_poll() based reactor backend.
 *
 * @param _reactor   Reactor the backend is attached to.
 * @param max_events Capacity forwarded to the ReactorZMQpoll constructor.
 * @return Newly allocated backend; the caller takes ownership.
 */
ReactorImpl *make_reactor_zmqpoll(Reactor *_reactor, int max_events) {
    ReactorImpl *backend = new ReactorZMQpoll(_reactor, max_events);
    return backend;
}
/**
 * Allocate the poll-item and socket tables for up to `max_events` sockets.
 *
 * @param _reactor   Reactor this backend serves.
 * @param max_events Number of slots to allocate; also published to the reactor
 *                   as its max_event_num.
 */
ReactorZMQpoll::ReactorZMQpoll(Reactor *_reactor, int max_events) : ReactorImpl(_reactor) {
    // Put the members in a well-defined state first: the early return below
    // must not leave pointers that the destructor would delete[] blindly.
    events_ = nullptr;
    fds_ = nullptr;
    max_fd_num = 0;

    if (!ready()) {
        swoole_sys_warning("zmq_poll failed");
        return;
    }

    // Value-initialise both arrays so that unused slots hold null/zero rather
    // than indeterminate data.
    fds_ = new Socket *[max_events]();
    events_ = new zmq_pollitem_t[max_events]();
    max_fd_num = max_events;
    reactor_->max_event_num = max_events;
}
/**
 * Report whether the backend can be used.
 *
 * @return Always true; this backend performs no readiness check of its own.
 */
bool ReactorZMQpoll::ready() {
    return true;
}
/**
 * Release the poll-item and socket tables allocated by the constructor.
 * The Socket objects referenced from `fds_` are not freed here.
 */
ReactorZMQpoll::~ReactorZMQpoll() {
    delete[] events_;
    delete[] fds_;
}
/**
 * Register a socket with the poll set.
 *
 * Sockets of type SW_FD_ZMQ are polled through their libzmq handle (taken from
 * the EventObject in socket->object); all other sockets are polled through
 * socket->fd.
 *
 * @param socket Socket to register.
 * @param events Swoole event mask; read/write/error are mapped to
 *               ZMQ_POLLIN/ZMQ_POLLOUT/ZMQ_POLLERR.
 * @return SW_OK on success; SW_ERR if the socket is already registered, the
 *         table is full, or a ZMQ socket carries a null libzmq handle.
 */
int ReactorZMQpoll::add(Socket *socket, int events) {
    const int fd = socket->fd;
    const bool is_zmq = (socket->fd_type == SW_FD_ZMQ);

    void *zmq_handle = nullptr;
    if (is_zmq) {
        EventObject *payload = static_cast<EventObject *>(socket->object);
        php_zmq_socket_object *intern = php_zmq_socket_fetch_object(Z_OBJ(payload->zsocket));
        zmq_handle = intern->socket->z_socket;
        if (zmq_handle == nullptr) {
            // A poll item with a null socket is polled by descriptor, so this
            // entry would silently turn into "poll fd 0".
            swoole_warning("ZMQ socket handle is null, fd#%d", fd);
            return SW_ERR;
        }
    }

    // Duplicate detection uses the same identity as del()/set(): the libzmq
    // handle for ZMQ items, the descriptor for plain items. ZMQ items are
    // stored with fd == 0, so a bare fd comparison would both miss ZMQ
    // duplicates and confuse a real fd 0 with every ZMQ item.
    const uint32_t count = reactor_->get_event_num();
    for (uint32_t i = 0; i < count; i++) {
        const bool same_entry = is_zmq ? (events_[i].socket == zmq_handle)
                                       : (events_[i].socket == nullptr && events_[i].fd == fd);
        if (same_entry) {
            swoole_warning("fd#%d is already exists", fd);
            return SW_ERR;
        }
    }

    if (count >= max_fd_num) {
        swoole_warning("too many connection, more than %d", static_cast<int>(max_fd_num));
        return SW_ERR;
    }

    reactor_->_add(socket, events);
    swoole_trace("fd=%d, events=%d", fd, events);

    // The new entry goes into the first free slot, i.e. the count captured
    // before _add() was called.
    zmq_pollitem_t &item = events_[count];
    item.socket = zmq_handle;
    item.fd = is_zmq ? 0 : fd;
    item.events = 0;
    if (Reactor::isset_read_event(events)) {
        item.events |= ZMQ_POLLIN;
    }
    if (Reactor::isset_write_event(events)) {
        item.events |= ZMQ_POLLOUT;
    }
    if (Reactor::isset_error_event(events)) {
        item.events |= ZMQ_POLLERR;
    }
    item.revents = 0;
    fds_[count] = socket;

    return SW_OK;
}
/**
 * Remove a socket from the poll set.
 *
 * The matching slot is located by libzmq handle (SW_FD_ZMQ sockets) or by
 * descriptor (all other sockets). Later slots are shifted down by one so the
 * tables stay dense, and the vacated last slot is cleared.
 *
 * @param socket Socket to remove.
 * @return SW_OK if the socket was found and removed; SW_ERR if it was already
 *         flagged as removed or is not present in the table.
 */
int ReactorZMQpoll::del(Socket *socket) {
    if (socket->removed) {
        swoole_error_log(SW_LOG_WARNING, SW_ERROR_EVENT_SOCKET_REMOVED, "failed to delete event[%d], it has already been removed", socket->fd);
        return SW_ERR;
    }

    const bool is_zmq = (socket->fd_type == SW_FD_ZMQ);
    void *zmq_handle = nullptr;
    if (is_zmq) {
        EventObject *payload = static_cast<EventObject *>(socket->object);
        php_zmq_socket_object *intern = php_zmq_socket_fetch_object(Z_OBJ(payload->zsocket));
        zmq_handle = intern->socket->z_socket;
        if (zmq_handle == nullptr) {
            // A null handle would compare equal to every descriptor-based item.
            return SW_ERR;
        }
    }

    const uint32_t count = reactor_->get_event_num();
    for (uint32_t i = 0; i < count; i++) {
        // Descriptor items always carry a null socket handle; requiring that
        // keeps a real fd 0 from matching ZMQ items, which are stored with
        // fd == 0.
        const bool matched = is_zmq ? (events_[i].socket == zmq_handle)
                                    : (events_[i].socket == nullptr && events_[i].fd == socket->fd);
        if (!matched) {
            continue;
        }

        // Close the gap. The shift stops at the last used slot so that slot
        // [count] -- possibly one past the end of the arrays when the table
        // is full -- is never read.
        const uint32_t last = count - 1;
        for (uint32_t j = i; j < last; j++) {
            fds_[j] = fds_[j + 1];
            events_[j] = events_[j + 1];
        }

        // Clear the vacated slot so no stale entry lingers beyond the new end.
        fds_[last] = nullptr;
        events_[last].socket = nullptr;
        events_[last].fd = 0;
        events_[last].events = 0;
        events_[last].revents = 0;

        reactor_->_del(socket);
        return SW_OK;
    }

    return SW_ERR;
}
/**
 * Replace the event mask of an already registered socket.
 *
 * The slot is located by libzmq handle (SW_FD_ZMQ sockets) or by descriptor
 * (all other sockets).
 *
 * @param socket Registered socket whose interest set changes.
 * @param events New Swoole event mask; read/write/error are mapped to
 *               ZMQ_POLLIN/ZMQ_POLLOUT/ZMQ_POLLERR, the same mapping add() uses.
 * @return SW_OK if the socket was found and updated, SW_ERR otherwise.
 */
int ReactorZMQpoll::set(Socket *socket, int events) {
    swoole_trace("fd=%d, events=%d", socket->fd, events);

    const bool is_zmq = (socket->fd_type == SW_FD_ZMQ);
    void *zmq_handle = nullptr;
    if (is_zmq) {
        EventObject *payload = static_cast<EventObject *>(socket->object);
        php_zmq_socket_object *intern = php_zmq_socket_fetch_object(Z_OBJ(payload->zsocket));
        zmq_handle = intern->socket->z_socket;
        if (zmq_handle == nullptr) {
            // A null handle would compare equal to every descriptor-based item.
            return SW_ERR;
        }
    }

    const uint32_t count = reactor_->get_event_num();
    for (uint32_t i = 0; i < count; i++) {
        zmq_pollitem_t &item = events_[i];
        // Descriptor items always carry a null socket handle; requiring that
        // keeps a real fd 0 from matching ZMQ items, which are stored with
        // fd == 0.
        const bool matched = is_zmq ? (item.socket == zmq_handle)
                                    : (item.socket == nullptr && item.fd == socket->fd);
        if (!matched) {
            continue;
        }

        item.events = 0;
        if (Reactor::isset_read_event(events)) {
            item.events |= ZMQ_POLLIN;
        }
        if (Reactor::isset_write_event(events)) {
            item.events |= ZMQ_POLLOUT;
        }
        // Keep error interest, as add() does; otherwise re-arming a socket
        // would silently drop a requested error subscription.
        if (Reactor::isset_error_event(events)) {
            item.events |= ZMQ_POLLERR;
        }

        reactor_->_set(socket, events);
        return SW_OK;
    }

    return SW_ERR;
}
int ReactorZMQpoll::wait(struct timeval *timeo) { | |
Event event; | |
ReactorHandler handler; | |
int ret; | |
if (reactor_->timeout_msec == 0) { | |
if (timeo == nullptr) { | |
reactor_->timeout_msec = -1; | |
} else { | |
reactor_->timeout_msec = timeo->tv_sec * 1000 + timeo->tv_usec / 1000; | |
} | |
} | |
reactor_->before_wait(); | |
while (reactor_->running) { | |
if (reactor_->onBegin != nullptr) { | |
reactor_->onBegin(reactor_); | |
} | |
ret = zmq_poll(events_, reactor_->get_event_num(), reactor_->get_timeout_msec()); | |
if (ret < 0) { | |
if (!reactor_->catch_error()) { | |
swoole_sys_warning("zmq_poll error"); | |
break; | |
} else { | |
goto _continue; | |
} | |
} else if (ret == 0) { | |
reactor_->execute_end_callbacks(true); | |
SW_REACTOR_CONTINUE; | |
} else { | |
for (uint32_t i = 0; i < reactor_->get_event_num(); i++) { | |
event.socket = fds_[i]; | |
event.fd = events_[i].fd; | |
event.reactor_id = reactor_->id; | |
event.type = event.socket->fd_type; | |
if (events_[i].revents & ZMQ_POLLERR) { | |
event.socket->event_hup = 1; | |
} | |
swoole_trace("Event: fd=%d|reactor_id=%d|type=%d", event.fd, reactor_->id, event.type); | |
if ((events_[i].revents & ZMQ_POLLIN) && !event.socket->removed) { | |
handler = reactor_->get_handler(SW_EVENT_READ, event.type); | |
ret = handler(reactor_, &event); | |
if (ret < 0) { | |
swoole_sys_warning("zmq_poll[POLLIN] handler failed. fd=%d", event.fd); | |
} | |
} | |
if ((events_[i].revents & ZMQ_POLLOUT) && !event.socket->removed) { | |
handler = reactor_->get_handler(SW_EVENT_WRITE, event.type); | |
ret = handler(reactor_, &event); | |
if (ret < 0) { | |
swoole_sys_warning("zmq_poll[POLLOUT] handler failed. fd=%d", event.fd); | |
} | |
} | |
if ((events_[i].revents & ZMQ_POLLERR) && !event.socket->removed) { | |
if ((events_[i].revents & ZMQ_POLLIN) || (events_[i].revents & ZMQ_POLLOUT)) { | |
continue; | |
} | |
handler = reactor_->get_error_handler(event.type); | |
ret = handler(reactor_, &event); | |
if (ret < 0) { | |
swoole_sys_warning("zmq_poll[POLLERR] handler failed. fd=%d", event.fd); | |
} | |
} | |
if (!event.socket->removed && (event.socket->events & SW_EVENT_ONCE)) { | |
del(event.socket); | |
} | |
events_[i].revents = 0; | |
} | |
} | |
_continue: | |
reactor_->execute_end_callbacks(false); | |
SW_REACTOR_CONTINUE; | |
} | |
return SW_OK; | |
} | |
bool ReactorZMQpoll::exists(int fd) { | |
for (uint32_t i = 0; i < reactor_->get_event_num(); i++) { | |
if (events_[i].fd == fd) { | |
return true; | |
} | |
} | |
return false; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment