Skip to content

Instantly share code, notes, and snippets.

@redknitin
Created November 29, 2015 13:08
Show Gist options
  • Select an option

  • Save redknitin/e041356ee18f3e43dba7 to your computer and use it in GitHub Desktop.

Select an option

Save redknitin/e041356ee18f3e43dba7 to your computer and use it in GitHub Desktop.
Kombu Producer and Consumer
from kombu.mixins import ConsumerMixin
from kombu import Exchange, Connection, Queue
from kombu import Producer
import json
import pymysql
class P:
    """Publisher that sends a test message to the ``Test`` direct exchange."""

    def __init__(self, conn):
        """Remember the broker connection used for publishing.

        Args:
            conn: The kombu ``Connection`` on which channels are opened.
        """
        self.connection = conn

    def push_stuff(self):
        """Publish the string ``'Gorilla'`` to ``Test`` with an empty routing key.

        The ``TestQ`` queue (and its exchange/binding) is declared as part of
        the publish, so the message is routable even if no consumer has
        created the queue yet. The channel opened for the publish is closed
        again before returning.
        """
        xchg = Exchange('Test', 'direct', durable=False)
        queue = Queue('TestQ', exchange=xchg, routing_key='')
        # The channel is a context manager: leaving the block closes it, so
        # repeated calls do not accumulate open channels on the connection.
        with self.connection.channel() as chan:
            producer = Producer(channel=chan, exchange=xchg, routing_key='')
            # declare=[queue] makes kombu declare the queue and its binding
            # before sending; merely binding the Queue object to a channel
            # does not create anything on the broker.
            producer.publish('Gorilla', declare=[queue])
class C(ConsumerMixin):
    """Consumer that prints and acknowledges every message from ``TestQ``."""

    def __init__(self, conn):
        """Remember the broker connection.

        Args:
            conn: The kombu ``Connection``; ``ConsumerMixin`` reads it from
                the ``connection`` attribute.
        """
        self.connection = conn

    def get_consumers(self, Consumer, channel):
        """Return the consumers that ``ConsumerMixin`` should run.

        Args:
            Consumer: Consumer factory supplied by ``ConsumerMixin``.
            channel: Channel supplied by ``ConsumerMixin``; unused here
                because the ``Consumer`` factory is expected to attach the
                queues to its own channel (see kombu ``ConsumerMixin`` docs).

        Returns:
            A one-element list with a consumer on ``TestQ`` that dispatches
            to :meth:`on_message`.
        """
        # Pass unbound declarations instead of opening an extra channel on
        # self.connection: such a channel is never closed and is not the one
        # the consumer actually consumes on.
        xchg = Exchange('Test', 'direct', durable=False)
        queue = Queue('TestQ', exchange=xchg, routing_key='')
        return [
            Consumer([queue], callbacks=[self.on_message]),
        ]

    def on_message(self, body, message):
        """Print the received body and acknowledge the message.

        Args:
            body: The message body as passed to the callback.
            message: The message object; ``ack()`` is called on it.
        """
        print("RECEIVED MESSAGE: %r" % (body, ))
        message.ack()
def main():
    """Publish one test message, then consume from the same broker."""
    broker = Connection('amqp://nitin:reddy@192.168.0.10:5672')
    # Leaving the with-block releases the connection.
    with broker as conn:
        publisher = P(conn)
        publisher.push_stuff()
        consumer = C(conn)
        consumer.run()


if __name__ == '__main__':
    main()
# __author__ = 'nitin'
#
# from kombu import Exchange, Connection, Queue
#
# def main():
# with Connection('amqp://nitin:reddy@192.168.0.10:5672') as conn:
# chan = conn.channel()
# xchg = Exchange('Test', 'direct', durable=False)
# bxchg = xchg(chan)
# q = Queue('TestQ', exchange=xchg, routing_key='EAM')
# bq = q(chan)
# m = bq.get()
# if not m is None:
# print(m.payload+'\r\n')
# m.ack()
# chan.close()
#
# if __name__=='__main__':
# main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment