Skip to content

Instantly share code, notes, and snippets.

@erickt
Created March 2, 2016 18:58
Show Gist options
  • Save erickt/ac3995bb23c378eaaa7a to your computer and use it in GitHub Desktop.
from __future__ import absolute_import
import logging
from kombu.five import Empty
from kombu.transport import virtual
from kombu.utils import cached_property
from kombu.utils.compat import OrderedDict
from anyjson import loads, dumps
try:
import pykafka
except ImportError:
pykafka = None
LOG = logging.getLogger(__name__)
DEFAULT_PORT = 9092
DEFAULT_TOPIC = 'kombu_default'
class QoS(object):
    """Quality-of-service state for a Kafka :class:`Channel`.

    Tracks messages that have been delivered but not yet acknowledged.
    ``prefetch_count`` is effectively pinned to 1 by
    :meth:`Channel.basic_qos`.
    """

    def __init__(self, channel):
        self.prefetch_count = 1
        self._channel = channel
        # delivery_tag -> message, kept in delivery order.
        self._not_yet_acked = OrderedDict()

    def can_consume(self):
        """Return True if the :class:`Channel` can consume more messages.

        :returns: True if the number of unacked messages is below
            ``prefetch_count``, or if prefetching is unlimited (0/None).
        :rtype: bool
        """
        return (not self.prefetch_count or
                len(self._not_yet_acked) < self.prefetch_count)

    def can_consume_max_estimate(self):
        """Return an estimate of how many more messages may be consumed."""
        if self.prefetch_count:
            return self.prefetch_count - len(self._not_yet_acked)
        return 1

    def append(self, message, delivery_tag):
        """Record *message* as delivered but not yet acknowledged."""
        self._not_yet_acked[delivery_tag] = message

    def get(self, delivery_tag):
        """Return the not-yet-acked message for *delivery_tag*."""
        return self._not_yet_acked[delivery_tag]

    def ack(self, delivery_tag):
        """Acknowledge a message by committing its consumer's offsets."""
        message = self._not_yet_acked.pop(delivery_tag)
        exchange = message.properties['delivery_info']['exchange']
        consumer = self._channel.get_kafka_consumer(exchange)
        consumer.commit_offsets()

    def reject(self, delivery_tag, requeue=False):
        """Reject a message by delivery tag.

        If *requeue* is True, the last consumed offset is reverted so the
        message is refetched on the next attempt.  If False, the message
        is acked (consumed and ignored).
        """
        if requeue:
            # BUG FIX: the original referenced an undefined name
            # ``message`` here (NameError on every requeue).  Look the
            # message up by its delivery tag and drop it from the unacked
            # map, since reverting the offset means it will be redelivered
            # under a new tag.
            message = self._not_yet_acked.pop(delivery_tag)
            exchange = message.properties['delivery_info']['exchange']
            consumer = self._channel.get_kafka_consumer(exchange)
            consumer.set_offset(consumer.last_offset_consumed)
        else:
            self.ack(delivery_tag)

    def restore_unacked_once(self):
        # Nothing to restore: Kafka offsets persist consumption state.
        pass
class Channel(virtual.Channel):
    """Kafka virtual channel.

    Queues map one-to-one onto Kafka topics; producers and consumers
    are created lazily per topic and cached on the channel.
    """

    #: A class reference that will be instantiated using the qos property.
    QoS = QoS

    #: Transport options forwarded onto this channel.
    from_transport_options = (
        virtual.Channel.from_transport_options +
        ('consumer_group',
         'consumer_timeout_ms')
    )

    _client = None

    def __init__(self, *args, **kwargs):
        super(Channel, self).__init__(*args, **kwargs)
        # Per-topic caches, populated lazily by the getters below.
        self._kafka_producers = {}
        self._kafka_consumers = {}

    def basic_qos(self, *args, **kwargs):
        """Change :class:`QoS` settings for this Channel`.

        Currently, this value is hard coded to 1.
        """
        self.qos.prefetch_count = 1

    def _put(self, queue, payload, **kwargs):
        """JSON-encode *payload* and produce it to *queue*'s topic."""
        LOG.debug('putting into queue `%s`', queue)
        queue = queue.encode('utf-8')
        self.get_kafka_producer(queue).produce(dumps(payload))
        # BUG FIX: removed leftover debug statement ``print('done!')``.

    def _get(self, queue, timeout=None):
        """Consume one message from *queue*; raise Empty when none arrives.

        The consumer's ``consumer_timeout_ms`` bounds the wait.
        """
        LOG.debug('getting from queue `%s`', queue)
        queue = queue.encode('utf-8')
        item = self.get_kafka_consumer(queue).consume()
        if item:
            return loads(item.value)
        raise Empty()

    def _purge(self, queue):
        """Skip the consumer past all currently available messages."""
        LOG.debug('purging queue `%s`', queue)
        consumer = self.get_kafka_consumer(queue)
        # NOTE(review): pykafka's latest_available_offsets() returns a
        # per-partition mapping, not a single offset -- confirm that
        # set_offset accepts it, or reset each partition individually.
        latest_offset = consumer.topic.latest_available_offsets()
        consumer.set_offset(latest_offset)

    def get_kafka_producer(self, queue):
        """Return a cached producer for *queue*, creating it on first use."""
        try:
            return self._kafka_producers[queue]
        except KeyError:
            topic = self.connection.kafka_client.topics[queue]
            producer = topic.get_producer()
            self._kafka_producers[queue] = producer
            return producer

    def get_kafka_consumer(self, queue):
        """Return a cached simple consumer for *queue*, creating it lazily."""
        try:
            return self._kafka_consumers[queue]
        except KeyError:
            topic = self.connection.kafka_client.topics[queue]
            consumer = topic.get_simple_consumer(
                consumer_group=self.consumer_group.encode('utf-8'),
                consumer_timeout_ms=self.consumer_timeout_ms,
            )
            self._kafka_consumers[queue] = consumer
            return consumer
class Transport(virtual.Transport):
    """Kombu virtual transport backed by the pykafka client library."""

    Channel = Channel

    #: kafka backend state is global.
    state = virtual.BrokerState()

    driver_type = 'kafka'
    driver_name = 'pykafka'

    def __init__(self, *args, **kwargs):
        # Fail fast if the optional pykafka dependency is absent.
        if pykafka is None:
            raise ImportError('The pykafka library is not installed')
        super(Transport, self).__init__(*args, **kwargs)

    def driver_version(self):
        """Return the version string of the underlying pykafka library."""
        return pykafka.__version__

    @cached_property
    def kafka_client(self):
        """A shared :class:`pykafka.KafkaClient` built from the broker URL.

        Several hosts may be supplied separated by ';'.  Any host given
        without an explicit port gets the connection's port (or 9092).
        """
        conninfo = self.client
        fallback_port = conninfo.port or DEFAULT_PORT
        normalized = [
            h if ':' in h else '%s:%s' % (h, fallback_port)
            for h in conninfo.hostname.split(';')
        ]
        return pykafka.KafkaClient(hosts=','.join(normalized))
def register_transport():
    """Register this transport with kombu under the 'pykafka' alias."""
    from kombu.transport import TRANSPORT_ALIASES
    TRANSPORT_ALIASES.update(pykafka='server.kombu.kafka:Transport')
if __name__ == '__main__':
    # Smoke test: round-trip messages through a local Kafka broker.
    from kombu import Connection

    logging.basicConfig()
    LOG.setLevel(logging.DEBUG)

    options = {
        'consumer_group': 'kombu-consumer',
        'consumer_timeout_ms': 1000,
    }
    with Connection(transport=Transport,
                    transport_options=options) as connection:
        with connection.SimpleQueue('kombu-test') as queue:
            queue.put('hello world')
            # Two consume rounds, mirroring the original script.
            for _ in range(2):
                message = queue.get(timeout=0.1)
                print(message)
                message.ack()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment