Created
March 2, 2016 18:58
-
-
Save erickt/ac3995bb23c378eaaa7a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import absolute_import | |
import logging | |
from kombu.five import Empty | |
from kombu.transport import virtual | |
from kombu.utils import cached_property | |
from kombu.utils.compat import OrderedDict | |
from anyjson import loads, dumps | |
try: | |
import pykafka | |
except ImportError: | |
pykafka = None | |
# Module-level logger used by the channel for debug tracing.
LOG = logging.getLogger(__name__)

# Port appended to any broker host that does not carry its own ``:port``.
DEFAULT_PORT = 9092

# NOTE(review): not referenced anywhere in the visible code -- confirm whether
# it is still needed.
DEFAULT_TOPIC = 'kombu_default'
class QoS(object):
    """Quality-of-service bookkeeping for a Kafka-backed channel.

    Tracks the messages that were delivered but not yet acknowledged, keyed
    by delivery tag, and uses their count to decide whether the channel may
    consume more.
    """

    def __init__(self, channel):
        # Maximum number of unacknowledged messages; a falsy value means
        # "no limit".
        self.prefetch_count = 1
        self._channel = channel
        # delivery tag -> message, kept in delivery order.
        self._not_yet_acked = OrderedDict()

    def can_consume(self):
        """Returns True if the :class:`Channel` can consume more messages, else
        False.

        :returns: True, if this QoS object can accept a message.
        :rtype: bool
        """
        return (not self.prefetch_count or
                len(self._not_yet_acked) < self.prefetch_count)

    def can_consume_max_estimate(self):
        """Estimate how many more messages may be consumed right now.

        :returns: the number of free prefetch slots, or 1 when no prefetch
            limit is configured.
        :rtype: int
        """
        if self.prefetch_count:
            return self.prefetch_count - len(self._not_yet_acked)
        return 1

    def append(self, message, delivery_tag):
        """Record ``message`` as delivered but not yet acknowledged."""
        self._not_yet_acked[delivery_tag] = message

    def get(self, delivery_tag):
        """Return the unacknowledged message stored under ``delivery_tag``.

        :raises KeyError: if the tag is unknown.
        """
        return self._not_yet_acked[delivery_tag]

    def _consumer_for(self, message):
        """Return the channel's Kafka consumer for the message's exchange."""
        exchange = message.properties['delivery_info']['exchange']
        return self._channel.get_kafka_consumer(exchange)

    def ack(self, delivery_tag):
        """Acknowledge a message by committing the consumer's offsets.

        :raises KeyError: if the tag is unknown.
        """
        message = self._not_yet_acked.pop(delivery_tag)
        self._consumer_for(message).commit_offsets()

    def reject(self, delivery_tag, requeue=False):
        """Reject a message by delivery tag.

        If requeue is True, then the last consumed message is reverted so it'll
        be refetched on the next attempt. If False, that message is consumed and
        ignored.

        :raises KeyError: if the tag is unknown.
        """
        if not requeue:
            self.ack(delivery_tag)
            return
        # The message has to be looked up from the tag (the previous code
        # referenced an undefined name here), and it must stop counting as
        # unacknowledged, otherwise the prefetch limit would block the
        # refetch.
        message = self._not_yet_acked.pop(delivery_tag)
        consumer = self._consumer_for(message)
        consumer.set_offset(consumer.last_offset_consumed)

    def restore_unacked_once(self):
        """No-op: nothing is restored for this transport."""
        pass
class Channel(virtual.Channel):
    """Kombu virtual channel that maps each queue onto a Kafka topic.

    Producers and consumers are created lazily, one per topic, and cached
    for the lifetime of the channel.
    """

    #: A class reference that will be instantiated using the qos property
    QoS = QoS

    from_transport_options = (
        virtual.Channel.from_transport_options +
        ('consumer_group',
         'consumer_timeout_ms')
    )

    _client = None

    def __init__(self, *args, **kwargs):
        super(Channel, self).__init__(*args, **kwargs)
        # Topic name (bytes) -> producer / consumer, filled on first use.
        self._kafka_producers = {}
        self._kafka_consumers = {}

    @staticmethod
    def _as_bytes(value):
        """Return ``value`` as UTF-8 bytes, leaving byte strings untouched."""
        if isinstance(value, bytes):
            return value
        return value.encode('utf-8')

    def basic_qos(self, *args, **kwargs):
        """Change :class:`QoS` settings for this :class:`Channel`.

        Currently, this value is hard coded to 1.
        """
        self.qos.prefetch_count = 1

    def _put(self, queue, payload, **kwargs):
        """Serialize ``payload`` and produce it to the topic named ``queue``."""
        LOG.debug('putting into queue `%s`', queue)
        self.get_kafka_producer(queue).produce(dumps(payload))
        # Trace through the logger rather than printing to stdout.
        LOG.debug('put into queue `%s` done', queue)

    def _get(self, queue, timeout=None):
        """Consume and deserialize one message from the topic named ``queue``.

        ``timeout`` is accepted for interface compatibility but not used; the
        consumer is created with the channel's ``consumer_timeout_ms``.

        :raises Empty: if the consumer returned no message.
        """
        LOG.debug('getting from queue `%s`', queue)
        item = self.get_kafka_consumer(queue).consume()
        if item:
            return loads(item.value)
        raise Empty()

    def _purge(self, queue):
        """Skip everything pending on ``queue`` by moving the consumer's
        offset to the topic's latest available offsets.
        """
        LOG.debug('purging queue `%s`', queue)
        consumer = self.get_kafka_consumer(queue)
        # NOTE(review): the value returned by latest_available_offsets() is
        # passed straight to set_offset() -- confirm against the pykafka API.
        latest_offset = consumer.topic.latest_available_offsets()
        consumer.set_offset(latest_offset)

    def get_kafka_producer(self, queue):
        """Return the cached producer for ``queue``, creating it on first use.

        The name is normalised to bytes so that text and byte spellings of
        the same queue share one producer.
        """
        topic_name = self._as_bytes(queue)
        try:
            return self._kafka_producers[topic_name]
        except KeyError:
            topic = self.connection.kafka_client.topics[topic_name]
            producer = topic.get_producer()
            self._kafka_producers[topic_name] = producer
            return producer

    def get_kafka_consumer(self, queue):
        """Return the cached consumer for ``queue``, creating it on first use.

        The name is normalised to bytes so that every caller (``_get``,
        ``_purge`` and the QoS object) ends up with the same consumer.
        """
        topic_name = self._as_bytes(queue)
        try:
            return self._kafka_consumers[topic_name]
        except KeyError:
            topic = self.connection.kafka_client.topics[topic_name]
            consumer = topic.get_simple_consumer(
                consumer_group=self._as_bytes(self.consumer_group),
                consumer_timeout_ms=self.consumer_timeout_ms,
            )
            self._kafka_consumers[topic_name] = consumer
            return consumer
class Transport(virtual.Transport):
    """Kombu transport that talks to Kafka through pykafka."""

    Channel = Channel

    #: kafka backend state is global.
    state = virtual.BrokerState()

    driver_type = 'kafka'
    driver_name = 'pykafka'

    def __init__(self, *args, **kwargs):
        """Initialise the transport.

        :raises ImportError: if pykafka could not be imported.
        """
        if pykafka is None:
            raise ImportError('The pykafka library is not installed')
        super(Transport, self).__init__(*args, **kwargs)

    def driver_version(self):
        """Return the version string reported by pykafka."""
        return pykafka.__version__

    @cached_property
    def kafka_client(self):
        """Build the pykafka client from the connection's host settings.

        The hostname may list several brokers separated by ``;``. Any entry
        without an explicit ``:port`` gets the connection's port, or
        ``DEFAULT_PORT`` when none is set.
        """
        conninfo = self.client
        fallback_port = conninfo.port or DEFAULT_PORT
        brokers = [
            entry if ':' in entry else entry + ':' + str(fallback_port)
            for entry in conninfo.hostname.split(';')
        ]
        return pykafka.KafkaClient(hosts=','.join(brokers))
def register_transport():
    """Register this transport in kombu's alias table as ``pykafka``."""
    from kombu import transport as kombu_transport

    alias_target = 'server.kombu.kafka:Transport'
    kombu_transport.TRANSPORT_ALIASES['pykafka'] = alias_target
if __name__ == '__main__':
    # Manual smoke test: put one message on a queue, then read, print and
    # acknowledge twice in a row.
    from kombu import Connection

    logging.basicConfig()
    LOG.setLevel(logging.DEBUG)

    demo_options = {
        'consumer_group': 'kombu-consumer',
        'consumer_timeout_ms': 1000,
    }
    with Connection(transport=Transport,
                    transport_options=demo_options) as connection:
        with connection.SimpleQueue('kombu-test') as queue:
            queue.put('hello world')
            for _ in range(2):
                message = queue.get(timeout=0.1)
                print(message)
                message.ack()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment