Apache Kafka Broker (Producer) Protocol
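A minimal, single-file Kafka "broker" built on Python asyncio. It implements just enough of the wire protocol (ApiVersions, Metadata, InitProducerId, Produce) for a stock producer client to connect and publish, and it decodes the incoming record batches (keys, values, headers). It is a protocol-exploration toy, not a real broker.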
#!/usr/bin/env python
# ----------------------------------------------------------------------
# Copyright 2022 Matteo Bertozzi
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from collections import namedtuple
import traceback
import asyncio
import uuid
import gzip
import io
# ================================================================================
# Encode/Decode related
# ================================================================================
KafkaSchemaField = namedtuple('KafkaSchemaField', ['field_name', 'field_type', 'type_schema'], defaults=(None,))

class ObjectWithSchema:
    def __init__(self, schema, data):
        self.schema = schema
        self.data = data

    def __getattr__(self, name):
        return self.data[name]

    def __repr__(self) -> str:
        return repr(self.data)
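# Example (sketch): decoded values wrap a plain dict; schema fields are
# readable as attributes via __getattr__.
#   obj = ObjectWithSchema(schema, {'node_id': 0, 'host': '127.0.0.1'})
#   obj.node_id  -> 0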
class KafkaBytesWriter:
    def __init__(self, buffer=None):
        self.buffer = buffer if buffer else io.BytesIO()

    def get_bytes(self):
        return self.buffer.getvalue()

    def get_memoryview(self):
        return self.buffer.getbuffer()

    def __repr__(self) -> str:
        return repr(self.buffer.getvalue())

    def write_boolean(self, v): self.write_int(1 if v else 0, 1, False)
    def write_int8(self, v): self.write_int(v, 1, True)
    def write_int16(self, v): self.write_int(v, 2, True)
    def write_int32(self, v): self.write_int(v, 4, True)
    def write_int64(self, v): self.write_int(v, 8, True)
    def write_uint8(self, v): self.write_int(v, 1, False)
    def write_uint32(self, v): self.write_int(v, 4, False)

    def write_int(self, value, nbytes, signed):
        if value is None: value = 0
        self.buffer.write(value.to_bytes(nbytes, byteorder='big', signed=signed))

    def write_varint(self, value):
        self.write_uvarint((value << 1) ^ (value >> 31))

    def write_varlong(self, value):
        self.write_uvarint((value << 1) ^ (value >> 63))

    def write_uvarint(self, value):
        if value is None: value = 0
        while (value & 0xffffff80) != 0:
            self.write_uint8((value & 0x7f) | 0x80)
            value >>= 7
        self.write_uint8(value)
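    # Example (sketch): varints are zigzag-encoded, then emitted 7 bits at a
    # time (low bits first, high bit set = more bytes follow), protobuf-style:
    #   w = KafkaBytesWriter(); w.write_uvarint(300); w.get_bytes() -> b'\xac\x02'
    #   w = KafkaBytesWriter(); w.write_varint(-3);   w.get_bytes() -> b'\x05'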
    def write_float64(self, value):
        raise NotImplementedError

    def write_uuid(self, uuid):
        self.buffer.write(uuid.bytes)

    def write_varint_string(self, value):
        if value is None:
            self.write_varint(-1)  # null is encoded as varint length -1
        else:
            data = value.encode('utf-8')
            self.write_varint(len(data))
            self.buffer.write(data)

    def write_compact_string(self, value):
        if value is None:
            self.write_uvarint(0)  # compact encoding: 0 means null
        else:
            data = value.encode('utf-8')
            self.write_uvarint(len(data) + 1)
            self.buffer.write(data)
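    # Example (sketch): COMPACT_* types encode length+1 as a uvarint so that
    # 0 can mean null:
    #   write_compact_string('hi') -> b'\x03hi'  (length 2 -> uvarint 3, then bytes)
    #   write_compact_string(None) -> b'\x00'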
    def write_string(self, value):
        data = value.encode('utf-8')
        self.write_int16(len(data))
        self.buffer.write(data)

    def write_nullable_string(self, value):
        if value is None:
            self.write_int16(-1)  # null is encoded as an INT16 length of -1
        else:
            data = value.encode('utf-8')
            self.write_int16(len(data))
            self.buffer.write(data)

    def write_bytes(self, value):
        if value is None:
            self.write_int32(-1)
        else:
            self.write_int32(len(value))
            self.buffer.write(value)

    def write_compact_bytes(self, value):
        if value is None:
            self.write_uvarint(0)
        else:
            self.write_uvarint(len(value) + 1)
            self.buffer.write(value)  # the payload must follow the length

    def write_varint_bytes(self, value):
        if value is None:
            self.write_varint(-1)  # null is encoded as varint length -1
        else:
            self.write_varint(len(value))
            self.buffer.write(value)

    def write_tag_buffer(self, tags):
        if not tags:
            self.write_uvarint(0)
            return
        self.write_uvarint(len(tags))
        for k, v in tags.items():
            self.write_uvarint(k)
            self.write_uvarint(len(v))
            self.buffer.write(v)
    def write_type(self, type_name, value):
        match type_name:
            case 'BOOLEAN': return self.write_boolean(value)
            case 'INT8': return self.write_int8(value)
            case 'INT16': return self.write_int16(value)
            case 'INT32': return self.write_int32(value)
            case 'INT64': return self.write_int64(value)
            case 'UINT32': return self.write_uint32(value)
            case 'VARINT': return self.write_varint(value)
            case 'VARLONG': return self.write_varlong(value)
            case 'UUID': return self.write_uuid(value)
            case 'FLOAT64': return self.write_float64(value)
            case 'STRING': return self.write_string(value)
            case 'VARINT_STRING': return self.write_varint_string(value)
            case 'COMPACT_STRING': return self.write_compact_string(value)
            case 'NULLABLE_STRING': return self.write_nullable_string(value)
            case 'COMPACT_NULLABLE_STRING': return self.write_compact_string(value)
            case 'BYTES': return self.write_bytes(value)
            case 'VARINT_BYTES': return self.write_varint_bytes(value)
            case 'COMPACT_BYTES': return self.write_compact_bytes(value)
            case 'NULLABLE_BYTES': return self.write_bytes(value)
            case 'COMPACT_NULLABLE_BYTES': return self.write_compact_bytes(value)
            case 'TAG_BUFFER': return self.write_tag_buffer(value)
            case _: raise NotImplementedError('unsupported type %s' % type_name)
    def write_array(self, schema, items):
        if items is None:
            self.write_int32(-1)
        elif len(schema) == 1 and schema[0].field_name is None:
            item_type = schema[0].field_type
            self.write_int32(len(items))
            for v in items:
                self.write_type(item_type, v)
        else:
            self.write_int32(len(items))
            for v in items:
                self.write_schema(schema, v)

    def write_compact_array(self, schema, items):
        if items is None:
            self.write_uvarint(0)
        elif len(schema) == 1 and schema[0].field_name is None:
            item_type = schema[0].field_type
            self.write_uvarint(len(items) + 1)
            for v in items:
                self.write_type(item_type, v)
        else:
            self.write_uvarint(len(items) + 1)
            for v in items:
                self.write_schema(schema, v)

    def write_schema(self, schema, value):
        for field_name, field_type, sub_schema in schema:
            field_value = value.get(field_name)
            #print('write', field_name, field_type, field_value)
            match field_type:
                case 'ARRAY': self.write_array(sub_schema, field_value)
                case 'COMPACT_ARRAY': self.write_compact_array(sub_schema, field_value)
                case 'RECORDS':
                    raise NotImplementedError
                case _:
                    self.write_type(field_type, field_value)
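# Example (sketch): schemas drive the encoding; values are plain dicts keyed
# by field name (RESPONSE_HEADER_V0 is defined later in this file):
#   w = KafkaBytesWriter()
#   w.write_schema(RESPONSE_HEADER_V0, {'correlation_id': 1})
#   w.get_bytes()  -> b'\x00\x00\x00\x01'  (a single INT32)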
class KafkaBytesReader:
    def __init__(self, buffer):
        self.buffer = io.BytesIO(buffer)

    def __repr__(self) -> str:
        return repr(self.buffer.getvalue())

    def read_remaining(self):
        return self.buffer.read()

    def read_boolean(self): return self.read_int(1, False) != 0
    def read_int8(self): return self.read_int(1, True)
    def read_int16(self): return self.read_int(2, True)
    def read_int32(self): return self.read_int(4, True)
    def read_int64(self): return self.read_int(8, True)
    def read_uint8(self): return self.read_int(1, False)
    def read_uint32(self): return self.read_int(4, False)

    def read_int(self, nbytes, signed):
        return int.from_bytes(self.buffer.read(nbytes), byteorder='big', signed=signed)

    def read_varint(self):
        n = self.read_uvarint()
        return (n >> 1) ^ -(n & 1)

    def read_uvarint(self):
        shift = 0
        result = 0
        while True:
            v = self.read_uint8()
            result |= (v & 0x7f) << shift
            shift += 7
            if not (v & 0x80):
                break
        return result

    def read_uuid(self):
        return uuid.UUID(bytes=self.buffer.read(16))

    def read_float64(self):
        raise NotImplementedError

    def read_nullable_string(self):
        length = self.read_int16()
        if length < 0: return None
        return self.buffer.read(length).decode('utf-8')

    def read_varint_string(self):
        length = self.read_varint()
        if length < 0: return None
        return self.buffer.read(length).decode('utf-8')

    def read_compact_string(self):
        length = self.read_uvarint() - 1
        if length < 0: return None
        return self.buffer.read(length).decode('utf-8')

    def read_bytes(self):
        length = self.read_int32()
        if length < 0: return None
        return self.buffer.read(length)

    def read_compact_bytes(self):
        length = self.read_uvarint() - 1
        if length < 0: return None
        return self.buffer.read(length)

    def read_varint_bytes(self):
        length = self.read_varint()
        if length < 0: return None
        return self.buffer.read(length)

    def read_tag_buffer(self):
        ntags = self.read_uvarint()
        tags = {}
        for _ in range(ntags):
            tag_id = self.read_uvarint()
            length = self.read_uvarint()
            tags[tag_id] = self.buffer.read(length)  # the length was already read above
        return tags
    def read_type(self, type_name):
        match type_name:
            case 'BOOLEAN': return self.read_boolean()
            case 'INT8': return self.read_int8()
            case 'INT16': return self.read_int16()
            case 'INT32': return self.read_int32()
            case 'INT64': return self.read_int64()
            case 'UINT32': return self.read_uint32()
            case 'VARINT': return self.read_varint()
            case 'VARLONG': return self.read_varint()  # same zigzag decode; Python ints are unbounded
            case 'UUID': return self.read_uuid()
            case 'FLOAT64': return self.read_float64()
            case 'STRING': return self.read_nullable_string()
            case 'VARINT_STRING': return self.read_varint_string()
            case 'COMPACT_STRING': return self.read_compact_string()
            case 'NULLABLE_STRING': return self.read_nullable_string()
            case 'COMPACT_NULLABLE_STRING': return self.read_compact_string()
            case 'BYTES': return self.read_bytes()
            case 'VARINT_BYTES': return self.read_varint_bytes()
            case 'COMPACT_BYTES': return self.read_compact_bytes()
            case 'NULLABLE_BYTES': return self.read_bytes()
            case 'COMPACT_NULLABLE_BYTES': return self.read_compact_bytes()
            case 'TAG_BUFFER': return self.read_tag_buffer()
            case _: raise NotImplementedError('unsupported type %s' % type_name)

    def read_array(self, schema):
        nitems = self.read_int32()
        if nitems < 0: return None
        if len(schema) == 1 and schema[0].field_name is None:
            item_type = schema[0].field_type
            return [self.read_type(item_type) for _ in range(nitems)]
        return [self.read_schema(schema) for _ in range(nitems)]

    def read_compact_array(self, schema):
        nitems = self.read_uvarint() - 1
        if nitems < 0: return None
        if len(schema) == 1 and schema[0].field_name is None:
            item_type = schema[0].field_type
            return [self.read_type(item_type) for _ in range(nitems)]
        return [self.read_schema(schema) for _ in range(nitems)]

    def read_schema(self, schema):
        result = {}
        for field_name, field_type, sub_schema in schema:
            #print('read', field_name, field_type)
            match field_type:
                case 'ARRAY': result[field_name] = self.read_array(sub_schema)
                case 'COMPACT_ARRAY': result[field_name] = self.read_compact_array(sub_schema)
                case 'RECORDS': raise NotImplementedError
                case _: result[field_name] = self.read_type(field_type)
        return ObjectWithSchema(schema, result)
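# Example (sketch): round-trip through a schema (RESPONSE_HEADER_V1 is
# defined just below; a missing 'tagged_fields' key encodes as an empty buffer):
#   w = KafkaBytesWriter()
#   w.write_schema(RESPONSE_HEADER_V1, {'correlation_id': 7})
#   KafkaBytesReader(w.get_bytes()).read_schema(RESPONSE_HEADER_V1).correlation_id  -> 7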
# ================================================================================
# Protocol definitions
# NOTE: don't do this in real code. Put all the definitions in a JSON file
# (or similar) and index them by apiKey and version, something like:
#   { apiKey: { version: { request: SCHEMA, response: SCHEMA } } }
# ================================================================================
REQUEST_HEADER_V2 = (
    KafkaSchemaField('request_api_key', 'INT16'),
    KafkaSchemaField('request_api_version', 'INT16'),
    KafkaSchemaField('correlation_id', 'INT32'),
    KafkaSchemaField('client_id', 'NULLABLE_STRING'),
    KafkaSchemaField('tagged_fields', 'TAG_BUFFER')
)

RESPONSE_HEADER_V0 = (
    KafkaSchemaField('correlation_id', 'INT32'),
)

RESPONSE_HEADER_V1 = (
    KafkaSchemaField('correlation_id', 'INT32'),
    KafkaSchemaField('tagged_fields', 'TAG_BUFFER')
)
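# Note: responses for flexible request versions use header v1 (with tagged
# fields), but the ApiVersions response is the standard exception: it always
# uses header v0, since the client must be able to parse it before version
# negotiation completes. The handler below follows that convention
# (header v0 for api key 18, header v1 everywhere else).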
API_VERSION_REQUEST_V3 = (
    KafkaSchemaField('client_software_name', 'COMPACT_STRING'),
    KafkaSchemaField('client_software_version', 'COMPACT_STRING'),
)

API_VERSION_RESPONSE_V3 = (
    KafkaSchemaField('error_code', 'INT16'),
    KafkaSchemaField('api_keys', 'ARRAY', (
        KafkaSchemaField('api_key', 'INT16'),
        KafkaSchemaField('min_version', 'INT16'),
        KafkaSchemaField('max_version', 'INT16'),
    )),
    KafkaSchemaField('throttle_time_ms', 'INT32'),
    KafkaSchemaField('tagged_fields', 'TAG_BUFFER')
)

METADATA_REQUEST_V12 = (
    KafkaSchemaField('topics', 'COMPACT_ARRAY', (
        KafkaSchemaField('topic_id', 'UUID'),
        KafkaSchemaField('name', 'COMPACT_STRING'),
        KafkaSchemaField('tagged_fields', 'TAG_BUFFER')
    )),
    KafkaSchemaField('allow_auto_topic_creation', 'BOOLEAN'),
    KafkaSchemaField('include_topic_authorized_operations', 'BOOLEAN'),
    KafkaSchemaField('tagged_fields', 'TAG_BUFFER'),
)

METADATA_RESPONSE_V12 = (
    KafkaSchemaField('throttle_time_ms', 'INT32'),
    KafkaSchemaField('brokers', 'COMPACT_ARRAY', (
        KafkaSchemaField('node_id', 'INT32'),
        KafkaSchemaField('host', 'COMPACT_STRING'),
        KafkaSchemaField('port', 'INT32'),
        KafkaSchemaField('rack', 'COMPACT_NULLABLE_STRING'),
        KafkaSchemaField('tagged_fields', 'TAG_BUFFER'),
    )),
    KafkaSchemaField('cluster_id', 'COMPACT_NULLABLE_STRING'),
    KafkaSchemaField('controller_id', 'INT32'),
    KafkaSchemaField('topics', 'COMPACT_ARRAY', (
        KafkaSchemaField('error_code', 'INT16'),
        KafkaSchemaField('name', 'COMPACT_STRING'),
        KafkaSchemaField('topic_id', 'UUID'),
        KafkaSchemaField('is_internal', 'BOOLEAN'),
        KafkaSchemaField('partitions', 'COMPACT_ARRAY', (
            KafkaSchemaField('error_code', 'INT16'),
            KafkaSchemaField('partition_index', 'INT32'),
            KafkaSchemaField('leader_id', 'INT32'),
            KafkaSchemaField('leader_epoch', 'INT32'),
            KafkaSchemaField('replica_nodes', 'COMPACT_ARRAY', (
                KafkaSchemaField(None, 'INT32'),
            )),
            KafkaSchemaField('isr_nodes', 'COMPACT_ARRAY', (
                KafkaSchemaField(None, 'INT32'),
            )),
            KafkaSchemaField('offline_replicas', 'COMPACT_ARRAY', (
                KafkaSchemaField(None, 'INT32'),
            )),
            KafkaSchemaField('tagged_fields', 'TAG_BUFFER'),
        )),
        KafkaSchemaField('topic_authorized_operations', 'INT32'),
        KafkaSchemaField('tagged_fields', 'TAG_BUFFER'),
    )),
    KafkaSchemaField('tagged_fields', 'TAG_BUFFER'),
)

INIT_PRODUCER_ID_REQUEST_V4 = (
    KafkaSchemaField('transactional_id', 'COMPACT_NULLABLE_STRING'),
    KafkaSchemaField('transaction_timeout_ms', 'INT32'),
    KafkaSchemaField('producer_id', 'INT64'),
    KafkaSchemaField('producer_epoch', 'INT16'),
    KafkaSchemaField('tagged_fields', 'TAG_BUFFER'),
)

INIT_PRODUCER_ID_RESPONSE_V4 = (
    KafkaSchemaField('throttle_time_ms', 'INT32'),
    KafkaSchemaField('error_code', 'INT16'),
    KafkaSchemaField('producer_id', 'INT64'),
    KafkaSchemaField('producer_epoch', 'INT16'),
    KafkaSchemaField('tagged_fields', 'TAG_BUFFER'),
)

PRODUCE_REQUEST_V9 = (
    KafkaSchemaField('transactional_id', 'COMPACT_NULLABLE_STRING'),
    KafkaSchemaField('acks', 'INT16'),
    KafkaSchemaField('timeout_ms', 'INT32'),
    KafkaSchemaField('topic_data', 'COMPACT_ARRAY', (
        KafkaSchemaField('name', 'COMPACT_STRING'),
        KafkaSchemaField('partition_data', 'COMPACT_ARRAY', (
            KafkaSchemaField('index', 'INT32'),
            KafkaSchemaField('records', 'COMPACT_NULLABLE_BYTES'),
            KafkaSchemaField('tagged_fields', 'TAG_BUFFER'),
        )),
        KafkaSchemaField('tagged_fields', 'TAG_BUFFER'),
    )),
    KafkaSchemaField('tagged_fields', 'TAG_BUFFER'),
)

PRODUCE_RESPONSE_V9 = (
    KafkaSchemaField('responses', 'COMPACT_ARRAY', (
        KafkaSchemaField('name', 'COMPACT_STRING'),
        KafkaSchemaField('partition_responses', 'COMPACT_ARRAY', (
            KafkaSchemaField('index', 'INT32'),
            KafkaSchemaField('error_code', 'INT16'),
            KafkaSchemaField('base_offset', 'INT64'),
            KafkaSchemaField('log_append_time_ms', 'INT64'),
            KafkaSchemaField('log_start_offset', 'INT64'),
            KafkaSchemaField('record_errors', 'COMPACT_ARRAY', (
                KafkaSchemaField('batch_index', 'INT32'),
                KafkaSchemaField('batch_index_error_message', 'COMPACT_NULLABLE_STRING'),
                KafkaSchemaField('tagged_fields', 'TAG_BUFFER'),
            )),
            KafkaSchemaField('error_message', 'COMPACT_NULLABLE_STRING'),
            KafkaSchemaField('tagged_fields', 'TAG_BUFFER'),
        )),
        KafkaSchemaField('tagged_fields', 'TAG_BUFFER'),
    )),
    KafkaSchemaField('throttle_time_ms', 'INT32'),
    KafkaSchemaField('tagged_fields', 'TAG_BUFFER'),
)

RECORD_BATCH = (
    KafkaSchemaField('base_offset', 'INT64'),
    KafkaSchemaField('base_length', 'INT32'),
    KafkaSchemaField('partition_leader_epoch', 'INT32'),
    KafkaSchemaField('magic', 'INT8'),
    KafkaSchemaField('crc', 'INT32'),
    KafkaSchemaField('attributes', 'INT16'),
    KafkaSchemaField('last_offset_delta', 'INT32'),
    KafkaSchemaField('base_timestamp', 'INT64'),
    KafkaSchemaField('max_timestamp', 'INT64'),
    KafkaSchemaField('producer_id', 'INT64'),
    KafkaSchemaField('producer_epoch', 'INT16'),
    KafkaSchemaField('base_sequence', 'INT32'),
    KafkaSchemaField('record_count', 'INT32'),
)

RECORD = (
    KafkaSchemaField('length', 'VARINT'),
    KafkaSchemaField('attributes', 'INT8'),
    KafkaSchemaField('timestamp_delta', 'VARLONG'),
    KafkaSchemaField('offset_delta', 'VARINT'),
    KafkaSchemaField('key', 'VARINT_BYTES'),
    KafkaSchemaField('value', 'VARINT_BYTES'),
    KafkaSchemaField('header_count', 'VARINT'),
)

RECORD_HEADER = (
    KafkaSchemaField('key', 'VARINT_STRING'),
    KafkaSchemaField('value', 'VARINT_BYTES'),
)

COMPRESSION_TYPE = ['none', 'gzip', 'snappy', 'lz4', 'zstd']
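# Record batch 'attributes' bit layout (per the Kafka message format docs):
#   bits 0-2: compression codec (index into COMPRESSION_TYPE above)
#   bit 3:    timestamp type (0 = CreateTime, 1 = LogAppendTime)
#   bit 4:    is transactional
#   bit 5:    is control batch
#   bit 6:    has delete horizon ms
# read_record_batch() below unpacks these into the decoded dict.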
def read_record_batch(data):
    #print('-' * 80)
    #print(data)
    reader = KafkaBytesReader(data)
    record_batch = reader.read_schema(RECORD_BATCH)
    # NOTE: the batch crc is not verified here
    record_batch.data['compression'] = record_batch.attributes & 7
    record_batch.data['timestamp_type'] = (record_batch.attributes >> 3) & 1
    record_batch.data['is_transactional'] = ((record_batch.attributes >> 4) & 1) != 0
    record_batch.data['is_control_batch'] = ((record_batch.attributes >> 5) & 1) != 0
    record_batch.data['has_delete_horizon_ms'] = ((record_batch.attributes >> 6) & 1) != 0
    match record_batch.compression:
        case 1: reader = KafkaBytesReader(gzip.decompress(reader.read_remaining()))
        #case 2: reader = KafkaBytesReader(snappy.decompress(reader.buffer.getbuffer()))
        #case 3: reader = KafkaBytesReader(lz4.decompress(reader.buffer.getbuffer()))
        #case 4: reader = KafkaBytesReader(zstd.decompress(reader.read_remaining()))
    records = []
    for _ in range(record_batch.record_count):
        record = reader.read_schema(RECORD)
        headers = [reader.read_schema(RECORD_HEADER) for _ in range(record.header_count)]
        record.data['headers'] = headers
        records.append(record)
    record_batch.data['records'] = records
    return record_batch
# ================================================================================
# Network related
# ================================================================================
async def read_frame(reader):
    try:
        head = await reader.readexactly(4)
    except asyncio.IncompleteReadError:
        # readexactly() raises instead of returning b'' when the peer disconnects
        raise EOFError('connection closed')
    length = int.from_bytes(head, 'big')
    frame = await reader.readexactly(length)
    return KafkaBytesReader(frame)

async def write_frame(writer, data):
    writer.write(len(data).to_bytes(4, 'big'))
    writer.write(data)
    #print('write frame', data)
    await writer.drain()
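# Every Kafka protocol message is framed as an INT32 big-endian payload length
# followed by the payload (request/response header + body). For example, a
# 10-byte payload goes on the wire as b'\x00\x00\x00\x0a' + payload.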
async def kafka_handle(reader, writer):
    try:
        print('connected')
        while True:
            frame = await read_frame(reader)
            #print('frame received', frame)
            req_header = frame.read_schema(REQUEST_HEADER_V2)
            print('-> request', req_header)
            match req_header.request_api_key:
                case 18: # ApiVersions
                    request = frame.read_schema(API_VERSION_REQUEST_V3)
                    print(' -> API Versions', request)
                    # inform the client about the api versions supported by the server
                    wbuffer = KafkaBytesWriter()
                    wbuffer.write_schema(RESPONSE_HEADER_V0, {'correlation_id': req_header.correlation_id})
                    wbuffer.write_schema(API_VERSION_RESPONSE_V3, {
                        'error_code': 0,
                        'api_keys': [
                            {'api_key': 18, 'min_version': 3, 'max_version': 3},
                            {'api_key': 3, 'min_version': 12, 'max_version': 12},
                            # producer
                            {'api_key': 22, 'min_version': 4, 'max_version': 4},
                            {'api_key': 0, 'min_version': 9, 'max_version': 9},
                        ],
                        'throttle_time_ms': 123,
                    })
                    await write_frame(writer, wbuffer.get_bytes())
                case 3: # Metadata
                    metadata = frame.read_schema(METADATA_REQUEST_V12)
                    print(' -> Metadata', metadata)
                    # single node information
                    topics = []
                    for topic_data in metadata.topics:
                        topics.append({
                            'error_code': 0,
                            'name': topic_data.name,
                            'topic_id': topic_data.topic_id,
                            'is_internal': 0,
                            'partitions': [{
                                'error_code': 0,
                                'partition_index': 0,
                                'leader_id': 0,
                                'leader_epoch': 1,
                                'replica_nodes': [0],
                                'isr_nodes': [0],
                                'offline_replicas': [],
                            }],
                            'topic_authorized_operations': -2147483648,
                        })
                    wbuffer = KafkaBytesWriter()
                    wbuffer.write_schema(RESPONSE_HEADER_V1, {'correlation_id': req_header.correlation_id})
                    wbuffer.write_schema(METADATA_RESPONSE_V12, {
                        'throttle_time_ms': 0,
                        'brokers': [
                            {'node_id': 0, 'host': '127.0.0.1', 'port': 9092, 'rack': None}
                        ],
                        'cluster_id': 'test-cluster',
                        'controller_id': 0,
                        'topics': topics,
                    })
                    await write_frame(writer, wbuffer.get_bytes())
                case 22: # InitProducerId
                    init_producer_id = frame.read_schema(INIT_PRODUCER_ID_REQUEST_V4)
                    print(' -> init producer id', init_producer_id)
                    wbuffer = KafkaBytesWriter()
                    wbuffer.write_schema(RESPONSE_HEADER_V1, {'correlation_id': req_header.correlation_id})
                    wbuffer.write_schema(INIT_PRODUCER_ID_RESPONSE_V4, {
                        'throttle_time_ms': 0,
                        'error_code': 0,
                        'producer_id': 1,
                        'producer_epoch': 0,
                    })
                    await write_frame(writer, wbuffer.get_bytes())
                case 0: # Produce
                    produce = frame.read_schema(PRODUCE_REQUEST_V9)
                    print(' -> produce', produce)
                    responses = []
                    for topic_data in produce.topic_data:
                        print(' ---> produce topic data', topic_data.name)
                        partition_responses = []
                        responses.append({'name': topic_data.name, 'partition_responses': partition_responses})
                        for partition_data in topic_data.partition_data:
                            record_batch = read_record_batch(partition_data.records)
                            print(record_batch)
                            for r in record_batch.records:
                                # Use the record for your own purposes
                                print(' -----> RECORD', r)
                            partition_responses.append({
                                'index': partition_data.index,
                                'error_code': 0,
                                'base_offset': 0,
                                'log_append_time_ms': -1,
                                'log_start_offset': -1,
                                'record_errors': [],
                                'error_message': None,
                            })
                    wbuffer = KafkaBytesWriter()
                    wbuffer.write_schema(RESPONSE_HEADER_V1, {'correlation_id': req_header.correlation_id})
                    wbuffer.write_schema(PRODUCE_RESPONSE_V9, {'throttle_time_ms': 0, 'responses': responses})
                    await write_frame(writer, wbuffer.get_bytes())
                case _:
                    raise NotImplementedError('unsupported api key %d' % req_header.request_api_key)
    except EOFError:
        pass
    except Exception:
        traceback.print_exc()
    finally:
        print('Close the connection')
        writer.close()
async def main():
    server = await asyncio.start_server(kafka_handle, '127.0.0.1', 9092)
    addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
    print(f'Kafka Broker listening on {addrs}')
    async with server:
        await server.serve_forever()

if __name__ == '__main__':
    asyncio.run(main())
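To try it out: save the script above under any name you like (e.g. kafka_broker.py, a hypothetical filename) and start it with `python3 kafka_broker.py` (Python 3.10+ is required for match/case). Then point a producer at 127.0.0.1:9092, for example the small Java client below (DemoKafkaProducer.java):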
import java.net.InetAddress;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class DemoKafkaProducer {
  public static void main(final String[] args) throws Exception {
    final Properties config = new Properties();
    config.put("client.id", InetAddress.getLocalHost().getHostName());
    config.put("bootstrap.servers", "127.0.0.1:9092");
    config.put("compression.type", "none");
    config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(config)) {
      final ProducerRecord<String, String> p = new ProducerRecord<>("xtopic", "kay1", "val1");
      p.headers().add("he1", "heval1".getBytes());
      p.headers().add("he2", "heval2".getBytes());
      producer.send(p);

      producer.send(new ProducerRecord<>("xtopic", "kay2", "val2"));
      producer.flush();
    }
  }
}
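The Java demo only needs the org.apache.kafka:kafka-clients jar on the classpath. If you have a Kafka distribution handy, the stock console producer should also be able to talk to this broker:

    kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic xtopic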