Skip to content

Instantly share code, notes, and snippets.

@westphahl
Last active September 9, 2019 20:39
Show Gist options
  • Save westphahl/eab376887748e3ba6adace7594708075 to your computer and use it in GitHub Desktop.
Save westphahl/eab376887748e3ba6adace7594708075 to your computer and use it in GitHub Desktop.
Pika - Connection reset bug
# Issue: https://github.com/pika/pika/issues/753
import logging
import pika
class Consumer:
    """Minimal blocking consumer used to reproduce pika issue #753.

    The consumer keeps a one-second timer permanently armed on the
    connection and blocks for a long time inside its message callback
    before acknowledging the delivery.
    """

    def run(self, queue):
        """Connect with pika's default parameters and consume from *queue*.

        Declares *queue* (non-durable, non-exclusive, no auto-delete),
        limits prefetch to a single message, arms the recurring timer and
        then consumes until interrupted with Ctrl-C.

        The connection is closed on the way out if it is still open, so
        neither the Ctrl-C path nor a failure during setup leaks it.
        Exceptions other than ``KeyboardInterrupt`` still propagate to the
        caller unchanged.
        """
        # Kept on the instance because both callbacks need the connection.
        self.connection = connection = pika.BlockingConnection()
        try:
            channel = connection.channel()
            channel.basic_qos(prefetch_count=1, all_channels=True)
            channel.queue_declare(queue=queue, durable=False,
                                  exclusive=False, auto_delete=False)
            # NOTE(review): callback-first argument order and add_timeout()
            # look like the pre-1.0 pika API -- confirm against the pika
            # version this script is run with.
            channel.basic_consume(self.on_message, queue)
            connection.add_timeout(1, self.on_timeout)
            try:
                channel.start_consuming()
            except KeyboardInterrupt:
                channel.stop_consuming()
        finally:
            # Skip close() when the broker/peer already dropped the
            # connection (the failure this script demonstrates), so the
            # original exception is not masked by a second one.
            if connection.is_open:
                connection.close()

    def on_timeout(self):
        """Timer callback: re-arm itself so a timeout is always pending."""
        self.connection.add_timeout(1, self.on_timeout)

    def on_message(self, channel, method, properties, body):
        """Message callback: block via ``connection.sleep(200)``, then ack.

        NOTE(review): sleeping inside the callback while a timer is pending
        is presumably the trigger for the connection reset described in the
        linked issue -- do not shorten it without re-checking the repro.
        """
        self.connection.sleep(200)
        channel.basic_ack(method.delivery_tag)
if __name__ == "__main__":
    # Script entry point: enable INFO-level logging so pika's connection
    # messages are printed, then consume from the "foobar" queue.
    logging.basicConfig(level=logging.INFO)
    Consumer().run("foobar")
From f146ad15f00fe7596b2dc33a4698fcb321a14ca8 Mon Sep 17 00:00:00 2001
From: Simon Westphahl <[email protected]>
Date: Fri, 10 Mar 2017 14:19:22 +0100
Subject: blocking_adapter: only consider valid terminators
When processing data events in process_data_events() the
common_terminator passed to _flush_output() was also set to true for
events that could not be processed in the current context.
Considering those events anyway causes _flush_output() to return
immediately.
When there is a timeout set and process_data_events() is called from a
callback (e.g. on_message), this can lead to a dropped connection.
The fix is to first check whether the events can be processed by
acquiring the event dispatch.
---
pika/adapters/blocking_connection.py | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
diff --git a/pika/adapters/blocking_connection.py b/pika/adapters/blocking_connection.py
index 2582517..76c3b05 100644
--- a/pika/adapters/blocking_connection.py
+++ b/pika/adapters/blocking_connection.py
@@ -699,8 +699,10 @@ class BlockingConnection(object):
until I/O produces actionalable events. Defaults to 0 for backward
compatibility. This parameter is NEW in pika 0.10.0.
"""
- common_terminator = lambda: bool(
- self._channels_pending_dispatch or self._ready_events)
+ with self._acquire_event_dispatch() as dispatch_acquired:
+ # Check if we can actually process pending events
+ common_terminator = lambda: bool(dispatch_acquired and
+ (self._channels_pending_dispatch or self._ready_events))
if time_limit is None:
self._flush_output(common_terminator)
--
2.12.0
python bug_demo.py
INFO:pika.adapters.base_connection:Connecting to ::1:5672
INFO:pika.adapters.blocking_connection:Created channel=1
ERROR:pika.adapters.base_connection:Socket Error: 104
INFO:pika.connection:Disconnected from RabbitMQ at localhost:5672 (-1): ConnectionResetError(104, 'Connection reset by peer')
ERROR:pika.adapters.blocking_connection:Connection close detected; result=BlockingConnection__OnClosedArgs(connection=<SelectConnection CLOSED socket=None params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>, reason_code=-1, reason_text="ConnectionResetError(104, 'Connection reset by peer')")
Traceback (most recent call last):
File "/opt/src/bug_demo.py", line 32, in <module>
consumer.run("foobar")
File "/opt/src/bug_demo.py", line 17, in run
channel.start_consuming()
File "/opt/src/pika/pika/adapters/blocking_connection.py", line 1759, in start_consuming
self.connection.process_data_events(time_limit=None)
File "/opt/src/pika/pika/adapters/blocking_connection.py", line 719, in process_data_events
self._dispatch_channel_events()
File "/opt/src/pika/pika/adapters/blocking_connection.py", line 518, in _dispatch_channel_events
impl_channel._get_cookie()._dispatch_events()
File "/opt/src/pika/pika/adapters/blocking_connection.py", line 1388, in _dispatch_events
evt.body)
File "/opt/src/bug_demo.py", line 26, in on_message
channel.basic_ack(method.delivery_tag)
File "/opt/src/pika/pika/adapters/blocking_connection.py", line 1967, in basic_ack
self._flush_output()
File "/opt/src/pika/pika/adapters/blocking_connection.py", line 1254, in _flush_output
*waiters)
File "/opt/src/pika/pika/adapters/blocking_connection.py", line 474, in _flush_output
result.reason_text)
pika.exceptions.ConnectionClosed: (-1, "ConnectionResetError(104, 'Connection reset by peer')")
@shivarajnaidu
Copy link

shivarajnaidu commented Oct 14, 2018

How do I handle this pika.exceptions.ConnectionClosed: (-1, "ConnectionResetError(104, 'Connection reset by peer')") exception?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment