Skip to content

Instantly share code, notes, and snippets.

@shivamMg
Last active April 22, 2020 11:36
Show Gist options
  • Save shivamMg/9f5dae009cefdd6f4f4f8b5bebae8e7c to your computer and use it in GitHub Desktop.
Save shivamMg/9f5dae009cefdd6f4f4f8b5bebae8e7c to your computer and use it in GitHub Desktop.
kombu producer consumer (pyamqp / librabbitmq)
from kombu import Connection, Exchange, Queue
from kombu.common import maybe_declare
from kombu.mixins import ConsumerMixin
# --- Broker connection settings (empty placeholders; fill in before use) ---
HOST = ''
USER = ''
PASSWORD = ''
PORT = 443
# Author's note on the original line: ``False`` is the alternative value.
SSL = dict(server_hostname=HOST)
# Transport name handed to kombu: 'pyamqp' or 'librabbitmq'.
TRANSPORT = 'pyamqp'
TRANSPORT_OPTS = dict(confirm_publish=True)

# Direct exchange that both queues below are bound to.
EXCHANGE = Exchange(name='__test__', type='direct')


def _make_queue(queue_name, binding_key):
    """Build an auto-delete queue bound to EXCHANGE with *binding_key*.

    Each call creates its own ``queue_arguments`` dict, so the queues do
    not share mutable state.
    """
    return Queue(
        name=queue_name,
        exchange=EXCHANGE,
        routing_key=binding_key,
        auto_delete=True,
        # NOTE(review): presumably the broker-side idle expiry in
        # milliseconds -- confirm against the broker documentation.
        queue_arguments={'x-expires': 100000},
    )


# 'test_default' is bound with the empty routing key, 'test' with 'boom'.
QUEUES = [
    _make_queue('test_default', ''),
    _make_queue('test', 'boom'),
]

# Explicitly request acknowledgements on every queue; Worker.on_message
# acks each message it receives.
for queue in QUEUES:
    queue.no_ack = False
class Worker(ConsumerMixin):
    """Consumer that prints and acknowledges every message it receives.

    Args:
        connection: the kombu connection to consume from; stored as
            ``self.connection``.
        queues: iterable of queues to consume from; stored as
            ``self.queues``. One consumer is created per queue.
    """

    def __init__(self, connection, queues):
        self.connection = connection
        self.queues = queues

    def get_consumers(self, consumer_class, default_channel):
        """Return one consumer per queue in ``self.queues``.

        The queues given to ``__init__`` are used rather than a module
        global, so the worker honours whatever queue list it was built
        with (any length, including empty).

        Args:
            consumer_class: callable that builds a consumer; called with
                ``queues=`` and ``callbacks=`` keyword arguments.
            default_channel: unused here; accepted to keep the hook
                signature.

        Returns:
            A list of consumers, each delivering to ``on_message``.
        """
        return [
            consumer_class(queues=queue, callbacks=[self.on_message])
            for queue in self.queues
        ]

    def on_message(self, body, message):
        """Print the decoded body and the message object, then ack it."""
        # Single-argument print(...) emits the same "body message" text on
        # both Python 2 and Python 3.
        print('%s %s' % (body, message))
        message.ack()
if __name__ == '__main__':
    # Consumer entry point: connect, declare the exchange, then run the
    # worker until it is interrupted.
    if TRANSPORT == 'librabbitmq':  # ensure it's importable
        import librabbitmq  # noqa: F401

    # Using the connection as a context manager guarantees it is released
    # when the block exits, whether normally, on Ctrl-C, or on an error.
    with Connection(
        hostname=HOST,
        userid=USER,
        password=PASSWORD,
        port=PORT,
        ssl=SSL,
        transport=TRANSPORT,
        transport_options=TRANSPORT_OPTS,
        virtual_host='/',
    ) as conn:
        conn.connect()
        chan = conn.default_channel
        maybe_declare(EXCHANGE, chan)
        try:
            Worker(conn, QUEUES).run()
        except KeyboardInterrupt:
            # Parenthesised single argument works on Python 2 and 3.
            print('sigint')
from kombu import Connection, Exchange
from kombu.common import maybe_declare
# --- Broker connection settings (empty placeholders; fill in before use) ---
HOST = ''
USER = ''
PASSWORD = ''
PORT = 443
# Author's note on the original line: ``False`` is the alternative value.
SSL = dict(server_hostname=HOST)
# Transport name handed to kombu: 'pyamqp' or 'librabbitmq'.
TRANSPORT = 'pyamqp'
TRANSPORT_OPTS = dict(confirm_publish=True)

# Direct exchange the producer publishes to.
EXCHANGE = Exchange(name='__test__', type='direct')
if __name__ == '__main__':
    # Producer entry point: connect, declare the exchange, and publish one
    # prepared message through both publish calls.
    if TRANSPORT == 'librabbitmq':  # ensure it's importable
        import librabbitmq  # noqa: F401

    # Using the connection as a context manager guarantees it is released
    # once publishing finishes or fails.
    with Connection(
        hostname=HOST,
        userid=USER,
        password=PASSWORD,
        port=PORT,
        ssl=SSL,
        transport=TRANSPORT,
        transport_options=TRANSPORT_OPTS,
        virtual_host='/',
    ) as conn:
        conn.connect()
        chan = conn.default_channel
        maybe_declare(EXCHANGE, chan)
        msg = chan.prepare_message('message body')
        # The same message is sent twice: once with basic_publish and once
        # with basic_publish_confirm. No routing key is passed to either.
        # NOTE(review): basic_publish_confirm may not exist on every
        # transport's channel -- confirm before switching TRANSPORT.
        chan.basic_publish(msg, exchange=EXCHANGE.name)
        chan.basic_publish_confirm(msg, exchange=EXCHANGE.name)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment