@siddharth96
Created January 19, 2017 10:04
Celery based Kafka consumer
"""
kombu.transport.kafka
=====================
Kafka transport.
:copyright: (c) 2010 - 2013 by Mahendra M.
:license: BSD, see LICENSE for more details.
**Synopsis**
Connects to kafka (0.8.x+) as <server>:<port>/<vhost>
The <vhost> becomes the group for all the clients. So we can use
it like a vhost
It is recommended that the queue be created in advance, by specifying the
number of partitions. The partition configuration determines how many
consumers can fetch data in parallel
**Limitations**
* The client API needs to modified to fetch data only from a single
partition. This can be used for effective load balancing also.
"""
from __future__ import absolute_import
import calendar
import datetime
import time
from anyjson import loads, dumps

from kombu.five import Empty
from kombu.transport import virtual
from kombu.utils.compat import OrderedDict
from kombu.utils.url import parse_url

try:
    import pykafka
    from pykafka import KafkaClient
    from pykafka.protocol import PartitionOffsetCommitRequest
    from pykafka.common import OffsetType
except ImportError:  # pykafka is optional; Transport raises a clearer error
    pykafka = None
    KafkaClient = PartitionOffsetCommitRequest = OffsetType = None

KAFKA_CONNECTION_ERRORS = ()
KAFKA_CHANNEL_ERRORS = ()
DEFAULT_PORT = 9092
DEFAULT_KAFKA_GROUP = 'kombu-consumer-group'


class QoS(object):

    def __init__(self, channel):
        self.prefetch_count = 1
        self._channel = channel
        self._not_yet_acked = OrderedDict()

    def can_consume(self):
        """Return True if the :class:`Channel` can consume more messages,
        else False.

        :returns: True, if this QoS object can accept another message.
        :rtype: bool
        """
        return not self.prefetch_count or \
            len(self._not_yet_acked) < self.prefetch_count

    def can_consume_max_estimate(self):
        if self.prefetch_count:
            return self.prefetch_count - len(self._not_yet_acked)
        else:
            return 1

    def append(self, message, delivery_tag):
        self._not_yet_acked[delivery_tag] = message

    def get(self, delivery_tag):
        return self._not_yet_acked[delivery_tag]

    def ack(self, delivery_tag):
        message = self._not_yet_acked.pop(delivery_tag)
        # exchange = message.properties['delivery_info']['exchange']
        consumer = self._channel._get_consumer(self._get_queue(message))
        consumer.commit_offsets()

    def reject(self, delivery_tag, requeue=False):
        """Reject a message by delivery tag.

        If requeue is True, the last consumed message is reverted so it will
        be refetched on the next attempt. If False, the message is consumed
        and ignored.

        A message is reverted by resetting the consumer's offset and then
        acknowledging, which commits the reset offset.
        """
        if requeue:
            message = self._not_yet_acked.get(delivery_tag)
            self.on_requeue(message, delivery_tag)
        self.ack(delivery_tag)

    def on_requeue(self, message, delivery_tag):
        """
        This approach is risky, and reset_offsets is also not recommended by
        the pykafka team. Hence, this is overridden in the BBQoS class.

        :type message: kombu.transport.virtual.Message
        :type delivery_tag: str
        """
        # exchange = message.properties['delivery_info']['exchange']
        consumer = self._channel._get_consumer(self._get_queue(message))
        # type consumer: pykafka.balancedconsumer.BalancedConsumer
        partition_id = message.headers['partition_id']
        partition = consumer.partitions[partition_id]
        last_offset_consumed = consumer.held_offsets[partition_id]
        consumer.reset_offsets(((partition, last_offset_consumed),))

    def restore_unacked_once(self):
        pass

    def _get_queue(self, message):
        q = None
        # Read the queue name from the channel.
        if self._channel._active_queues and \
                len(self._channel._active_queues) == 1:
            q = self._channel._active_queues[0]
        # If the channel is bound to multiple queues, try reading the queue
        # from the message itself as a workaround.
        elif 'args' in message.payload:
            try:
                # Try reading the queue from the passed-in thread-locals.
                # Refer to the "shared_context" variable in
                # bbcelery/queue.py -> _queueable_impl(), i.e.
                # message.payload['args'][1] = "shared_context" in that method.
                q = message.payload['args'][1]['queue']
            except (IndexError, KeyError):
                q = None
        if not q:
            # The routing key is currently the same as the queue name.
            q = message.properties['delivery_info']['routing_key']
        return q


class Channel(virtual.Channel):

    QoS = QoS

    _client = None
    _kafka_group = None

    def __init__(self, *args, **kwargs):
        super(Channel, self).__init__(*args, **kwargs)
        self._kafka_consumers = {}
        self._kafka_producers = {}

    def exchange_unbind(self, destination, source='', routing_key='',
                        nowait=False, arguments=None):
        """Not applicable for Kafka."""
        pass

    def queue_unbind(self, queue, exchange=None, routing_key='',
                     arguments=None, **kwargs):
        """Not applicable for Kafka."""
        pass

    def exchange_bind(self, destination, source='', routing_key='',
                      nowait=False, arguments=None):
        """Not applicable for Kafka."""
        pass

    def flow(self, active=True):
        """Not applicable for Kafka."""
        pass
    def fetch_offsets(self, client, topic, offset):
        """Fetch raw offset data from a topic.

        Note: adapted from the pykafka CLI.

        :param client: KafkaClient connected to the cluster.
        :type client: :class:`pykafka.KafkaClient`
        :param topic: The topic to fetch offsets for.
        :type topic: :class:`pykafka.topic.Topic`
        :param offset: Offset to reset to. Can be 'earliest', 'latest' or a
            datetime string ("%Y-%m-%dT%H:%M:%S"). Using a datetime resets
            the offset to the latest message published *before* that time.
        :type offset: str
        :returns: {
            partition_id: :class:`pykafka.protocol.OffsetPartitionResponse`
        }
        """
        if offset.lower() == 'earliest':
            return topic.earliest_available_offsets()
        elif offset.lower() == 'latest':
            return topic.latest_available_offsets()
        else:
            offset = datetime.datetime.strptime(offset, "%Y-%m-%dT%H:%M:%S")
            offset = int(calendar.timegm(offset.utctimetuple()) * 1000)
            return topic.fetch_offset_limits(offset)
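    # Illustrative example (not part of the original gist): assuming `chan`
    # is an open Channel and a topic named 'my-topic' already exists,
    # offsets could be fetched as
    #
    #     topic = chan.client.topics['my-topic']
    #     chan.fetch_offsets(chan.client, topic, 'latest')
    #     chan.fetch_offsets(chan.client, topic, '2017-01-19T10:04:00')
    #
    # `chan` and 'my-topic' are placeholder names used for illustration only.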
    def sanitize_queue_name(self, queue):
        """Sanitize the queue name; celery sometimes pushes in '@' signs."""
        return str(queue).replace('@', '')

    def _get_producer(self, queue):
        """Create/get a producer instance for the given topic/queue."""
        queue = self.sanitize_queue_name(queue)
        producer = self._kafka_producers.get(queue, None)
        if producer is None:
            producer = self.client.topics[queue]\
                .get_producer(use_rdkafka=True, min_queued_messages=1)
            self._kafka_producers[queue] = producer
        return producer

    def _get_consumer(self, queue):
        """Create/get a consumer instance for the given topic/queue."""
        queue = self.sanitize_queue_name(queue)
        consumer = self._kafka_consumers.get(queue, None)
        if consumer is None:
            topic = self.client.topics[queue]
            # consumer = topic.get_simple_consumer(
            consumer = topic.get_balanced_consumer(
                consumer_group=self._kafka_group,
                use_rdkafka=True,
                auto_commit_enable=False)
            self._kafka_consumers[queue] = consumer
        return consumer

    def _put(self, queue, message, **kwargs):
        """Put a message on the topic/queue."""
        queue = self.sanitize_queue_name(queue)
        producer = self._get_producer(queue)
        producer.produce(dumps(message))

    def _get(self, queue, **kwargs):
        """Get a message from the topic/queue."""
        consumer = self._get_consumer(queue)
        message = consumer.consume(block=False)
        if not message:
            raise Empty()
        return self._unmarshall(message)

    def _unmarshall(self, message):
        """
        :type message: pykafka.protocol.Message
        :rtype: dict
        """
        return loads(message.value)
    def _purge(self, queue):
        """Purge all pending messages in the topic/queue.

        Taken from the pykafka CLI.
        """
        queue = self.sanitize_queue_name(queue)
        # Don't auto-create topics.
        if queue not in self.client.topics:
            return 0
        topic = self.client.topics[queue]
        size = self._size(queue)
        # Build offset commit requests.
        offsets = topic.latest_available_offsets()
        tmsp = int(time.time() * 1000)
        reqs = [PartitionOffsetCommitRequest(queue,
                                             partition_id,
                                             res.offset[0],
                                             tmsp,
                                             'kombu')
                for partition_id, res in offsets.iteritems()]
        # Send them to the appropriate broker.
        broker = self.client.cluster.get_offset_manager(self._kafka_group)
        broker.commit_consumer_group_offsets(
            self._kafka_group, 1, 'kombu', reqs
        )
        return size
    def _delete(self, queue, *args, **kwargs):
        """Delete a queue/topic.

        This is a no-op: there is no API defined yet for deleting a
        queue/topic, so it needs to be done through Kafka itself.
        """
        pass

    def _size(self, queue):
        """Get the number of pending messages in the topic/queue."""
        queue = self.sanitize_queue_name(queue)
        # Don't auto-create topics.
        if queue not in self.client.topics:
            return 0
        topic = self.client.topics[queue]
        latest = self._get_offset(topic, OffsetType.LATEST)
        earliest = self._get_offset(topic, OffsetType.EARLIEST)
        # Pending messages lie between the earliest and latest offsets.
        return latest - earliest

    def _get_offset(self, topic, offset_type):
        """
        :type topic: pykafka.topic.Topic
        :rtype: int
        """
        if offset_type == OffsetType.LATEST:
            return topic.latest_available_offsets()[0].offset[0]
        elif offset_type == OffsetType.EARLIEST:
            return topic.earliest_available_offsets()[0].offset[0]
        raise TypeError('Invalid offset_type passed')

    def _new_queue(self, queue, **kwargs):
        """Create a new queue if it does not exist."""
        # Just create a producer; the topic will be created automatically.
        # Note: please, please, please create the topic beforehand,
        # preferably with a high replication factor and loads of partitions.
        queue = self.sanitize_queue_name(queue)
        self._get_producer(queue)
        # raise ValueError('No topic exists for %s' % queue)

    def _has_queue(self, queue, **kwargs):
        """Check if a queue already exists."""
        queue = self.sanitize_queue_name(queue)
        client = self._open()
        return queue in client.topics
    def _open(self):
        hosts = self._get_hosts()
        client = KafkaClient(
            zookeeper_hosts=hosts,
            use_greenlets=False)
        return client

    def _get_hosts(self):
        conninfo = self.connection.client
        raw_hostname = conninfo.hostname
        config = parse_url(raw_hostname)
        parsed_hosts = [self._build_host_name(config)]
        if conninfo.alt:
            for alt_raw_hostname in conninfo.alt:
                parsed_hosts.append(
                    self._build_host_name(parse_url(alt_raw_hostname)))
        return ','.join(parsed_hosts)

    @classmethod
    def _build_host_name(cls, host_config):
        assert host_config['transport'] == pykafka.__name__, \
            "Invalid kafka transport specified"
        assert host_config['hostname'], "Hostname missing for Kafka transport"
        assert host_config['port'], "No port specified for Kafka transport"
        return '{0}:{1}'.format(host_config['hostname'],
                                int(host_config['port']))
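    # Illustrative note (an assumption, not from the original gist): for a
    # URL such as 'pykafka://kafka-broker-1:9092/my-group', kombu's
    # parse_url() is expected to return a dict along the lines of
    # {'transport': 'pykafka', 'hostname': 'kafka-broker-1', 'port': 9092,
    #  'virtual_host': 'my-group', ...}, which is the shape consumed by
    # _build_host_name above. The host and group names are placeholders.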
    @property
    def client(self):
        if self._client is None:
            self._client = self._open()
            self._kafka_group = self.connection.client.virtual_host[0:-1]
            # A kafka group must always be set.
            if not self._kafka_group:
                self._kafka_group = DEFAULT_KAFKA_GROUP
        return self._client

    def close(self):
        super(Channel, self).close()
        for producer in self._kafka_producers.itervalues():
            producer.stop()
        self._kafka_producers = {}
        for consumer in self._kafka_consumers.itervalues():
            consumer.stop()
        self._kafka_consumers = {}


class Transport(virtual.Transport):
    Channel = Channel

    default_port = DEFAULT_PORT
    can_parse_url = True
    driver_type = 'kafka'
    driver_name = 'pykafka'

    def __init__(self, *args, **kwargs):
        if pykafka is None:
            raise ImportError('The pykafka library is not installed')
        super(Transport, self).__init__(*args, **kwargs)

    def driver_version(self):
        return pykafka.__version__

    def establish_connection(self):
        return super(Transport, self).establish_connection()

    def close_connection(self, connection):
        return super(Transport, self).close_connection(connection)
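

# ---------------------------------------------------------------------------
# Illustrative usage sketch (an assumption, not part of the original gist).
# It presumes this file is importable as `kafka_transport` and that Kafka and
# Zookeeper are reachable on localhost; the host, topic and group names are
# placeholders.
# ---------------------------------------------------------------------------
# from kombu import Connection
# from kombu.five import Empty
# from kombu.transport import TRANSPORT_ALIASES
#
# # Register the transport under the 'pykafka' scheme so broker URLs of the
# # form pykafka://<host>:<port>/<group> resolve to this Transport class.
# TRANSPORT_ALIASES['pykafka'] = 'kafka_transport:Transport'
#
# with Connection('pykafka://localhost:9092/kombu-consumer-group') as conn:
#     channel = conn.channel()
#     channel._put('my-topic', {'task': 'ping'})       # publish a message
#     try:
#         print(channel._get('my-topic'))               # consume one message
#     except Empty:
#         pass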