Skip to content

Instantly share code, notes, and snippets.

@Gsantomaggio
Last active May 28, 2019 09:48
Show Gist options
  • Save Gsantomaggio/bff62fe28210622cfa9d32d164412800 to your computer and use it in GitHub Desktop.
Save Gsantomaggio/bff62fe28210622cfa9d32d164412800 to your computer and use it in GitHub Desktop.
mandatory

Basic Idea

The idea is to add another parameter to the function RPCCLient as:

client = oslo_messaging.RPCClient(transport, target, options={'mandatory': True})
client.call({}, 'foo', id_value=str(i), test_value="hello oslo")

Inside the function, _publish decode the option value, as:

then pass the mandatory flag to the publish.

on_return function raises a new exception called:

MessageUndeliverable

so in case, the message is not routed to any queue, the call will raise the exception.

in this way:

       r = client.call({}, 'foo', id_value=str(i), test_value="hello oslo")
            print("hello" + r + " - number: " + str(number))
        except exceptions.MessageUndeliverable as e:
            print("MessageUndeliverable error, RabbitMQ Exception: %s, routing_key: %s message: %s exchange: %s:" % (
                e.exception, e.routing_key, e.message.body, e.exchange))

Here you can find the file test.

the result would be:

id_value: 1 - test_value: ciao
hello1 - number: 1
id_value: 2 - test_value: ciao
hello2 - number: 1
id_value: 3 - test_value: ciao
hello3 - number: 1
MessageUndeliverable error, RabbitMQ Exception: Basic.return: (312) NO_ROUTE, routing_key: myroutingkey message: {"oslo.version": "2.0", "oslo.message.options": {"mandatory": true}, "oslo.message": "{\"method\": \"foo\", \"args\": {\"id_value\": \"4\", \"test_value\": \"ciao\"}, \"namespace\": \"test\", \"version\": \"2.0\", \"options\": {\"mandatory\": true}, \"_msg_id\": \"a697620caf0c445d90352646caa193bc\", \"_reply_q\": \"reply_4f239e94e1234785952448a79e60a38b\", \"_timeout\": null, \"_unique_id\": \"c27ea03c9b2b4870b3e34a19997cf143\"}"} exchange: myexchange:

how to test it

I execute the test file and then execute this script more than once.

it deletes the queues, to the message is not routed.

@4383
Copy link

4383 commented May 27, 2019

Really interesting changes!

We can by example use it with tenacity on the customer side to provide retries on calling failures or something like that.

from tenacity import *
import oslo_messaging
...
from oslo_messaging.exceptions import MessageUndeliverable
...

@retry(retry=retry_if_exception_type(MessageUndeliverable))
def call(transport, target, number):
    client = oslo_messaging.RPCClient(transport, target, options={'mandatory': True})

    for i in range(1, 50):
        time.sleep(3)
        r = client.call({}, 'foo', id_value=str(i), test_value="hello oslo")
        print("hello" + r + " - number: " + str(number))
...

@Gsantomaggio
Copy link
Author

thank you Hervé,
moved the example: https://github.com/Gsantomaggio/rabbitmq-utils/tree/master/openstack/mandatory_test
PR are welcome :))!!!!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment