Skip to content

Instantly share code, notes, and snippets.

@jbub
Created December 20, 2017 09:08
Show Gist options
  • Save jbub/97c50c6bce82793c8dab925371ff9ceb to your computer and use it in GitHub Desktop.
Save jbub/97c50c6bce82793c8dab925371ff9ceb to your computer and use it in GitHub Desktop.
from kombu import Connection, Exchange, Queue
from kombu.pools import connections
class AmqpStore(Store):
name = 'amqp'
def configure(self, cfg, logger):
url = cfg.var('ITEM_STORE_URL')
self.logger = logger
self.connection = Connection(hostname=url)
self.exchange = Exchange(name='item_item', type='direct', durable=True)
self.queue = Queue(name='item_item', exchange=self.exchange, routing_key='item_item')
def create_great_item(self, payload):
return self.create_item(payload=payload, item_code='great')
def create_super_item(self, payload):
return self.create_item(payload=payload, item_code='super')
def create_item(self, payload, item_code):
with connections[self.connection].acquire(block=True, timeout=10) as conn:
message = self.create_message(payload=payload, item_code=item_code)
producer = conn.Producer(serializer='json')
producer.publish(
body=message,
routing_key='item_item',
exchange=self.exchange,
declare=[self.queue],
)
self.logger.info(event='message.published', store=self.name, **message['kwargs'])
def create_message(self, payload, item_code):
# this is celery v3 protocol version 1 compatible message
# when upgrading celery to new version, make sure this message is compatible
# SEE http://docs.celeryproject.org/en/latest/internals/protocol.html
kwargs = payload.as_dict()
kwargs['item_code'] = item_code
return {
'id': str(uuid.uuid4()),
'task': 'items.tasks.handle_item',
'args': [],
'kwargs': kwargs,
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment