Skip to content

Instantly share code, notes, and snippets.

@pt033302
Last active March 15, 2021 07:41
Show Gist options
  • Save pt033302/e5b3c76733d2ac9af0b1bf0cc681e978 to your computer and use it in GitHub Desktop.
Save pt033302/e5b3c76733d2ac9af0b1bf0cc681e978 to your computer and use it in GitHub Desktop.
# Kafka settings; ConfigurationManager reads `url`, `topic` and `enabled`
# from this mapping.
kafka:
  url: one-kafka.dh-stage-message-bus.svc.cluster.local:9092
  topic: gfrasca-test
  enabled: False
# Message-bus settings; ConfigurationManager looks for a `umb` mapping first
# and falls back to `amq`.
amq:
  # Either a single URL string or a list of broker URLs.
  url:
    - amqps://messaging-devops-broker01.web.prod.ext.phx2.redhat.com:5671
    - amqps://messaging-devops-broker02.web.prod.ext.phx2.redhat.com:5671
  certificate: /home/pthangad/Desktop/psi-pipelines-robot.crt
  private_key: /home/pthangad/Desktop/psi-pipelines-robot.key
  # NOTE(review): `<instance-name>` looks like a placeholder to be replaced
  # before use - confirm.
  consumer: Consumer.psi-pipelines-robot.<instance-name>
  topic: 'VirtualTopic.eng.ci.>'
from os.path import dirname, join
import logging
import yaml
# Configuration file used when no explicit path is given:
# <directory of this module>/../config/config.local.yaml
_MODULE_DIR = dirname(__file__)
DEFAULT_CONFIG_PATH = join(_MODULE_DIR, "..", "config", "config.local.yaml")
class ConfigurationManager(object):
    """Accessor for the Kafka and UMB/AMQ settings stored in a YAML file.

    The file is parsed once, in ``__init__``. A missing or empty section is
    treated as an empty mapping, so the getters return their default
    (``None``, or ``True`` for ``producer_enabled``) instead of raising.
    """

    def __init__(self, cfg_path=None):
        """Parse the YAML configuration file.

        :param cfg_path: path of the YAML file; ``DEFAULT_CONFIG_PATH`` is
            used when this is ``None`` or empty.
        """
        cfg_path = cfg_path if cfg_path else DEFAULT_CONFIG_PATH
        self._logger = logging.getLogger(__name__)
        with open(cfg_path, 'r') as yaml_data:
            # NOTE(review): FullLoader can construct more than plain data
            # types; if this file can ever come from an untrusted source,
            # switch to yaml.safe_load.
            self._config = yaml.load(yaml_data, Loader=yaml.FullLoader)

    def _get_config(self):
        """Return the parsed configuration as a mapping."""
        # An empty YAML document parses to None; fall back to an empty
        # mapping so the section lookups below keep working.
        return self._config or {}

    def _get_section(self, key):
        """Return the mapping stored under ``key``, or ``{}`` if absent/null."""
        # ``key:`` with no children parses to None, hence ``or`` rather than
        # a ``.get`` default.
        return self._get_config().get(key) or {}

    def _get_kafka_config(self):
        """Return the ``kafka`` section."""
        return self._get_section('kafka')

    def _get_umb_config(self):
        """Return the ``umb`` section, falling back to ``amq``."""
        key = 'umb' if 'umb' in self._get_config() else 'amq'
        return self._get_section(key)

    @property
    def producer_enabled(self):
        """Value of ``kafka.enabled``; ``True`` when the key is not set."""
        return self._get_kafka_config().get('enabled', True)

    def get_kafka_topic(self):
        """Return ``kafka.topic`` or ``None``."""
        return self._get_kafka_config().get('topic')

    def get_kafka_broker(self):
        """Return ``kafka.url`` or ``None``."""
        return self._get_kafka_config().get('url')

    def get_umb_topic(self):
        """Return the UMB ``topic`` or ``None``."""
        return self._get_umb_config().get('topic')

    def get_umb_consumer(self):
        """Return the UMB ``consumer`` or ``None``."""
        return self._get_umb_config().get('consumer')

    def get_umb_cert_path(self):
        """Return the UMB ``certificate`` path or ``None``."""
        return self._get_umb_config().get('certificate')

    def get_umb_private_key_path(self):
        """Return the UMB ``private_key`` path or ``None``."""
        return self._get_umb_config().get('private_key')

    def get_umb_brokers(self):
        """Return the UMB broker URLs.

        A single URL string is wrapped in a one-element list; any other
        value (a list, or ``None`` when ``url`` is not set) is returned
        unchanged.
        """
        urls = self._get_umb_config().get('url')
        return [urls] if isinstance(urls, str) else urls
from configuration_manager import ConfigurationManager
import argparse
from proton.handlers import MessagingHandler
from proton.reactor import Container, Selector
from proton import SSLDomain
import logging
class UmbReader(MessagingHandler):
    """Proton handler that subscribes to a UMB consumer queue and logs messages.

    Broker URLs, client certificate/key, consumer name and topic are taken
    from ``ConfigurationManager``.
    """

    def __init__(self, count, cfg_path=None):
        """Read the connection settings and reset the message counters.

        :param count: number of messages to log before closing the
            connection; ``0`` (or a negative value) means no limit. Strings
            are accepted and converted with ``int``.
        :param cfg_path: optional configuration path passed on to
            ``ConfigurationManager``.
        """
        super(UmbReader, self).__init__()
        self._logger = logging.getLogger(__name__)
        self.cm = ConfigurationManager(cfg_path)
        self.umb_topic = self.cm.get_umb_topic()
        self.consumer = self.cm.get_umb_consumer()
        self.cert = self.cm.get_umb_cert_path()
        self.key = self.cm.get_umb_private_key_path()
        self.urls = self.cm.get_umb_brokers()
        # messaging counters; the count may arrive as a string from the CLI
        self.expected = int(count)
        self.received = 0

    def get_consumer_queue_str(self):
        """Return ``<consumer>.<topic>``, adding the dot only if needed."""
        separator = '' if self.consumer.endswith('.') else '.'
        return '{}{}{}'.format(self.consumer, separator, self.umb_topic)

    def get_selector(self):
        """Return the receiver options; currently no selector is applied."""
        # A filter such as Selector("contains=pipelines") could be returned
        # here to restrict the messages delivered.
        return None

    def on_start(self, event):
        """Connect to the brokers over SSL and create the receiver."""
        domain = SSLDomain(SSLDomain.MODE_CLIENT)
        domain.set_credentials(self.cert, self.key, None)
        conn = event.container.connect(urls=self.urls, ssl_domain=domain)
        source = self.get_consumer_queue_str()
        event.container.create_receiver(conn, source=source,
                                        options=self.get_selector())
        self._logger.info("Subscribed to topic %s", source)

    def on_message(self, event):
        """Log the message and close once the limit is hit or the queue is empty."""
        self.received += 1
        self._logger.info("Event-message id: %s", event.message.id)
        self._logger.info("Event-message body: %s", event.message.body)
        limit_reached = 0 < self.expected <= self.received
        # NOTE(review): in python-qpid-proton `drained` appears to be a
        # method, in which case this attribute reference is always truthy
        # and only `queued == 0` decides - confirm against the proton API.
        queue_empty = event.receiver.queued == 0 and event.receiver.drained
        if limit_reached or queue_empty:
            event.receiver.close()
            event.connection.close()
def setup_logging(verbose):
    """Configure root logging at DEBUG when ``verbose`` is truthy, else INFO."""
    if verbose:
        chosen_level = logging.DEBUG
    else:
        chosen_level = logging.INFO
    logging.basicConfig(level=chosen_level)
def parse_args():
    """Parse the command-line options from ``sys.argv``.

    :returns: namespace with ``verbose`` (bool) and ``messages`` (int).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("-v", "--verbose", action='store_true',
                        help="enable DEBUG logging")
    # type=int keeps a value given on the command line the same type as the
    # default; without it "-m 3" would yield the string "3".
    parser.add_argument("-m", "--messages", type=int, default=5,
                        help="number of messages to read (default: 5)")
    return parser.parse_args()
if __name__ == '__main__':
    # Script entry point: read the CLI options, configure logging, then run
    # the reader inside a proton Container.
    args = parse_args()
    setup_logging(args.verbose)
    try:
        handler = UmbReader(args.messages)
        Container(handler).run()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the reader; exit quietly.
        pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment