Last active
September 9, 2019 20:39
-
-
Save westphahl/eab376887748e3ba6adace7594708075 to your computer and use it in GitHub Desktop.
Pika - Connection reset bug
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Issue: https://github.com/pika/pika/issues/753 | |
import logging | |
import pika | |
class Consumer: | |
def run(self, queue): | |
self.connection = connection = pika.BlockingConnection() | |
channel = connection.channel() | |
channel.basic_qos(prefetch_count=1, all_channels=True) | |
channel.queue_declare(queue=queue, durable=False, | |
exclusive=False, auto_delete=False) | |
channel.basic_consume(self.on_message, queue) | |
connection.add_timeout(1, self.on_timeout) | |
try: | |
channel.start_consuming() | |
except KeyboardInterrupt: | |
channel.stop_consuming() | |
def on_timeout(self): | |
self.connection.add_timeout(1, self.on_timeout) | |
def on_message(self, channel, method, properties, body): | |
self.connection.sleep(200) | |
channel.basic_ack(method.delivery_tag) | |
if __name__ == "__main__": | |
logging.basicConfig(level=logging.INFO) | |
consumer = Consumer() | |
consumer.run("foobar") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
From f146ad15f00fe7596b2dc33a4698fcb321a14ca8 Mon Sep 17 00:00:00 2001 | |
From: Simon Westphahl <[email protected]> | |
Date: Fri, 10 Mar 2017 14:19:22 +0100 | |
Subject: blocking_adapter: only consider valid terminators | |
When processing data events in process_data_events() the | |
common_terminator passed to _flush_output() was also set to true for | |
events that could not be processed in the current context. | |
Considering those events anyways will lead to _flush_output() returning | |
immediately. | |
When there is a timeout set and process_data_events() is called from a | |
callback (e.g. on_message), this can lead to a dropped connection. | |
Fix is to check first if the events can be processed by acquiring the | |
dispatch. | |
--- | |
pika/adapters/blocking_connection.py | 6 ++++-- | |
1 file changed, 4 insertions(+), 2 deletions(-) | |
diff --git a/pika/adapters/blocking_connection.py b/pika/adapters/blocking_connection.py | |
index 2582517..76c3b05 100644 | |
--- a/pika/adapters/blocking_connection.py | |
+++ b/pika/adapters/blocking_connection.py | |
@@ -699,8 +699,10 @@ class BlockingConnection(object): | |
until I/O produces actionalable events. Defaults to 0 for backward | |
compatibility. This parameter is NEW in pika 0.10.0. | |
""" | |
- common_terminator = lambda: bool( | |
- self._channels_pending_dispatch or self._ready_events) | |
+ with self._acquire_event_dispatch() as dispatch_acquired: | |
+ # Check if we can actually process pending events | |
+ common_terminator = lambda: bool(dispatch_acquired and | |
+ (self._channels_pending_dispatch or self._ready_events)) | |
if time_limit is None: | |
self._flush_output(common_terminator) | |
-- | |
2.12.0 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
python bug_demo.py | |
INFO:pika.adapters.base_connection:Connecting to ::1:5672 | |
INFO:pika.adapters.blocking_connection:Created channel=1 | |
ERROR:pika.adapters.base_connection:Socket Error: 104 | |
INFO:pika.connection:Disconnected from RabbitMQ at localhost:5672 (-1): ConnectionResetError(104, 'Connection reset by peer') | |
ERROR:pika.adapters.blocking_connection:Connection close detected; result=BlockingConnection__OnClosedArgs(connection=<SelectConnection CLOSED socket=None params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>, reason_code=-1, reason_text="ConnectionResetError(104, 'Connection reset by peer')") | |
Traceback (most recent call last): | |
File "/opt/src/bug_demo.py", line 32, in <module> | |
consumer.run("foobar") | |
File "/opt/src/bug_demo.py", line 17, in run | |
channel.start_consuming() | |
File "/opt/src/pika/pika/adapters/blocking_connection.py", line 1759, in start_consuming | |
self.connection.process_data_events(time_limit=None) | |
File "/opt/src/pika/pika/adapters/blocking_connection.py", line 719, in process_data_events | |
self._dispatch_channel_events() | |
File "/opt/src/pika/pika/adapters/blocking_connection.py", line 518, in _dispatch_channel_events | |
impl_channel._get_cookie()._dispatch_events() | |
File "/opt/src/pika/pika/adapters/blocking_connection.py", line 1388, in _dispatch_events | |
evt.body) | |
File "/opt/src/bug_demo.py", line 26, in on_message | |
channel.basic_ack(method.delivery_tag) | |
File "/opt/src/pika/pika/adapters/blocking_connection.py", line 1967, in basic_ack | |
self._flush_output() | |
File "/opt/src/pika/pika/adapters/blocking_connection.py", line 1254, in _flush_output | |
*waiters) | |
File "/opt/src/pika/pika/adapters/blocking_connection.py", line 474, in _flush_output | |
result.reason_text) | |
pika.exceptions.ConnectionClosed: (-1, "ConnectionResetError(104, 'Connection reset by peer')") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
How to handle this
pika.exceptions.ConnectionClosed: (-1, "ConnectionResetError(104, 'Connection reset by peer')")
exception ?