This gist holds code and listings for zeromq/libzmq#191
.: 2-1 3-0 README.md
./2-1: crashes.txt pub.c sub.c
./3-0: crashes.txt pub2.c pub.c sub2.c sub.c
| *.o | |
| core | |
| pub | |
| sub | |
| pub2 | |
| sub2 | |
| From dfb510b844f4a19f9405ae51846bf2dc8221c84d Mon Sep 17 00:00:00 2001 | |
| From: Martin Sustrik <[email protected]> | |
| Date: Tue, 26 Apr 2011 08:25:46 +0200 | |
| Subject: [PATCH] Message atomicity problem in PUB socket fixed. | |
| Reaching the HWM caused breaking message atomicity when the | |
| flow was reestablished - initial parts of multipart messages | |
| may have been lost. | |
| Signed-off-by: Martin Sustrik <[email protected]> | |
| --- | |
| src/dist.cpp | 69 +++++++++++++++++++++++++++++++-------------------------- | |
| src/dist.hpp | 17 +++++++------ | |
| 2 files changed, 46 insertions(+), 40 deletions(-) | |
| diff --git a/src/dist.cpp b/src/dist.cpp | |
| index 093da79..be1d5af 100644 | |
| --- a/src/dist.cpp | |
| +++ b/src/dist.cpp | |
| @@ -23,9 +23,11 @@ | |
| #include "err.hpp" | |
| #include "own.hpp" | |
| #include "msg.hpp" | |
| +#include "likely.hpp" | |
| zmq::dist_t::dist_t (own_t *sink_) : | |
| active (0), | |
| + eligible (0), | |
| more (false), | |
| sink (sink_), | |
| terminating (false) | |
| @@ -39,20 +41,24 @@ zmq::dist_t::~dist_t () | |
| void zmq::dist_t::attach (writer_t *pipe_) | |
| { | |
| - // If we are in the middle of sending a message, let's postpone plugging | |
| - // in the pipe. | |
| - if (!terminating && more) { | |
| - new_pipes.push_back (pipe_); | |
| - return; | |
| - } | |
| - | |
| pipe_->set_event_sink (this); | |
| - pipes.push_back (pipe_); | |
| - pipes.swap (active, pipes.size () - 1); | |
| - active++; | |
| + // If we are in the middle of sending a message, we'll add new pipe | |
| + // into the list of eligible pipes. Otherwise we add it to the list | |
| + // of active pipes. | |
| + if (more) { | |
| + pipes.push_back (pipe_); | |
| + pipes.swap (eligible, pipes.size () - 1); | |
| + eligible++; | |
| + } | |
| + else { | |
| + pipes.push_back (pipe_); | |
| + pipes.swap (active, pipes.size () - 1); | |
| + active++; | |
| + eligible++; | |
| + } | |
| - if (terminating) { | |
| + if (unlikely (terminating)) { | |
| sink->register_term_acks (1); | |
| pipe_->terminate (); | |
| } | |
| @@ -70,21 +76,32 @@ void zmq::dist_t::terminate () | |
| void zmq::dist_t::terminated (writer_t *pipe_) | |
| { | |
| - // Remove the pipe from the list; adjust number of active pipes | |
| - // accordingly. | |
| + // Remove the pipe from the list; adjust number of active and/or | |
| + // eligible pipes accordingly. | |
| if (pipes.index (pipe_) < active) | |
| active--; | |
| + if (pipes.index (pipe_) < eligible) | |
| + eligible--; | |
| pipes.erase (pipe_); | |
| - if (terminating) | |
| + if (unlikely (terminating)) | |
| sink->unregister_term_ack (); | |
| } | |
| void zmq::dist_t::activated (writer_t *pipe_) | |
| { | |
| - // Move the pipe to the list of active pipes. | |
| - pipes.swap (pipes.index (pipe_), active); | |
| - active++; | |
| + // If we are in the middle of sending a message, we'll add new pipe | |
| + // into the list of eligible pipes. Otherwise we add it to the list | |
| + // of active pipes. | |
| + if (more) { | |
| + pipes.swap (pipes.index (pipe_), eligible); | |
| + eligible++; | |
| + } | |
| + else { | |
| + pipes.swap (pipes.index (pipe_), active); | |
| + active++; | |
| + eligible++; | |
| + } | |
| } | |
| int zmq::dist_t::send (msg_t *msg_, int flags_) | |
| @@ -95,9 +112,9 @@ int zmq::dist_t::send (msg_t *msg_, int flags_) | |
| // Push the message to active pipes. | |
| distribute (msg_, flags_); | |
| - // If mutlipart message is fully sent, activate new pipes. | |
| - if (more && !msg_more) | |
| - clear_new_pipes (); | |
| + // If mutlipart message is fully sent, activate all the eligible pipes. | |
| + if (!msg_more) | |
| + active = eligible; | |
| more = msg_more; | |
| @@ -150,15 +167,3 @@ bool zmq::dist_t::write (class writer_t *pipe_, msg_t *msg_) | |
| return true; | |
| } | |
| -void zmq::dist_t::clear_new_pipes () | |
| -{ | |
| - for (new_pipes_t::iterator it = new_pipes.begin (); it != new_pipes.end (); | |
| - ++it) { | |
| - (*it)->set_event_sink (this); | |
| - pipes.push_back (*it); | |
| - pipes.swap (active, pipes.size () - 1); | |
| - active++; | |
| - } | |
| - new_pipes.clear (); | |
| -} | |
| - | |
| diff --git a/src/dist.hpp b/src/dist.hpp | |
| index ea05305..f510bd6 100644 | |
| --- a/src/dist.hpp | |
| +++ b/src/dist.hpp | |
| @@ -63,17 +63,18 @@ namespace zmq | |
| typedef array_t <class writer_t> pipes_t; | |
| pipes_t pipes; | |
| - // List of new pipes that were not yet inserted into 'pipes' list. | |
| - // These pipes are moves to 'pipes' list once the current multipart | |
| - // message is fully sent. This way we avoid sending incomplete messages | |
| - // to peers. | |
| - typedef std::vector <class writer_t*> new_pipes_t; | |
| - new_pipes_t new_pipes; | |
| - | |
| // Number of active pipes. All the active pipes are located at the | |
| - // beginning of the pipes array. | |
| + // beginning of the pipes array. These are the pipes the messages | |
| + // can be sent to at the moment. | |
| pipes_t::size_type active; | |
| + // Number of pipes eligible for sending messages to. This includes all | |
| + // the active pipes plus all the pipes that we can in theory send | |
| + // messages to (the HWM is not yet reached), but sending a message | |
| + // to them would result in partial message being delivered, ie. message | |
| + // with initial parts missing. | |
| + pipes_t::size_type eligible; | |
| + | |
| // True if last we are in the middle of a multipart message. | |
| bool more; | |
| -- | |
| 1.7.0.4 | |
This gist holds code and listings for zeromq/libzmq#191
.: 2-1 3-0 README.md
./2-1: crashes.txt pub.c sub.c
./3-0: crashes.txt pub2.c pub.c sub2.c sub.c
| Publisher | |
| ========= | |
| Test scenario: start sub, then start pub. | |
| #0 0x00007fa7eb5be0c0 in zmq::array_item_t::set_array_index (this=0xec475abd67aa89d8, index_=0) at array.hpp:50 | |
| 50 array_index = index_; | |
| (gdb) bt | |
| #0 0x00007fa7eb5be0c0 in zmq::array_item_t::set_array_index (this=0xec475abd67aa89d8, index_=0) at array.hpp:50 | |
| #1 0x00007fa7eb5c4bbf in zmq::array_t<zmq::writer_t>::swap (this=0x1525a58, index1_=0, index2_=1) at array.hpp:122 | |
| #2 0x00007fa7eb5c4447 in zmq::dist_t::activated (this=0x1525a50, pipe_=0x1526200) at dist.cpp:103 | |
| #3 0x00007fa7eb5d39d7 in zmq::writer_t::process_activate_writer (this=0x1526200, msgs_read_=0) at pipe.cpp:347 | |
| #4 0x00007fa7eb5cd6a3 in zmq::object_t::process_command (this=0x1526200, cmd_=...) at object.cpp:67 | |
| #5 0x00007fa7eb5e0d1b in zmq::socket_base_t::process_commands (this=0x15258f0, block_=false, throttle_=true) at socket_base.cpp:691 | |
| #6 0x00007fa7eb5e03bb in zmq::socket_base_t::send (this=0x15258f0, msg_=0x7fff638e1ad0, flags_=2) at socket_base.cpp:474 | |
| #7 0x00007fa7eb5ec110 in zmq_send (s_=0x15258f0, msg_=0x7fff638e1ad0, flags_=2) at zmq.cpp:375 | |
| #8 0x0000000000400957 in main () at pub.c:18 | |
| Publisher | |
| ========= | |
| Test scenario: start sub, then start pub. | |
| Assertion failed: size >= 2 (zmq_init.cpp:120) | |
| #0 0x00007f772651eba5 in raise (sig=<value optimized out>) at ../nptl/sysdeps/unix/sysv/linux/raise.c:64 | |
| #1 0x00007f77265226b0 in abort () at abort.c:92 | |
| #2 0x00007f77271b1cfc in zmq::zmq_init_t::write (this=0x696c70, msg_=0x696e10) at zmq_init.cpp:120 | |
| #3 0x00007f7727186790 in zmq::decoder_t::message_ready (this=0x696dc8) at decoder.cpp:139 | |
| #4 0x00007f77271b06c5 in zmq::decoder_base_t<zmq::decoder_t>::process_buffer (this=0x696dc8, data_=0x6986a0 "\001~i", size_=2) at decoder.hpp:122 | |
| #5 0x00007f77271b0003 in zmq::zmq_engine_t::in_event (this=0x696d90) at zmq_engine.cpp:127 | |
| #6 0x00007f77271afe25 in zmq::zmq_engine_t::plug (this=0x696d90, io_thread_=0x6963b0, inout_=0x696d28) at zmq_engine.cpp:78 | |
| #7 0x00007f77271b1f91 in zmq::zmq_init_t::process_plug (this=0x696c70) at zmq_init.cpp:178 | |
| #8 0x00007f7727190a7f in zmq::object_t::process_command (this=0x696c70, cmd_=...) at object.cpp:75 | |
| #9 0x00007f772718d385 in zmq::io_thread_t::in_event (this=0x6963b0) at io_thread.cpp:81 | |
| #10 0x00007f7727189c3f in zmq::epoll_t::loop (this=0x6963f0) at epoll.cpp:161 | |
| #11 0x00007f7727189d16 in zmq::epoll_t::worker_routine (arg_=0x6963f0) at epoll.cpp:174 | |
| #12 0x00007f77271a6b9f in thread_routine (arg_=0x696460) at thread.cpp:73 | |
| #13 0x00007f7726af8971 in start_thread (arg=<value optimized out>) at pthread_create.c:304 | |
| #14 0x00007f77265d192d in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:112 | |
| #15 0x0000000000000000 in ?? () | |
| Notes: did not reproduce after rebuilding libzmq with --enable-debug. | |
| Publisher | |
| ========= | |
| Test scenario: start pub, then start sub. | |
| Program terminated with signal 11, Segmentation fault. | |
| #0 0x00007f1313b02520 in zmq::array_item_t::set_array_index (this=0x6f6f6f6f6f6f6f87, index_=0) at array.hpp:50 | |
| 50 array_index = index_; | |
| (gdb) bt | |
| #0 0x00007f1313b02520 in zmq::array_item_t::set_array_index (this=0x6f6f6f6f6f6f6f87, index_=0) at array.hpp:50 | |
| #1 0x00007f1313b0970b in zmq::array_t<zmq::writer_t>::swap (this=0xd99a50, index1_=0, index2_=1) at array.hpp:122 | |
| #2 0x00007f1313b09198 in zmq::dist_t::activated (this=0xd99a48, pipe_=0xd9a220) at dist.cpp:97 | |
| #3 0x00007f1313b187db in zmq::writer_t::process_activate_writer (this=0xd9a220, msgs_read_=0) at pipe.cpp:262 | |
| #4 0x00007f1313b12a47 in zmq::object_t::process_command (this=0xd9a220, cmd_=...) at object.cpp:67 | |
| #5 0x00007f1313b25c69 in zmq::socket_base_t::process_commands (this=0xd998f0, block_=false, throttle_=true) at socket_base.cpp:700 | |
| #6 0x00007f1313b252fd in zmq::socket_base_t::send (this=0xd998f0, msg_=0x7fff68119d20, flags_=0) at socket_base.cpp:481 | |
| #7 0x00007f1313b2fe43 in zmq_sendmsg (s_=0xd998f0, msg_=0x7fff68119d20, flags_=0) at zmq.cpp:266 | |
| #8 0x00007f1313b2fb53 in zmq_send (s_=0xd998f0, buf_=0x7fff68119d70, len_=1000, flags_=0) at zmq.cpp:219 | |
| #9 0x00000000004008d8 in main () at pub.c:25 | |
| Notes: this and the following error happen arbitrarily, with both publishers. | |
| Publisher | |
| ========= | |
| Test scenario: start pub2, then start sub2. | |
| Program terminated with signal 11, Segmentation fault. | |
| #0 0x00007f262dbd2468 in zmq::writer_t::check_write (this=0x0, | |
| msg_=0x7fff811835d0) at pipe.cpp:190 | |
| 190 if (unlikely (!active)) | |
| (gdb) bt | |
| #0 0x00007f262dbd2468 in zmq::writer_t::check_write (this=0x0, msg_=0x7fff811835d0) at pipe.cpp:190 | |
| #1 0x00007f262dbd24d1 in zmq::writer_t::write (this=0x0, msg_=0x7fff811835d0) at pipe.cpp:203 | |
| #2 0x00007f262dbc3478 in zmq::dist_t::write (this=0x119ea48, pipe_=0x0, msg_=0x7fff811835d0) at dist.cpp:160 | |
| #3 0x00007f262dbc33ac in zmq::dist_t::distribute (this=0x119ea48, msg_=0x7fff811835d0, flags_=2) at dist.cpp:141 | |
| #4 0x00007f262dbc3250 in zmq::dist_t::send (this=0x119ea48, msg_=0x7fff811835d0, flags_=2) at dist.cpp:113 | |
| #5 0x00007f262dbe4308 in zmq::xpub_t::xsend (this=0x119e8f0, msg_=0x7fff811835d0, flags_=2) at xpub.cpp:57 | |
| #6 0x00007f262dbdf357 in zmq::socket_base_t::send (this=0x119e8f0, msg_=0x7fff811835d0, flags_=2) at socket_base.cpp:490 | |
| #7 0x00007f262dbe9e43 in zmq_sendmsg (s_=0x119e8f0, msg_=0x7fff811835d0, flags_=2) at zmq.cpp:266 | |
| #8 0x0000000000400957 in main () at pub2.c:22 | |
| Notes: this and the following error happen arbitrarily, with both publishers. | |
| Subscriber | |
| ========== | |
| Test scenario: any, subscriber always crashes. | |
| sub: sub.c:25: main: Assertion `rc == 0' failed. | |
| Program terminated with signal 11, Segmentation fault. | |
| #0 0x00007f4ac8fc4468 in zmq::writer_t::check_write (this=0x0, msg_=0x7fff50f7d980) at pipe.cpp:190 | |
| 190 if (unlikely (!active)) | |
| (gdb) bt | |
| #0 0x00007f4ac8fc4468 in zmq::writer_t::check_write (this=0x0, msg_=0x7fff50f7d980) at pipe.cpp:190 | |
| #1 0x00007f4ac8fc44d1 in zmq::writer_t::write (this=0x0, msg_=0x7fff50f7d980) at pipe.cpp:203 | |
| #2 0x00007f4ac8fb5478 in zmq::dist_t::write (this=0x1f96a48, pipe_=0x0, msg_=0x7fff50f7d980) at dist.cpp:160 | |
| #3 0x00007f4ac8fb53ac in zmq::dist_t::distribute (this=0x1f96a48, msg_=0x7fff50f7d980, flags_=2) at dist.cpp:141 | |
| #4 0x00007f4ac8fb5250 in zmq::dist_t::send (this=0x1f96a48, msg_=0x7fff50f7d980, flags_=2) at dist.cpp:113 | |
| #5 0x00007f4ac8fd6308 in zmq::xpub_t::xsend (this=0x1f968f0, msg_=0x7fff50f7d980, flags_=2) at xpub.cpp:57 | |
| #6 0x00007f4ac8fd1357 in zmq::socket_base_t::send (this=0x1f968f0, msg_=0x7fff50f7d980, flags_=2) at socket_base.cpp:490 | |
| #7 0x00007f4ac8fdbe43 in zmq_sendmsg (s_=0x1f968f0, msg_=0x7fff50f7d980, flags_=2) at zmq.cpp:266 | |
| #8 0x00007f4ac8fdbb53 in zmq_send (s_=0x1f968f0, buf_=0x400a73, len_=5, flags_=2) at zmq.cpp:219 | |
| #9 0x00000000004008a7 in main () at sub.c:14 | |
| Notes: this does NOT happen when using sub2.c. |
| diff --git a/src/dist.cpp b/src/dist.cpp | |
| index e447bc1..d6a5684 100644 | |
| --- a/src/dist.cpp | |
| +++ b/src/dist.cpp | |
| @@ -25,9 +25,11 @@ | |
| #include "err.hpp" | |
| #include "own.hpp" | |
| #include "msg_content.hpp" | |
| +#include "likely.hpp" | |
| zmq::dist_t::dist_t (own_t *sink_) : | |
| active (0), | |
| + eligible (0), | |
| more (false), | |
| sink (sink_), | |
| terminating (false) | |
| @@ -41,20 +43,24 @@ zmq::dist_t::~dist_t () | |
| void zmq::dist_t::attach (writer_t *pipe_) | |
| { | |
| - // If we are in the middle of sending a message, let's postpone plugging | |
| - // in the pipe. | |
| - if (!terminating && more) { | |
| - new_pipes.push_back (pipe_); | |
| - return; | |
| - } | |
| - | |
| pipe_->set_event_sink (this); | |
| - pipes.push_back (pipe_); | |
| - pipes.swap (active, pipes.size () - 1); | |
| - active++; | |
| + // If we are in the middle of sending a message, we'll add new pipe | |
| + // into the list of eligible pipes. Otherwise we add it to the list | |
| + // of active pipes. | |
| + if (more) { | |
| + pipes.push_back (pipe_); | |
| + pipes.swap (eligible, pipes.size () - 1); | |
| + eligible++; | |
| + } | |
| + else { | |
| + pipes.push_back (pipe_); | |
| + pipes.swap (active, pipes.size () - 1); | |
| + active++; | |
| + eligible++; | |
| + } | |
| - if (terminating) { | |
| + if (unlikely (terminating)) { | |
| sink->register_term_acks (1); | |
| pipe_->terminate (); | |
| } | |
| @@ -72,21 +78,32 @@ void zmq::dist_t::terminate () | |
| void zmq::dist_t::terminated (writer_t *pipe_) | |
| { | |
| - // Remove the pipe from the list; adjust number of active pipes | |
| - // accordingly. | |
| + // Remove the pipe from the list; adjust number of active and/or | |
| + // eligible pipes accordingly. | |
| if (pipes.index (pipe_) < active) | |
| active--; | |
| + if (pipes.index (pipe_) < eligible) | |
| + eligible--; | |
| pipes.erase (pipe_); | |
| - if (terminating) | |
| + if (unlikely (terminating)) | |
| sink->unregister_term_ack (); | |
| } | |
| void zmq::dist_t::activated (writer_t *pipe_) | |
| { | |
| - // Move the pipe to the list of active pipes. | |
| - pipes.swap (pipes.index (pipe_), active); | |
| - active++; | |
| + // If we are in the middle of sending a message, we'll add new pipe | |
| + // into the list of eligible pipes. Otherwise we add it to the list | |
| + // of active pipes. | |
| + if (more) { | |
| + pipes.swap (pipes.index (pipe_), eligible); | |
| + eligible++; | |
| + } | |
| + else { | |
| + pipes.swap (pipes.index (pipe_), active); | |
| + active++; | |
| + eligible++; | |
| + } | |
| } | |
| int zmq::dist_t::send (zmq_msg_t *msg_, int flags_) | |
| @@ -97,9 +114,9 @@ int zmq::dist_t::send (zmq_msg_t *msg_, int flags_) | |
| // Push the message to active pipes. | |
| distribute (msg_, flags_); | |
| - // If mutlipart message is fully sent, activate new pipes. | |
| - if (more && !msg_more) | |
| - clear_new_pipes (); | |
| + // If multipart message is fully sent, activate all the eligible pipes. | |
| + if (!msg_more) | |
| + active = eligible; | |
| more = msg_more; | |
| @@ -181,16 +198,3 @@ bool zmq::dist_t::write (class writer_t *pipe_, zmq_msg_t *msg_) | |
| pipe_->flush (); | |
| return true; | |
| } | |
| - | |
| -void zmq::dist_t::clear_new_pipes () | |
| -{ | |
| - for (new_pipes_t::iterator it = new_pipes.begin (); it != new_pipes.end (); | |
| - ++it) { | |
| - (*it)->set_event_sink (this); | |
| - pipes.push_back (*it); | |
| - pipes.swap (active, pipes.size () - 1); | |
| - active++; | |
| - } | |
| - new_pipes.clear (); | |
| -} | |
| - | |
| diff --git a/src/dist.hpp b/src/dist.hpp | |
| index ad9767a..6c73941 100644 | |
| --- a/src/dist.hpp | |
| +++ b/src/dist.hpp | |
| @@ -63,17 +63,18 @@ namespace zmq | |
| typedef array_t <class writer_t> pipes_t; | |
| pipes_t pipes; | |
| - // List of new pipes that were not yet inserted into 'pipes' list. | |
| - // These pipes are moves to 'pipes' list once the current multipart | |
| - // message is fully sent. This way we avoid sending incomplete messages | |
| - // to peers. | |
| - typedef std::vector <class writer_t*> new_pipes_t; | |
| - new_pipes_t new_pipes; | |
| - | |
| // Number of active pipes. All the active pipes are located at the | |
| - // beginning of the pipes array. | |
| + // beginning of the pipes array. These are the pipes the messages | |
| + // can be sent to at the moment. | |
| pipes_t::size_type active; | |
| + // Number of pipes eligible for sending messages to. This includes all | |
| + // the active pipes plus all the pipes that we can in theory send | |
| + // messages to (the HWM is not yet reached), but sending a message | |
| + // to them would result in partial message being delivered, ie. message | |
| + // with initial parts missing. | |
| + pipes_t::size_type eligible; | |
| + | |
| // True if last we are in the middle of a multipart message. | |
| bool more; | |
| #include "zmq.h" | |
| #include "assert.h" | |
| #include "stdint.h" | |
| #include "string.h" | |
| #include "stdio.h" | |
| int main () | |
| { | |
| void *ctx = zmq_init (1); | |
| assert (ctx); | |
| void *s = zmq_socket (ctx, ZMQ_PUB); | |
| uint64_t sz = 100; | |
| int rc = zmq_setsockopt (s, ZMQ_HWM, &sz, sizeof (sz)); | |
| assert (rc == 0); | |
| rc = zmq_bind (s, "tcp://127.0.0.1:2211"); | |
| assert (rc == 0); | |
| zmq_msg_t msg; | |
| while (1) { | |
| puts ("sending"); | |
| zmq_msg_init_size (&msg, 5); | |
| rc = zmq_send (s, &msg, ZMQ_SNDMORE); | |
| zmq_msg_close (&msg); | |
| zmq_msg_init_size (&msg, 4); | |
| rc = zmq_send (s, &msg, ZMQ_SNDMORE); | |
| zmq_msg_close (&msg); | |
| zmq_msg_init_size (&msg, 1000); | |
| rc = zmq_send (s, &msg, 0); | |
| zmq_msg_close (&msg); | |
| } | |
| return 0; | |
| } |
| #include "zmq.h" | |
| #include "assert.h" | |
| #include "stdint.h" | |
| #include "string.h" | |
| int main () | |
| { | |
| void *ctx = zmq_init (1); | |
| assert (ctx); | |
| void *s = zmq_socket (ctx, ZMQ_PUB); | |
| int sz = 100; | |
| int rc = zmq_setsockopt (s, ZMQ_SNDHWM, &sz, sizeof (sz)); | |
| assert (rc == 0); | |
| rc = zmq_bind (s, "tcp://127.0.0.1:2211"); | |
| assert (rc == 0); | |
| uint32_t seq = 0; | |
| char buff [1000]; | |
| memset (buff, 111, 1000); | |
| while (1) { | |
| rc = zmq_send (s, "topic", 5, ZMQ_SNDMORE); | |
| assert (rc == 5); | |
| rc = zmq_send (s, &seq, 4, ZMQ_SNDMORE); | |
| assert (rc == 4); | |
| rc = zmq_send (s, buff, 1000, 0); | |
| assert (rc == 1000); | |
| } | |
| return 0; | |
| } |
| #include "zmq.h" | |
| #include "assert.h" | |
| #include "stdint.h" | |
| #include "string.h" | |
| #include "stdio.h" | |
| int main () | |
| { | |
| void *ctx = zmq_init (1); | |
| assert (ctx); | |
| void *s = zmq_socket (ctx, ZMQ_PUB); | |
| int sz = 100; | |
| int rc = zmq_setsockopt (s, ZMQ_SNDHWM, &sz, sizeof (sz)); | |
| assert (rc == 0); | |
| rc = zmq_bind (s, "tcp://127.0.0.1:2211"); | |
| assert (rc == 0); | |
| zmq_msg_t msg; | |
| while (1) { | |
| puts ("sending"); | |
| zmq_msg_init_size (&msg, 5); | |
| rc = zmq_sendmsg (s, &msg, ZMQ_SNDMORE); | |
| zmq_msg_close (&msg); | |
| zmq_msg_init_size (&msg, 4); | |
| rc = zmq_sendmsg (s, &msg, ZMQ_SNDMORE); | |
| zmq_msg_close (&msg); | |
| zmq_msg_init_size (&msg, 1000); | |
| rc = zmq_sendmsg (s, &msg, 0); | |
| zmq_msg_close (&msg); | |
| } | |
| return 0; | |
| } |
| #include "zmq.h" | |
| #include "assert.h" | |
| #include "stdint.h" | |
| #include "stdio.h" | |
| int | |
| zsockopt_rcvmore (void *socket) | |
| { | |
| int64_t rcvmore; | |
| size_t type_size = sizeof (int64_t); | |
| zmq_getsockopt (socket, ZMQ_RCVMORE, &rcvmore, &type_size); | |
| return (int) rcvmore; | |
| } | |
| int main () | |
| { | |
| void *ctx = zmq_init (1); | |
| assert (ctx); | |
| void *s = zmq_socket (ctx, ZMQ_SUB); | |
| int rc = zmq_setsockopt (s, ZMQ_SUBSCRIBE, "", 0); | |
| assert (rc == 0); | |
| uint64_t sz = 100; | |
| rc = zmq_setsockopt (s, ZMQ_HWM, &sz, sizeof (sz)); | |
| assert (rc == 0); | |
| rc = zmq_connect (s, "tcp://127.0.0.1:2211"); | |
| assert (rc == 0); | |
| while (1) { | |
| puts ("receiving"); | |
| zmq_msg_t msg; | |
| zmq_msg_init (&msg); | |
| rc = zmq_recv (s, &msg, 0); | |
| assert (zmq_msg_size (&msg) == 5); | |
| zmq_msg_close (&msg); | |
| int more = zsockopt_rcvmore (s); | |
| assert (more); | |
| zmq_msg_init (&msg); | |
| rc = zmq_recv (s, &msg, 0); | |
| assert (zmq_msg_size (&msg) == 4); | |
| zmq_msg_close (&msg); | |
| more = zsockopt_rcvmore (s); | |
| assert (more); | |
| zmq_msg_init (&msg); | |
| rc = zmq_recv (s, &msg, 0); | |
| assert (zmq_msg_size (&msg) == 1000); | |
| zmq_msg_close (&msg); | |
| more = zsockopt_rcvmore (s); | |
| assert (!more); | |
| } | |
| return 0; | |
| } | |
| #include "zmq.h" | |
| #include "assert.h" | |
| #include "stdint.h" | |
| int main () | |
| { | |
| void *ctx = zmq_init (1); | |
| assert (ctx); | |
| void *s = zmq_socket (ctx, ZMQ_SUB); | |
| int rc = zmq_setsockopt (s, ZMQ_SUBSCRIBE, "", 0); | |
| assert (rc == 0); | |
| int sz = 100; | |
| rc = zmq_setsockopt (s, ZMQ_RCVHWM, &sz, sizeof (sz)); | |
| assert (rc == 0); | |
| rc = zmq_connect (s, "tcp://127.0.0.1:2211"); | |
| assert (rc == 0); | |
| char buff [1000]; | |
| while (1) { | |
| rc = zmq_recv (s, buff, 1000, 0); | |
| assert (rc == 5); | |
| int more; | |
| size_t moresz; | |
| rc = zmq_getsockopt (s, ZMQ_RCVMORE, &more, &moresz); | |
| assert (rc == 0); | |
| assert (more); | |
| rc = zmq_recv (s, buff, 1000, 0); | |
| assert (rc == 4); | |
| rc = zmq_getsockopt (s, ZMQ_RCVMORE, &more, &moresz); | |
| assert (rc == 0); | |
| assert (more); | |
| rc = zmq_recv (s, buff, 1000, 0); | |
| assert (rc == 1000); | |
| rc = zmq_getsockopt (s, ZMQ_RCVMORE, &more, &moresz); | |
| assert (rc == 0); | |
| assert (!more); | |
| } | |
| return 0; | |
| } |
| #include "zmq.h" | |
| #include "assert.h" | |
| #include "stdint.h" | |
| #include "stdio.h" | |
| int | |
| zsockopt_rcvmore (void *socket) | |
| { | |
| int rcvmore; | |
| size_t type_size = sizeof (int); | |
| zmq_getsockopt (socket, ZMQ_RCVMORE, &rcvmore, &type_size); | |
| return rcvmore; | |
| } | |
| int main () | |
| { | |
| void *ctx = zmq_init (1); | |
| assert (ctx); | |
| void *s = zmq_socket (ctx, ZMQ_SUB); | |
| int rc = zmq_setsockopt (s, ZMQ_SUBSCRIBE, "", 0); | |
| assert (rc == 0); | |
| int sz = 100; | |
| rc = zmq_setsockopt (s, ZMQ_RCVHWM, &sz, sizeof (sz)); | |
| assert (rc == 0); | |
| rc = zmq_connect (s, "tcp://127.0.0.1:2211"); | |
| assert (rc == 0); | |
| while (1) { | |
| puts ("receiving"); | |
| zmq_msg_t msg; | |
| zmq_msg_init (&msg); | |
| rc = zmq_recvmsg (s, &msg, 0); | |
| assert (zmq_msg_size (&msg) == 5); | |
| zmq_msg_close (&msg); | |
| int more = zsockopt_rcvmore (s); | |
| assert (more); | |
| zmq_msg_init (&msg); | |
| rc = zmq_recvmsg (s, &msg, 0); | |
| assert (zmq_msg_size (&msg) == 4); | |
| zmq_msg_close (&msg); | |
| more = zsockopt_rcvmore (s); | |
| assert (more); | |
| zmq_msg_init (&msg); | |
| rc = zmq_recvmsg (s, &msg, 0); | |
| assert (zmq_msg_size (&msg) == 1000); | |
| zmq_msg_close (&msg); | |
| more = zsockopt_rcvmore (s); | |
| assert (!more); | |
| } | |
| return 0; | |
| } | |