Celery based Kafka consumer
""" | |
kombu.transport.kafka | |
===================== | |
Kafka transport. | |
:copyright: (c) 2010 - 2013 by Mahendra M. | |
:license: BSD, see LICENSE for more details. | |
**Synopsis** | |
Connects to kafka (0.8.x+) as <server>:<port>/<vhost> | |
The <vhost> becomes the group for all the clients. So we can use | |
it like a vhost | |
It is recommended that the queue be created in advance, by specifying the | |
number of partitions. The partition configuration determines how many | |
consumers can fetch data in parallel | |
**Limitations** | |
* The client API needs to modified to fetch data only from a single | |
partition. This can be used for effective load balancing also. | |
""" | |
from __future__ import absolute_import

import calendar
import datetime
import time

from anyjson import loads, dumps

from kombu.five import Empty
from kombu.transport import virtual
from kombu.utils.compat import OrderedDict
from kombu.utils.url import parse_url

# pykafka is optional at import time; Transport.__init__ raises ImportError
# before any Channel is created if it is missing.
try:
    import pykafka
    from pykafka import KafkaClient
    from pykafka.common import OffsetType
    from pykafka.protocol import PartitionOffsetCommitRequest
except ImportError:  # pragma: no cover
    pykafka = None  # noqa

KAFKA_CONNECTION_ERRORS = ()
KAFKA_CHANNEL_ERRORS = ()

DEFAULT_PORT = 9092
DEFAULT_KAFKA_GROUP = 'kombu-consumer-group'
class QoS(object):

    def __init__(self, channel):
        self.prefetch_count = 1
        self._channel = channel
        self._not_yet_acked = OrderedDict()

    def can_consume(self):
        """Return True if the :class:`Channel` can consume more messages,
        else False.

        :returns: True, if this QoS object can accept a message.
        :rtype: bool
        """
        return (not self.prefetch_count or
                len(self._not_yet_acked) < self.prefetch_count)

    def can_consume_max_estimate(self):
        if self.prefetch_count:
            return self.prefetch_count - len(self._not_yet_acked)
        else:
            return 1

    def append(self, message, delivery_tag):
        self._not_yet_acked[delivery_tag] = message

    def get(self, delivery_tag):
        return self._not_yet_acked[delivery_tag]

    def ack(self, delivery_tag):
        message = self._not_yet_acked.pop(delivery_tag)
        # exchange = message.properties['delivery_info']['exchange']
        consumer = self._channel._get_consumer(self._get_queue(message))
        consumer.commit_offsets()
    def reject(self, delivery_tag, requeue=False):
        """Reject a message by delivery tag.

        If requeue is True, the last consumed message is reverted so it will
        be refetched on the next attempt. If False, that message is consumed
        and ignored.

        In order to revert a message, it is first acknowledged and then
        re-queued.
        """
        if requeue:
            message = self._not_yet_acked.get(delivery_tag)
            self.on_requeue(message, delivery_tag)
        self.ack(delivery_tag)

    def on_requeue(self, message, delivery_tag):
        """Revert the consumer offset so the message is fetched again.

        This approach is risky and reset_offsets is also not recommended by
        the pykafka team. Hence, this is overridden in the BBQoS class.

        :type message: kombu.transport.virtual.Message
        :type delivery_tag: str
        """
        # exchange = message.properties['delivery_info']['exchange']
        consumer = self._channel._get_consumer(self._get_queue(message))
        # type consumer: pykafka.balancedconsumer.BalancedConsumer
        partition_id = message.headers['partition_id']
        partition = consumer.partitions[partition_id]
        last_offset_consumed = consumer.held_offsets[partition_id]
        consumer.reset_offsets(((partition, last_offset_consumed),))

    def restore_unacked_once(self):
        pass
    def _get_queue(self, message):
        q = None
        # Read the queue name from the channel.
        if self._channel._active_queues and \
                len(self._channel._active_queues) == 1:
            q = self._channel._active_queues[0]
        # If the channel is bound to multiple queues, try reading the queue
        # from the message using a 'jugaad' (workaround).
        elif 'args' in message.payload:
            try:
                # Try reading the queue from the passed-in thread-locals.
                # Refer to the "shared_context" variable in
                # bbcelery/queue.py -> _queueable_impl(), i.e.
                # message.payload['args'][1] == "shared_context" in that method.
                q = message.payload['args'][1]['queue']
            except (IndexError, KeyError):
                q = None
        if not q:
            # The routing key is currently the same as the queue name.
            q = message.properties['delivery_info']['routing_key']
        return q
class Channel(virtual.Channel):

    QoS = QoS

    _client = None
    _kafka_group = None

    def __init__(self, *args, **kwargs):
        super(Channel, self).__init__(*args, **kwargs)
        self._kafka_consumers = {}
        self._kafka_producers = {}

    def exchange_unbind(self, destination, source='', routing_key='',
                        nowait=False, arguments=None):
        """Not applicable for Kafka."""
        pass

    def queue_unbind(self, queue, exchange=None, routing_key='',
                     arguments=None, **kwargs):
        """Not applicable for Kafka."""
        pass

    def exchange_bind(self, destination, source='', routing_key='',
                      nowait=False, arguments=None):
        """Not applicable for Kafka."""
        pass

    def flow(self, active=True):
        """Not applicable for Kafka."""
        pass
    def fetch_offsets(self, client, topic, offset):
        """Fetch raw offset data from a topic.

        Note: taken from the pykafka CLI.

        :param client: KafkaClient connected to the cluster.
        :type client: :class:`pykafka.KafkaClient`
        :param topic: Name of the topic.
        :type topic: :class:`pykafka.topic.Topic`
        :param offset: Offset to reset to. Can be earliest, latest or a
            datetime. Using a datetime will reset the offset to the latest
            message published *before* the datetime.
        :type offset: :class:`pykafka.common.OffsetType` or
            :class:`datetime.datetime`
        :returns: {partition_id:
                   :class:`pykafka.protocol.OffsetPartitionResponse`}
        """
        if offset.lower() == 'earliest':
            return topic.earliest_available_offsets()
        elif offset.lower() == 'latest':
            return topic.latest_available_offsets()
        else:
            offset = datetime.datetime.strptime(offset, "%Y-%m-%dT%H:%M:%S")
            offset = int(calendar.timegm(offset.utctimetuple()) * 1000)
            return topic.fetch_offset_limits(offset)
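    # Illustrative calls (a sketch, not in the original gist), assuming a
    # connected `client` and a `topic` object obtained from it; the datetime
    # form must match the "%Y-%m-%dT%H:%M:%S" format used above:
    #
    #   channel.fetch_offsets(client, topic, 'earliest')
    #   channel.fetch_offsets(client, topic, '2017-01-19T10:04:00')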
    def sanitize_queue_name(self, queue):
        """Sanitize the queue name; celery sometimes pushes in '@' signs."""
        return str(queue).replace('@', '')
    def _get_producer(self, queue):
        """Create/get a producer instance for the given topic/queue."""
        queue = self.sanitize_queue_name(queue)
        producer = self._kafka_producers.get(queue, None)
        if producer is None:
            producer = self.client.topics[queue]\
                .get_producer(use_rdkafka=True, min_queued_messages=1)
            self._kafka_producers[queue] = producer
        return producer

    def _get_consumer(self, queue):
        """Create/get a consumer instance for the given topic/queue."""
        queue = self.sanitize_queue_name(queue)
        consumer = self._kafka_consumers.get(queue, None)
        if consumer is None:
            topic = self.client.topics[queue]
            # consumer = topic.get_simple_consumer(
            consumer = topic.get_balanced_consumer(
                consumer_group=self._kafka_group,
                use_rdkafka=True,
                auto_commit_enable=False)
            self._kafka_consumers[queue] = consumer
        return consumer

    def _put(self, queue, message, **kwargs):
        """Put a message on the topic/queue."""
        queue = self.sanitize_queue_name(queue)
        producer = self._get_producer(queue)
        producer.produce(dumps(message))

    def _get(self, queue, **kwargs):
        """Get a message from the topic/queue."""
        consumer = self._get_consumer(queue)
        message = consumer.consume(block=False)
        if not message:
            raise Empty()
        return self._unmarshall(message)
    def _unmarshall(self, message):
        """
        :type message: pykafka.protocol.Message
        :rtype: dict
        """
        return loads(message.value)

    def _purge(self, queue):
        """Purge all pending messages in the topic/queue; taken from the
        pykafka CLI.
        """
        queue = self.sanitize_queue_name(queue)
        # Don't auto-create topics.
        if queue not in self.client.topics:
            return 0
        topic = self.client.topics[queue]
        size = self._size(queue)
        # Build offset commit requests pointing at the latest offsets.
        offsets = topic.latest_available_offsets()
        tmsp = int(time.time() * 1000)
        reqs = [PartitionOffsetCommitRequest(queue,
                                             partition_id,
                                             res.offset[0],
                                             tmsp,
                                             'kombu')
                for partition_id, res in offsets.iteritems()]
        # Send them to the appropriate broker.
        broker = self.client.cluster.get_offset_manager(self._kafka_group)
        broker.commit_consumer_group_offsets(
            self._kafka_group, 1, 'kombu', reqs
        )
        return size
    def _delete(self, queue, *args, **kwargs):
        """Delete a queue/topic."""
        # We will just let it go through. There is no API defined yet for
        # deleting a queue/topic; it needs to be done through Kafka itself.
        pass

    def _size(self, queue):
        """Get the number of pending messages in the topic/queue."""
        queue = self.sanitize_queue_name(queue)
        # Don't auto-create topics.
        if queue not in self.client.topics:
            return 0
        topic = self.client.topics[queue]
        latest = self._get_offset(topic, OffsetType.LATEST)
        earliest = self._get_offset(topic, OffsetType.EARLIEST)
        # Pending messages are the ones between the earliest and the latest
        # available offsets.
        return latest - earliest

    def _get_offset(self, topic, offset_type):
        """
        :type topic: pykafka.topic.Topic
        :rtype: int
        """
        if offset_type == OffsetType.LATEST:
            return topic.latest_available_offsets()[0].offset[0]
        elif offset_type == OffsetType.EARLIEST:
            return topic.earliest_available_offsets()[0].offset[0]
        raise TypeError('Invalid offset_type passed')
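    # Worked example (a sketch, not in the original gist): if the earliest
    # available offset is 10 and the latest is 25, there are 25 - 10 = 15
    # messages still retrievable from the topic, which is what _size()
    # reports.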
    def _new_queue(self, queue, **kwargs):
        """Create a new queue if it does not exist."""
        # Just create a producer, the queue will be created automatically.
        # Note: please, please, please create the topic beforehand,
        # preferably with a high replication factor and loads of partitions.
        queue = self.sanitize_queue_name(queue)
        self._get_producer(queue)
        # raise ValueError('No topic exists for %s' % queue)

    def _has_queue(self, queue, **kwargs):
        """Check if a queue already exists."""
        queue = self.sanitize_queue_name(queue)
        client = self._open()
        return queue in client.topics
    def _open(self):
        hosts = self._get_hosts()
        client = KafkaClient(
            zookeeper_hosts=hosts,
            use_greenlets=False)
        return client

    def _get_hosts(self):
        conninfo = self.connection.client
        raw_hostname = conninfo.hostname
        config = parse_url(raw_hostname)
        parsed_hosts = [self._build_host_name(config)]
        if conninfo.alt:
            # Alternate hosts are raw URLs; parse them before building the
            # host names.
            for alt_raw_hostname in conninfo.alt:
                parsed_hosts.append(
                    self._build_host_name(parse_url(alt_raw_hostname)))
        return ','.join(parsed_hosts)

    @classmethod
    def _build_host_name(cls, host_config):
        assert host_config['transport'] == pykafka.__name__, \
            "Invalid kafka transport specified"
        assert host_config['hostname'], "Hostname missing for Kafka transport"
        assert host_config['port'], "No port specified for Kafka transport"
        return '{0}:{1}'.format(host_config['hostname'],
                                int(host_config['port']))
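    # Illustrative result (a sketch, not in the original gist): for a
    # hypothetical connection URL 'pykafka://kafka1:9092/my-group',
    # _build_host_name(parse_url(...)) would yield 'kafka1:9092', and
    # _get_hosts() joins the primary and any alternate hosts with commas,
    # e.g. 'kafka1:9092,kafka2:9092'.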
    @property
    def client(self):
        if self._client is None:
            self._client = self._open()
            self._kafka_group = self.connection.client.virtual_host[0:-1]
            # A kafka group must always be set.
            if not self._kafka_group:
                self._kafka_group = DEFAULT_KAFKA_GROUP
        return self._client

    def close(self):
        super(Channel, self).close()
        for producer in self._kafka_producers.itervalues():
            producer.stop()
        self._kafka_producers = {}
        for consumer in self._kafka_consumers.itervalues():
            consumer.stop()
        self._kafka_consumers = {}
class Transport(virtual.Transport):

    Channel = Channel

    default_port = DEFAULT_PORT
    can_parse_url = True
    driver_type = 'kafka'
    driver_name = 'pykafka'

    def __init__(self, *args, **kwargs):
        if pykafka is None:
            raise ImportError('The pykafka library is not installed')
        super(Transport, self).__init__(*args, **kwargs)

    def driver_version(self):
        return pykafka.__version__

    def establish_connection(self):
        return super(Transport, self).establish_connection()

    def close_connection(self, connection):
        return super(Transport, self).close_connection(connection)
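# A minimal registration sketch (not part of the original gist): kombu looks
# transports up via TRANSPORT_ALIASES, so assuming this module lives at the
# hypothetical import path 'myproject.kafka_transport', it could be wired in
# roughly like this before the Celery/kombu connection is created:
#
#   from kombu.transport import TRANSPORT_ALIASES
#   TRANSPORT_ALIASES['pykafka'] = 'myproject.kafka_transport:Transport'
#
# after which broker URLs such as 'pykafka://kafka1:9092/my-consumer-group'
# resolve to this Transport/Channel pair.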