Created
April 29, 2011 16:04
-
-
Save sustrik/948531 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| diff --git a/src/dist.cpp b/src/dist.cpp | |
| index 093da79..9201294 100644 | |
| --- a/src/dist.cpp | |
| +++ b/src/dist.cpp | |
| @@ -23,9 +23,11 @@ | |
| #include "err.hpp" | |
| #include "own.hpp" | |
| #include "msg.hpp" | |
| +#include "likely.hpp" | |
| zmq::dist_t::dist_t (own_t *sink_) : | |
| active (0), | |
| + eligible (0), | |
| more (false), | |
| sink (sink_), | |
| terminating (false) | |
| @@ -39,20 +41,24 @@ zmq::dist_t::~dist_t () | |
| void zmq::dist_t::attach (writer_t *pipe_) | |
| { | |
| - // If we are in the middle of sending a message, let's postpone plugging | |
| - // in the pipe. | |
| - if (!terminating && more) { | |
| - new_pipes.push_back (pipe_); | |
| - return; | |
| - } | |
| - | |
| pipe_->set_event_sink (this); | |
| - pipes.push_back (pipe_); | |
| - pipes.swap (active, pipes.size () - 1); | |
| - active++; | |
| + // If we are in the middle of sending a message, we'll add the new pipe | |
| + // into the list of eligible pipes. Otherwise we add it to the list | |
| + // of active pipes. | |
| + if (more) { | |
| + pipes.push_back (pipe_); | |
| + pipes.swap (eligible, pipes.size () - 1); | |
| + eligible++; | |
| + } | |
| + else { | |
| + pipes.push_back (pipe_); | |
| + pipes.swap (active, pipes.size () - 1); | |
| + active++; | |
| + eligible++; | |
| + } | |
| - if (terminating) { | |
| + if (unlikely (terminating)) { | |
| sink->register_term_acks (1); | |
| pipe_->terminate (); | |
| } | |
| @@ -70,21 +76,32 @@ void zmq::dist_t::terminate () | |
| void zmq::dist_t::terminated (writer_t *pipe_) | |
| { | |
| - // Remove the pipe from the list; adjust number of active pipes | |
| - // accordingly. | |
| + // Remove the pipe from the list; adjust number of active and/or | |
| + // eligible pipes accordingly. | |
| if (pipes.index (pipe_) < active) | |
| active--; | |
| + if (pipes.index (pipe_) < eligible) | |
| + eligible--; | |
| pipes.erase (pipe_); | |
| - if (terminating) | |
| + if (unlikely (terminating)) | |
| sink->unregister_term_ack (); | |
| } | |
| void zmq::dist_t::activated (writer_t *pipe_) | |
| { | |
| - // Move the pipe to the list of active pipes. | |
| - pipes.swap (pipes.index (pipe_), active); | |
| - active++; | |
| + // If we are in the middle of sending a message, we'll add the pipe | |
| + // into the list of eligible pipes. Otherwise we add it to the list | |
| + // of active pipes. | |
| + if (more) { | |
| + pipes.swap (pipes.index (pipe_), eligible); | |
| + eligible++; | |
| + } | |
| + else { | |
| + pipes.swap (pipes.index (pipe_), active); | |
| + active++; | |
| + eligible++; | |
| + } | |
| } | |
| int zmq::dist_t::send (msg_t *msg_, int flags_) | |
| @@ -95,9 +112,9 @@ int zmq::dist_t::send (msg_t *msg_, int flags_) | |
| // Push the message to active pipes. | |
| distribute (msg_, flags_); | |
| - // If mutlipart message is fully sent, activate new pipes. | |
| - if (more && !msg_more) | |
| - clear_new_pipes (); | |
| + // If multipart message is fully sent, activate all the eligible pipes. | |
| + if (!msg_more) | |
| + active = eligible; | |
| more = msg_more; | |
| @@ -141,8 +158,10 @@ bool zmq::dist_t::has_out () | |
| bool zmq::dist_t::write (class writer_t *pipe_, msg_t *msg_) | |
| { | |
| if (!pipe_->write (msg_)) { | |
| + pipes.swap (pipes.index (pipe_), active - 1); | |
| active--; | |
| - pipes.swap (pipes.index (pipe_), active); | |
| + pipes.swap (active, eligible - 1); | |
| + eligible--; | |
| return false; | |
| } | |
| if (!(msg_->flags () & msg_t::more)) | |
| @@ -150,15 +169,3 @@ bool zmq::dist_t::write (class writer_t *pipe_, msg_t *msg_) | |
| return true; | |
| } | |
| -void zmq::dist_t::clear_new_pipes () | |
| -{ | |
| - for (new_pipes_t::iterator it = new_pipes.begin (); it != new_pipes.end (); | |
| - ++it) { | |
| - (*it)->set_event_sink (this); | |
| - pipes.push_back (*it); | |
| - pipes.swap (active, pipes.size () - 1); | |
| - active++; | |
| - } | |
| - new_pipes.clear (); | |
| -} | |
| - | |
| diff --git a/src/dist.hpp b/src/dist.hpp | |
| index ea05305..fd522b9 100644 | |
| --- a/src/dist.hpp | |
| +++ b/src/dist.hpp | |
| @@ -56,24 +56,22 @@ namespace zmq | |
| // Put the message to all active pipes. | |
| void distribute (class msg_t *msg_, int flags_); | |
| - // Plug in all the delayed pipes. | |
| - void clear_new_pipes (); | |
| - | |
| // List of outbound pipes. | |
| typedef array_t <class writer_t> pipes_t; | |
| pipes_t pipes; | |
| - // List of new pipes that were not yet inserted into 'pipes' list. | |
| - // These pipes are moves to 'pipes' list once the current multipart | |
| - // message is fully sent. This way we avoid sending incomplete messages | |
| - // to peers. | |
| - typedef std::vector <class writer_t*> new_pipes_t; | |
| - new_pipes_t new_pipes; | |
| - | |
| // Number of active pipes. All the active pipes are located at the | |
| - // beginning of the pipes array. | |
| + // beginning of the pipes array. These are the pipes the messages | |
| + // can be sent to at the moment. | |
| pipes_t::size_type active; | |
| + // Number of pipes eligible for sending messages to. This includes all | |
| + // the active pipes plus all the pipes that we can in theory send | |
| + // messages to (the HWM is not yet reached), but sending a message | |
| + // to them would result in a partial message being delivered, i.e. a message | |
| + // with initial parts missing. | |
| + pipes_t::size_type eligible; | |
| + | |
| // True if last we are in the middle of a multipart message. | |
| bool more; | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment