Created
February 10, 2015 00:01
-
-
Save ajayhn/ca842987638131f0073a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/cfgm_common/vnc_kombu.py b/cfgm_common/vnc_kombu.py | |
index fbf637e..d8d5fd5 100644 | |
--- a/cfgm_common/vnc_kombu.py | |
+++ b/cfgm_common/vnc_kombu.py | |
@@ -6,6 +6,7 @@ import amqp.exceptions | |
import kombu | |
import gevent | |
import time | |
+import json | |
from pysandesh.connection_info import ConnectionState | |
from pysandesh.gen_py.process_info.ttypes import ConnectionStatus, \ | |
@@ -40,7 +41,8 @@ class VncKombuClient(object): | |
msg = "Unable to delete the old amqp Q: %s" % str(e) | |
self._logger(msg, level=SandeshLevel.SYS_ERR) | |
- self._obj_update_q = self._conn.SimpleQueue(self._update_queue_obj) | |
+ # Used for publishing: a separate channel/fd from the one used for consuming. | |
+ self._obj_update_exchange = self._update_exchange_obj(self._conn.channel()) | |
old_subscribe_greenlet = self._subscribe_greenlet | |
self._subscribe_greenlet = gevent.spawn(self._dbe_oper_subscribe) | |
@@ -68,10 +70,10 @@ class VncKombuClient(object): | |
self._subscribe_cb = subscribe_cb | |
self._logger = logger | |
- obj_upd_exchange = kombu.Exchange('vnc_config.object-update', 'fanout', | |
- durable=False) | |
+ self._update_exchange_obj = kombu.Exchange('vnc_config.object-update', 'fanout', | |
+ durable=False) | |
- self._update_queue_obj = kombu.Queue(q_name, obj_upd_exchange) | |
+ self._update_queue_obj = kombu.Queue(q_name, self._update_exchange_obj) | |
self._subscribe_greenlet = None | |
self.connect(True) | |
# end __init__ | |
@@ -99,7 +101,8 @@ class VncKombuClient(object): | |
trace = None | |
try: | |
- self._subscribe_cb(message.payload) | |
+ cb_data = json.loads(message.payload) | |
+ self._subscribe_cb(cb_data) | |
except Exception as e: | |
msg = "Subscribe callback had error: %s" % str(e) | |
self._logger(msg, level=SandeshLevel.SYS_WARN) | |
diff --git a/vnc_cfg_api_server/vnc_cfg_ifmap.py b/vnc_cfg_api_server/vnc_cfg_ifmap.py | |
index 10bbd7d..623ebef 100644 | |
--- a/vnc_cfg_api_server/vnc_cfg_ifmap.py | |
+++ b/vnc_cfg_api_server/vnc_cfg_ifmap.py | |
@@ -913,7 +913,8 @@ class VncServerKombuClient(VncKombuClient): | |
message = self._publish_queue.get() | |
while True: | |
try: | |
- self._obj_update_q.put(message, serializer='json') | |
+ bound_msg = self._obj_update_exchange.Message(json.dumps(message)) | |
+ self._obj_update_exchange.publish(bound_msg, routing_key='') | |
break | |
except Exception as e: | |
log_str = "Disconnected from rabbitmq. Reinitializing connection: %s" % str(e) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment