Last active
March 15, 2021 07:41
-
-
Save pt033302/e5b3c76733d2ac9af0b1bf0cc681e978 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Settings read by ConfigurationManager's Kafka accessors.
kafka:
  url: one-kafka.dh-stage-message-bus.svc.cluster.local:9092
  topic: gfrasca-test
  # The producer is treated as enabled when this key is absent.
  enabled: False
# Message-bus settings. ConfigurationManager reads this section only when
# the file has no top-level 'umb' key.
amq:
  # Either a single URL string or a list of URLs is accepted.
  url:
    - amqps://messaging-devops-broker01.web.prod.ext.phx2.redhat.com:5671
    - amqps://messaging-devops-broker02.web.prod.ext.phx2.redhat.com:5671
  # Client certificate and private key handed to the reader's SSL domain.
  certificate: /home/pthangad/Desktop/psi-pipelines-robot.crt
  private_key: /home/pthangad/Desktop/psi-pipelines-robot.key
  # The reader joins 'consumer' and 'topic' with a '.' (unless 'consumer'
  # already ends with one) to build the address it subscribes to.
  consumer: Consumer.psi-pipelines-robot.<instance-name>
  topic: 'VirtualTopic.eng.ci.>'
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from os.path import dirname, join | |
import logging | |
import yaml | |
# Configuration file used when the caller does not supply a path:
# ../config/config.local.yaml relative to the directory of this module.
DEFAULT_CONFIG_PATH = join(
    dirname(__file__), "..", "config", "config.local.yaml"
)
class ConfigurationManager(object):
    """Read-only accessor for the Kafka and UMB/AMQ settings of a YAML file.

    The file is parsed once by the constructor. Every getter returns ``None``
    when the requested key (or its whole section) is missing, except
    ``producer_enabled``, which defaults to ``True``.
    """

    def __init__(self, cfg_path=None):
        """Parse the YAML configuration file.

        Args:
            cfg_path: Path of the YAML file. ``DEFAULT_CONFIG_PATH`` is used
                when this is ``None`` or empty.
        """
        cfg_path = cfg_path if cfg_path else DEFAULT_CONFIG_PATH
        self._logger = logging.getLogger(__name__)
        with open(cfg_path, 'r') as yaml_data:
            # NOTE(review): FullLoader accepts more than plain data types.
            # If this file can ever come from an untrusted source, switch to
            # yaml.safe_load.
            loaded = yaml.load(yaml_data, Loader=yaml.FullLoader)
        # An empty document parses to None; normalise it to an empty mapping
        # so the getters below keep working.
        self._config = loaded if loaded is not None else {}

    def _get_config(self):
        """Return the parsed top-level configuration mapping."""
        return self._config

    def _get_section(self, name):
        """Return the mapping stored under ``name``, or ``{}`` if absent/null."""
        # ``or`` also covers a section that is present but has no value
        # (e.g. a bare ``kafka:`` line), which parses to None.
        return self._get_config().get(name) or {}

    def _get_kafka_config(self):
        """Return the ``kafka`` section."""
        return self._get_section('kafka')

    def _get_umb_config(self):
        """Return the ``umb`` section, falling back to ``amq`` if no ``umb`` key exists."""
        key = 'umb' if 'umb' in self._get_config() else 'amq'
        return self._get_section(key)

    @property
    def producer_enabled(self):
        """Value of ``kafka.enabled``; ``True`` when the key is not set."""
        return self._get_kafka_config().get('enabled', True)

    def get_kafka_topic(self):
        """Return ``kafka.topic`` or ``None``."""
        return self._get_kafka_config().get('topic')

    def get_kafka_broker(self):
        """Return ``kafka.url`` or ``None``."""
        return self._get_kafka_config().get('url')

    def get_umb_topic(self):
        """Return the UMB ``topic`` or ``None``."""
        return self._get_umb_config().get('topic')

    def get_umb_consumer(self):
        """Return the UMB ``consumer`` or ``None``."""
        return self._get_umb_config().get('consumer')

    def get_umb_cert_path(self):
        """Return the UMB ``certificate`` path or ``None``."""
        return self._get_umb_config().get('certificate')

    def get_umb_private_key_path(self):
        """Return the UMB ``private_key`` path or ``None``."""
        return self._get_umb_config().get('private_key')

    def get_umb_brokers(self):
        """Return the UMB broker URLs.

        A single URL string is wrapped in a one-element list; any other
        value (a list, or ``None`` when ``url`` is not set) is returned as is.
        """
        urls = self._get_umb_config().get('url')
        return [urls] if isinstance(urls, str) else urls
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from configuration_manager import ConfigurationManager | |
import argparse | |
from proton.handlers import MessagingHandler | |
from proton.reactor import Container, Selector | |
from proton import SSLDomain | |
import logging | |
class UmbReader(MessagingHandler):
    """Proton handler that subscribes to a UMB consumer queue and logs messages.

    Connection details (broker URLs, certificate, key, consumer prefix and
    topic) come from ``ConfigurationManager``. The handler closes the
    receiver and connection once ``count`` messages have been received.
    """

    def __init__(self, count, cfg_path=None):
        """Load the configuration and reset the message counters.

        Args:
            count: Number of messages to receive before closing. A value of
                zero or less disables the limit. Strings are accepted and
                converted, because argparse passes the raw text through
                when no ``type`` is declared for the option.
            cfg_path: Optional path forwarded to ``ConfigurationManager``.
        """
        super(UmbReader, self).__init__()
        self._logger = logging.getLogger(__name__)
        self.cm = ConfigurationManager(cfg_path)
        self.umb_topic = self.cm.get_umb_topic()
        self.consumer = self.cm.get_umb_consumer()
        self.cert = self.cm.get_umb_cert_path()
        self.key = self.cm.get_umb_private_key_path()
        self.urls = self.cm.get_umb_brokers()
        # messaging counters
        self.expected = int(count)
        self.received = 0

    def get_consumer_queue_str(self):
        """Return ``<consumer>.<topic>``, without doubling a trailing dot."""
        separator = '' if self.consumer.endswith('.') else '.'
        return '{}{}{}'.format(self.consumer, separator, self.umb_topic)

    def get_selector(self):
        """Return the receiver options used to filter messages.

        No filtering is applied at the moment, so this returns ``None``.
        """
        return None

    def on_start(self, event):
        """Connect to the brokers over SSL and open the receiver."""
        domain = SSLDomain(SSLDomain.MODE_CLIENT)
        domain.set_credentials(self.cert, self.key, None)
        conn = event.container.connect(urls=self.urls, ssl_domain=domain)
        source = self.get_consumer_queue_str()
        event.container.create_receiver(conn, source=source,
                                        options=self.get_selector())
        self._logger.info("Subscribed to topic %s", source)

    def on_message(self, event):
        """Log the message and close once the expected count is reached."""
        limited = self.expected > 0
        if limited and self.received >= self.expected:
            # The limit was already hit; ignore anything still in flight.
            return
        self.received += 1
        self._logger.info("Event-message id: %s", event.message.id)
        self._logger.info("Event-message body: %s", event.message.body)
        limit_reached = limited and self.received >= self.expected
        # NOTE(review): `drained` is referenced without being called. If it
        # is a method in the installed proton version this operand is always
        # truthy and the check reduces to `queued == 0` -- confirm.
        nothing_pending = event.receiver.queued == 0 and event.receiver.drained
        if limit_reached or nothing_pending:
            event.receiver.close()
            event.connection.close()
def setup_logging(verbose):
    """Configure root logging at DEBUG when ``verbose`` is truthy, else INFO."""
    if verbose:
        chosen_level = logging.DEBUG
    else:
        chosen_level = logging.INFO
    logging.basicConfig(level=chosen_level)
def parse_args(argv=None):
    """Parse the command-line options.

    Args:
        argv: Optional list of argument strings. ``sys.argv[1:]`` is used
            when omitted, which is argparse's default.

    Returns:
        The parsed namespace with ``verbose`` (bool) and ``messages`` (int).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("-v", "--verbose", action='store_true',
                        help="enable debug logging")
    # type=int keeps a user-supplied value the same type as the default;
    # without it "-m 3" yields the string "3".
    parser.add_argument("-m", "--messages", type=int, default=5,
                        help="number of messages to read (default: 5)")
    return parser.parse_args(argv)
if __name__ == '__main__':
    # Script entry point: read options, configure logging, then run the
    # reader inside a proton Container.
    cli_options = parse_args()
    setup_logging(cli_options.verbose)
    try:
        handler = UmbReader(cli_options.messages)
        Container(handler).run()
    except KeyboardInterrupt:
        # A KeyboardInterrupt (Ctrl-C) ends the run without a traceback.
        pass
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment