Created
August 4, 2015 14:57
-
-
Save dims/349558ce662265ecddde to your computer and use it in GitHub Desktop.
pyamqp diff for logging heartbeat (from Dmitry Mescheryakov)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| --- amqp/connection.py.initial 2015-08-04 17:38:06.000000000 +0300 | |
| +++ amqp/connection.py 2015-07-29 17:32:29.000000000 +0300 | |
| @@ -39,6 +39,9 @@ | |
| from .serialization import AMQPWriter | |
| from .transport import create_transport | |
| +import random | |
| +rnd = random.Random() | |
| + | |
| HAS_MSG_PEEK = hasattr(socket, 'MSG_PEEK') | |
| START_DEBUG_FMT = """ | |
| @@ -120,6 +123,8 @@ | |
| requiring certain certificates. | |
| """ | |
| + self.connection_id_1 = rnd.randint(0, 100000) | |
| + | |
| channel_max = channel_max or 65535 | |
| frame_max = frame_max or 131072 | |
| if (login_response is None) \ | |
| @@ -165,7 +170,7 @@ | |
| self.transport = self.Transport(host, connect_timeout, ssl) | |
| self.method_reader = MethodReader(self.transport) | |
| - self.method_writer = MethodWriter(self.transport, self.frame_max) | |
| + self.method_writer = MethodWriter(self.transport, self.frame_max, self.connection_id_1) | |
| self.wait(allowed_methods=[ | |
| (10, 10), # start | |
| @@ -889,20 +894,32 @@ | |
| :keyword rate: Ignored | |
| """ | |
| + | |
| + AMQP_LOGGER.warning('%s: entering AMQP heartbeat' % self.connection_id_1) | |
| + | |
| if not self.heartbeat: | |
| return | |
| # treat actual data exchange in either direction as a heartbeat | |
| sent_now = self.method_writer.bytes_sent | |
| recv_now = self.method_reader.bytes_recv | |
| + | |
| if self.prev_sent is None or self.prev_sent != sent_now: | |
| self.last_heartbeat_sent = monotonic() | |
| if self.prev_recv is None or self.prev_recv != recv_now: | |
| self.last_heartbeat_received = monotonic() | |
| + | |
| + mntnc = monotonic() | |
| + | |
| + AMQP_LOGGER.warning('Prev sent/recv: %s/%s, now - %s/%s, monotonic - %s, last_heartbeat_sent - %s, heartbeat int. - %s' % | |
| + (str(self.prev_sent), str(self.prev_recv), | |
| + str(sent_now), str(recv_now), str(mntnc), str(self.last_heartbeat_sent), str(self.heartbeat))) | |
| + | |
| self.prev_sent, self.prev_recv = sent_now, recv_now | |
| # send a heartbeat if it's time to do so | |
| - if monotonic() > self.last_heartbeat_sent + self.heartbeat: | |
| + if mntnc > self.last_heartbeat_sent + self.heartbeat: | |
| + AMQP_LOGGER.warning('AMQP: actually sending heartbeat') | |
| self.send_heartbeat() | |
| self.last_heartbeat_sent = monotonic() | |
| diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py | |
| index 326357c..b6bfdf1 100644 | |
| --- a/oslo_messaging/_drivers/impl_rabbit.py | |
| +++ b/oslo_messaging/_drivers/impl_rabbit.py | |
| @@ -993,6 +1015,7 @@ class Connection(object): | |
| try: | |
| try: | |
| + LOG.info('Trying heartbeat from hb thread') | |
| self._heartbeat_check() | |
| # NOTE(sileht): We need to drain event to receive | |
| # heartbeat from the broker but don't hold the | |
| @@ -1076,6 +1099,7 @@ class Connection(object): | |
| raise StopIteration | |
| if self._heartbeat_supported_and_enabled(): | |
| + LOG.info('Trying heartbeat from consume') | |
| self._heartbeat_check() | |
| try: |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment