-
-
Save sboily/6a784942213fd1a18e35737fd1129ad4 to your computer and use it in GitHub Desktop.
Documentation for Stasis + RabbitMQ Event Forwarding
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"_copyright": "Copyright (C) 2019 The Wazo Authors (see the AUTHORS file)", | |
"_author": "Nicolaos Ballas", | |
"_svn_revision": "$Revision$", | |
"apiVersion": "2.0.0", | |
"swaggerVersion": "1.1", | |
"basePath": "http://localhost:8088/ari", | |
"resourcePath": "/api-docs/amqp.{format}", | |
"apis": [ | |
{ | |
"path": "/amqp/{applicationName}", | |
"description": "Stasis application", | |
"operations": [ | |
{ | |
"httpMethod": "POST", | |
"summary": "Create a stasis subscription to AMQP.", | |
"notes": "Create a Stasis application and subscribe to it's event and forward them to AMQP. The application's name must be unique.", | |
"nickname": "stasisSubscribe", | |
"responseClass": "Application", | |
"parameters": [ | |
{ | |
"name": "applicationName", | |
"description": "Application's name", | |
"paramType": "path", | |
"required": true, | |
"allowMultiple": false, | |
"dataType": "string" | |
} | |
], | |
"errorResponses": [ | |
{ | |
"code": 400, | |
"reason": "Bad request." | |
} | |
] | |
}, | |
{ | |
"httpMethod": "DELETE", | |
"summary": "Remove a stasis subscription to AMQP.", | |
"notes": "Remove an internal Stasis application and its associated subscription.", | |
"nickname": "stasisUnsubscribe", | |
"responseClass": "Application", | |
"parameters": [ | |
{ | |
"name": "applicationName", | |
"description": "Application's name", | |
"paramType": "path", | |
"required": true, | |
"allowMultiple": false, | |
"dataType": "string" | |
} | |
], | |
"errorResponses": [ | |
{ | |
"code": 400, | |
"reason": "Bad request." | |
} | |
] | |
} | |
] | |
} | |
], | |
"models": { | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Dialplan: | |
------------------------------------------------- | |
For an application named 'bar' | |
exten = 6001,1,NoOp() | |
same = n,Answer() | |
same = n,Stasis(bar) ; this will generate events which will be forwarded to stasis (websocket or AMQP) | |
same = n,Hangup() | |
REST API: | |
------------------------------------------------- | |
To create a Stasis Application named 'bar' | |
1. POST with applicationName=bar # This will create an internal application that will send events to AMQP | |
To delete the application created above | |
2. DELETE with applicationName=bar # This will delete the application, events will no longer be sent to AMQP | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
*** Ask if the user prefers EID in stasis event or in its container*** | |
=> Forwarding of Stasis events to a receiver, RabbitMQ | |
Let's do this right: the receiver can be anyone, not just RabbitMQ. | |
Asterisk receives | |
1. Client 2. Asterisk 3. Receiver receives events | |
sends request creates Application | |
application and registers it to Stasis | |
4. Client receives | |
response | |
+--- stasis application | |
| | |
v | | | |
[client]-------------->| topic --> (*)--> callback---+ | | |
| | ^ | | | |
| | | +-->| ==== event ==> [RabbitMQ Exchange(s)] | |
| v | | | |
<--------------| event >>> Stasis -+ | | |
To listening events, listen on routing key stasis.app.# for an application. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Works with a latest wazo version or Asterisk 16.5.0 | |
AMQP client for Asterisk | |
Clone the repository: | |
cd /usr/src/ | |
git clone https://github.com/wazo-pbx/wazo-res-amqp | |
make | |
make install | |
Forward Stasis events to AMQP | |
cd /usr/src/ | |
git clone https://github.com/wazo-pbx/wazo-res-stasis-amqp | |
git checkout WAZO-939-Stasis-Event-Forwarding | |
make | |
make install | |
Restart Asterisk |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
1. Support for multiple RabbitMQ Exchanges | |
- The res_stasis_amqp module has a global configuration set when the module is loaded. | |
Changing the global configuration at runtime does not appear to work; at least not with a simple implementation. | |
After some discussion it will be probably not an issue, but i would like to don't losse this informations. Looks like works same for websocket. | |
2. If you register an application with the websocket, it's possible to disabled it by the amqp endpoint ARI. | |
3. If we restart Asterisk we loose the application | |
4. If you registering an application on the websocket with the same name of an application already registered with the AMQP events, the callback is on websocket. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
# Copyright 2015-2018 The Wazo Authors (see the AUTHORS file) | |
# | |
# This program is free software: you can redistribute it and/or modify | |
# it under the terms of the GNU General Public License as published by | |
# the Free Software Foundation, either version 3 of the License, or | |
# (at your option) any later version. | |
# | |
# This program is distributed in the hope that it will be useful, | |
# but WITHOUT ANY WARRANTY; without even the implied warranty of | |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
# GNU General Public License for more details. | |
# | |
# You should have received a copy of the GNU General Public License | |
# along with this program. If not, see <http://www.gnu.org/licenses/> | |
import argparse | |
import kombu | |
import logging | |
from kombu.mixins import ConsumerMixin | |
from pprint import pformat | |
EXCHANGE = kombu.Exchange('xivo', type='topic') | |
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(process)d] (%(levelname)s) (%(name)s): %(message)s') | |
logger = logging.getLogger(__name__) | |
class C(ConsumerMixin): | |
def __init__(self, connection, routing_key): | |
self.connection = connection | |
self.routing_key = routing_key | |
def get_consumers(self, Consumer, channel): | |
return [Consumer(kombu.Queue(exchange=EXCHANGE, routing_key=self.routing_key, exclusive=True), | |
callbacks=[self.on_message])] | |
def on_message(self, body, message): | |
logger.info('Received: %s', pformat(body)) | |
message.ack() | |
def main(): | |
parser = argparse.ArgumentParser('read RabbitMQ xivo exchange') | |
parser.add_argument('-n', '--hostname', help='RabbitMQ server', | |
default='localhost') | |
parser.add_argument('-p', '--port', help='Port of RabbitMQ', | |
default='5672') | |
parser.add_argument('-r', '--routing-key', help='Routing key to bind on bus', | |
dest='routing_key', default='#') | |
args = parser.parse_args() | |
url_amqp = 'amqp://guest:guest@%s:%s//' % (args.hostname, args.port) | |
with kombu.Connection(url_amqp) as conn: | |
try: | |
C(conn, args.routing_key).run() | |
except KeyboardInterrupt: | |
return | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import ari as ari_client | |
from ari.exceptions import ARINotFound | |
import logging | |
import os | |
import pytest | |
from hamcrest import * | |
from hamcrest import assert_that | |
from hamcrest import calling | |
from hamcrest import has_property | |
from requests.exceptions import HTTPError | |
from xivo_test_helpers import until | |
from xivo_test_helpers.bus import BusClient | |
from xivo_test_helpers.asset_launching_test_case import AssetLaunchingTestCase | |
from xivo_test_helpers.hamcrest.raises import raises | |
log_level = logging.DEBUG if os.environ.get('TEST_LOGS') == 'verbose' else logging.INFO | |
logging.basicConfig(level=log_level) | |
class AssetLauncher(AssetLaunchingTestCase): | |
assets_root = os.path.join(os.path.dirname(__file__), '..', 'assets') | |
asset = 'amqp' | |
service = 'asterisk' | |
@pytest.fixture() | |
def ari(): | |
AssetLauncher.kill_containers() | |
AssetLauncher.rm_containers() | |
AssetLauncher.launch_service_with_asset() | |
ari_url = 'http://localhost:{port}'.format(port=AssetLauncher.service_port(5039, 'ari_amqp')) | |
ari = until.return_(ari_client.connect, ari_url, 'wazo', 'wazo', timeout=5, interval=0.1) | |
yield ari | |
AssetLauncher.kill_containers() | |
AssetLauncher.docker_exec(["asterisk", "-rx", "module load res_stasis_amqp.so"], service_name='ari_amqp') | |
def test_stasis_amqp_events(ari): | |
bus_client = BusClient.from_connection_fields(port=AssetLauncher.service_port(5672, 'rabbitmq')) | |
# AssetLauncher.docker_exec(["asterisk", "-rx", "module load res_stasis_amqp.so"], service_name='ari_amqp') | |
events = bus_client.accumulator("stasis.app.amqp_gateway") | |
ari.bridges.create() | |
def event_received(events): | |
assert_that(events.accumulate(), has_item( | |
has_entry('data', | |
has_entry('type', 'BridgeCreated') | |
) | |
)) | |
until.assert_(event_received, events, timeout=5) | |
subscribe_args = {'applicationName': 'NewStasisApplication'} | |
def test_app_subscribe(ari): | |
assert_that( | |
calling(ari.amqp.stasisSubscribe).with_args(**subscribe_args), | |
not_(raises(Exception)) | |
) | |
def test_app_unsubscribe(ari): | |
ari.amqp.stasisSubscribe(**subscribe_args) | |
assert_that( | |
calling(ari.amqp.stasisUnsubscribe).with_args(**subscribe_args), | |
not_(raises(Exception)) | |
) | |
def test_app_subscribe_duplicate_fail(ari): | |
ari.amqp.stasisSubscribe(**subscribe_args) | |
assert_that( | |
calling(ari.amqp.stasisSubscribe).with_args(**subscribe_args), | |
raises(ARINotFound).matching(has_property('original_error', | |
has_property('response', has_property('status_code', 409))) | |
) | |
) | |
def test_app_unsubscribe_fail(ari): | |
assert_that( | |
calling(ari.amqp.stasisUnsubscribe).with_args(**subscribe_args), | |
raises(ARINotFound).matching(has_property('original_error', | |
has_property('response', has_property('status_code', 404))) | |
) | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment