Skip to content

Instantly share code, notes, and snippets.

@matteobertozzi
Last active November 23, 2022 13:00
Show Gist options
  • Select an option

  • Save matteobertozzi/ce580033478faebd606271952bfdb7fe to your computer and use it in GitHub Desktop.

Select an option

Save matteobertozzi/ce580033478faebd606271952bfdb7fe to your computer and use it in GitHub Desktop.
Apache Kafka Broker (Producer) Protocol
#!/usr/bin/env python
# ----------------------------------------------------------------------
# Copyright 2022 Matteo Bertozzi
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import namedtuple
import asyncio
import uuid
import gzip
import io
# ================================================================================
# Encode/Decode related
# ================================================================================
KafkaSchemaField = namedtuple('KafkaSchemaField', ['field_name', 'field_type', 'type_schema'], defaults=(None,))
class ObjectWithSchema:
def __init__(self, schema, data):
self.schema = schema
self.data = data
def __getattr__(self, name):
return self.data[name]
def __repr__(self) -> str:
return repr(self.data)
class KafkaBytesWriter:
def __init__(self, buffer=None):
self.buffer = buffer if buffer else io.BytesIO()
def get_bytes(self):
return self.buffer.getvalue()
def get_memoryview(self):
return self.buffer.getbuffer()
def __repr__(self) -> str:
return repr(self.buffer.getvalue())
def write_boolean(self, v): self.write_int(1 if v else 0, 1, False)
def write_int8(self, v): self.write_int(v, 1, True)
def write_int16(self, v): self.write_int(v, 2, True)
def write_int32(self, v): self.write_int(v, 4, True)
def write_int64(self, v): self.write_int(v, 8, True)
def write_uint8(self, v): self.write_int(v, 1, False)
def write_uint32(self, v): self.write_int(v, 4, False)
def write_int(self, value, nbytes, signed):
if value is None: value = 0
self.buffer.write(value.to_bytes(nbytes, byteorder='big', signed=signed))
def write_varint(self, value):
self.write_uvarint((value << 1) ^ (value >> 31))
def write_varlong(self, value):
self.write_uvarint((value << 1) ^ (value >> 63))
def write_uvarint(self, value):
if value is None: value = 0
while (value & 0xffffff80) != 0:
self.write_uint8((value & 0x7f) | 0x80)
value >>= 7
self.write_uint8(value)
def write_float64(self, value):
raise NotImplementedError
def write_uuid(self, uuid):
self.buffer.write(uuid.bytes)
def write_varint_string(self, value):
if value is None:
self.write_varint(0)
else:
data = value.encode('utf-8')
self.write_varint(len(data))
self.buffer.write(data)
def write_compact_string(self, value):
if value is None:
self.write_uvarint(0)
else:
data = value.encode('utf-8')
self.write_uvarint(len(data) + 1)
self.buffer.write(data)
def write_string(self, value):
data = value.encode('utf-8')
self.write_int16(len(data))
self.buffer.write(data)
def write_nullable_string(self, value):
if value is None:
self.write_uvarint(-1)
else:
data = value.encode('utf-8')
self.write_int16(len(data))
self.buffer.write(data)
def write_bytes(self, value):
if value is None:
self.write_int32(-1)
else:
self.write_int32(len(value))
self.buffer.write(value)
def write_compact_bytes(self, value):
if value is None:
self.write_uvarint(0)
else:
self.write_uvarint(len(value) + 1)
def write_varint_bytes(self, value):
if value is None:
self.write_varint(0)
else:
self.write_varint(len(value))
self.buffer.write(value)
def write_tag_buffer(self, tags):
if not tags:
self.write_uvarint(0)
return
ntags = len(tags)
self.write_uvarint(ntags)
for k, v in tags.items():
self.write_uvarint(k)
self.write_uvarint(len(v))
self.buffer.write(v)
def write_type(self, type_name, value):
match type_name:
case 'BOOLEAN': return self.write_boolean(value)
case 'INT8': return self.write_int8(value)
case 'INT16': return self.write_int16(value)
case 'INT32': return self.write_int32(value)
case 'INT64': return self.write_int64(value)
case 'UINT32': return self.write_uint32(value)
case 'VARINT': return self.write_varint(value)
case 'VARLONG': return self.write_varlong(value)
case 'UUID': return self.write_uuid(value)
case 'FLOAT64': return self.write_float64(value)
case 'STRING': return self.write_string(value)
case 'COMPACT_STRING': return self.write_compact_string(value)
case 'NULLABLE_STRING': return self.write_nullable_string(value)
case 'COMPACT_NULLABLE_STRING': return self.write_compact_string(value)
case 'BYTES': return self.write_bytes(value)
case 'VARINT_BYTES': return self.write_varint_bytes(value)
case 'COMPACT_BYTES': return self.write_compact_bytes(value)
case 'NULLABLE_BYTES': return self.write_bytes(value)
case 'COMPACT_NULLABLE_BYTES': return self.write_compact_bytes(value)
case 'TAG_BUFFER': return self.write_tag_buffer(value)
case _: raise NotImplementedError('unsupported type %s' % type_name)
def write_array(self, schema, items):
if items is None:
self.write_int32(-1)
elif len(schema) == 1 and schema[0].field_name is None:
item_type = schema[0].field_type
self.write_int32(len(items))
for v in items:
self.write_type(item_type, v)
else:
self.write_int32(len(items))
for v in items:
self.write_schema(schema, v)
def write_compact_array(self, schema, items):
if items is None:
self.write_uvarint(0)
elif len(schema) == 1 and schema[0].field_name is None:
item_type = schema[0].field_type
self.write_uvarint(len(items) + 1)
for v in items:
self.write_type(item_type, v)
else:
self.write_uvarint(len(items) + 1)
for v in items:
self.write_schema(schema, v)
def write_schema(self, schema, value):
for field_name, field_type, sub_schema in schema:
field_value = value.get(field_name)
#print('write', field_name, field_type, field_value)
match field_type:
case 'ARRAY': self.write_array(sub_schema, field_value)
case 'COMPACT_ARRAY': self.write_compact_array(sub_schema, field_value)
case 'RECORDS':
raise NotImplementedError
case _:
self.write_type(field_type, field_value)
class KafkaBytesReader:
def __init__(self, buffer):
self.buffer = io.BytesIO(buffer)
def __repr__(self) -> str:
return repr(self.buffer.getvalue())
def read_remaining(self):
return self.buffer.read()
def read_boolean(self): return self.read_int(1, False) != 0
def read_int8(self): return self.read_int(1, True)
def read_int16(self): return self.read_int(2, True)
def read_int32(self): return self.read_int(4, True)
def read_int64(self): return self.read_int(8, True)
def read_uint8(self): return self.read_int(1, False)
def read_uint32(self): return self.read_int(4, False)
def read_int(self, nbytes, signed):
return int.from_bytes(self.buffer.read(nbytes), byteorder='big', signed=signed)
def read_varint(self):
n = self.read_uvarint()
return (n >> 1) ^ -(n & 1)
def read_uvarint(self):
shift = 0
result = 0
while True:
v = self.read_uint8()
result |= (v & 0x7f) << shift
shift += 7
if not (v & 0x80):
break
return result
def read_uuid(self):
return uuid.UUID(bytes=self.buffer.read(16))
def read_float64(self):
raise NotImplementedError
def read_nullable_string(self):
length = self.read_int16()
if length < 0: return None
return self.buffer.read(length).decode('utf-8')
def read_varint_string(self):
length = self.read_varint()
if length < 0: return None
return self.buffer.read(length).decode('utf-8')
def read_compact_string(self):
length = self.read_uvarint() - 1
if length < 0: return None
return self.buffer.read(length).decode('utf-8')
def read_bytes(self, value):
length = self.read_int32()
if length < 0: return None
return self.buffer.read(length)
def read_compact_bytes(self):
length = self.read_uvarint() - 1
if length < 0: return None
return self.buffer.read(length)
def read_varint_bytes(self):
length = self.read_varint()
if length < 0: return None
return self.buffer.read(length)
def read_tag_buffer(self):
ntags = self.read_uvarint()
tags = {}
for i in range(ntags):
tag_id = self.read_uvarint()
length = self.read_uvarint()
tags[tag_id] = self.read_bytes(length)
return tags
def read_type(self, type_name):
match type_name:
case 'BOOLEAN': return self.read_boolean()
case 'INT8': return self.read_int8()
case 'INT16': return self.read_int16()
case 'INT32': return self.read_int32()
case 'INT64': return self.read_int64()
case 'UINT32': return self.read_uint32()
case 'VARINT': return self.read_varint()
case 'VARLONG': return self.read_varint()
case 'UUID': return self.read_uuid()
case 'FLOAT64': return self.read_float64()
case 'STRING': return self.read_nullable_string()
case 'VARINT_STRING': return self.read_varint_string()
case 'COMPACT_STRING': return self.read_compact_string()
case 'NULLABLE_STRING': return self.read_nullable_string()
case 'COMPACT_NULLABLE_STRING': return self.read_compact_string()
case 'BYTES': return self.read_bytes()
case 'VARINT_BYTES': return self.read_varint_bytes()
case 'COMPACT_BYTES': return self.read_compact_bytes()
case 'NULLABLE_BYTES': return self.read_bytes()
case 'COMPACT_NULLABLE_BYTES': return self.read_compact_bytes()
case 'TAG_BUFFER': return self.read_tag_buffer()
case _: raise NotImplementedError('unsupported type %s' % type_name)
def read_array(self, schema):
nitems = self.read_int32()
if nitems < 0: return None
if len(schema) == 1 and schema[0].field_name is None:
item_type = schema[0].field_type
return [self.read_type(item_type) for _ in range(nitems)]
return [self.read_schema(schema) for v in range(nitems)]
def read_compact_array(self, schema):
nitems = self.read_uvarint() - 1
if nitems < 0: return None
if len(schema) == 1 and schema[0].field_name is None:
item_type = schema[0].field_type
return [self.read_type(item_type) for _ in range(nitems)]
return [self.read_schema(schema) for _ in range(nitems)]
def read_schema(self, schema):
result = {}
for field_name, field_type, sub_schema in schema:
#print('read', field_name, field_type)
match field_type:
case 'ARRAY': result[field_name] = self.read_array(sub_schema)
case 'COMPACT_ARRAY': result[field_name] = self.read_compact_array(sub_schema)
case 'RECORDS': raise NotImplementedError
case _: result[field_name] = self.read_type(field_type)
return ObjectWithSchema(schema, result)
# ================================================================================
# Protocol definitions
# NOTE: don't do this. put all the defs in a json file or similar
# and use a structure with apiKey, version, schema. something like:
# { apiKey: { version: { request: SCHEMA, response: SCHEMA } }
# ================================================================================
REQUEST_HEADER_V2 = (
KafkaSchemaField('request_api_key', 'INT16'),
KafkaSchemaField('request_api_version', 'INT16'),
KafkaSchemaField('correlation_id', 'INT32'),
KafkaSchemaField('client_id', 'NULLABLE_STRING'),
KafkaSchemaField('tagged_fields', 'TAG_BUFFER')
)
RESPONSE_HEADER_V0 = (
KafkaSchemaField('correlation_id', 'INT32'),
)
RESPONSE_HEADER_V1 = (
KafkaSchemaField('correlation_id', 'INT32'),
KafkaSchemaField('tagged_fields', 'TAG_BUFFER')
)
API_VERSION_REQUEST_V3 = (
KafkaSchemaField('client_software_name', 'COMPACT_STRING'),
KafkaSchemaField('client_software_version', 'COMPACT_STRING'),
)
API_VERSION_RESPONSE_V3 = (
KafkaSchemaField('error_code', 'INT16'),
KafkaSchemaField('api_keys', 'ARRAY', (
KafkaSchemaField('api_key', 'INT16'),
KafkaSchemaField('min_version', 'INT16'),
KafkaSchemaField('max_version', 'INT16'),
)),
KafkaSchemaField('throttle_time_ms', 'INT32'),
KafkaSchemaField('tagged_fields', 'TAG_BUFFER')
)
METADATA_REQUEST_V12 = (
KafkaSchemaField('topics', 'COMPACT_ARRAY', (
KafkaSchemaField('topic_id', 'UUID'),
KafkaSchemaField('name', 'COMPACT_STRING'),
KafkaSchemaField('tagged_fields', 'TAG_BUFFER')
)),
KafkaSchemaField('allow_auto_topic_creation', 'BOOLEAN'),
KafkaSchemaField('include_topic_authorized_operations', 'BOOLEAN'),
KafkaSchemaField('tagged_fields', 'TAG_BUFFER'),
)
METADATA_RESPONSE_V12 = (
KafkaSchemaField('throttle_time_ms', 'INT32'),
KafkaSchemaField('brokers', 'COMPACT_ARRAY', (
KafkaSchemaField('node_id', 'INT32'),
KafkaSchemaField('host', 'COMPACT_STRING'),
KafkaSchemaField('port', 'INT32'),
KafkaSchemaField('rack', 'COMPACT_NULLABLE_STRING'),
KafkaSchemaField('tagged_fields', 'TAG_BUFFER'),
)),
KafkaSchemaField('cluster_id', 'COMPACT_NULLABLE_STRING'),
KafkaSchemaField('controller_id', 'INT32'),
KafkaSchemaField('topics', 'COMPACT_ARRAY', (
KafkaSchemaField('error_code', 'INT16'),
KafkaSchemaField('name', 'COMPACT_STRING'),
KafkaSchemaField('topic_id', 'UUID'),
KafkaSchemaField('is_internal', 'BOOLEAN'),
KafkaSchemaField('partitions', 'COMPACT_ARRAY', (
KafkaSchemaField('error_code', 'INT16'),
KafkaSchemaField('partition_index', 'INT32'),
KafkaSchemaField('leader_id', 'INT32'),
KafkaSchemaField('leader_epoch', 'INT32'),
KafkaSchemaField('replica_nodes', 'COMPACT_ARRAY', (
KafkaSchemaField(None, 'INT32'),
)),
KafkaSchemaField('isr_nodes', 'COMPACT_ARRAY', (
KafkaSchemaField(None, 'INT32'),
)),
KafkaSchemaField('offline_replicas', 'COMPACT_ARRAY', (
KafkaSchemaField(None, 'INT32'),
)),
KafkaSchemaField('tagged_fields', 'TAG_BUFFER'),
)),
KafkaSchemaField('topic_authorized_operations', 'INT32'),
KafkaSchemaField('tagged_fields', 'TAG_BUFFER'),
)),
KafkaSchemaField('tagged_fields', 'TAG_BUFFER'),
)
INIT_PRODUCER_ID_REQUEST_V4 = (
KafkaSchemaField('transactional_id', 'COMPACT_NULLABLE_STRING'),
KafkaSchemaField('transaction_timeout_ms', 'INT32'),
KafkaSchemaField('producer_id', 'INT64'),
KafkaSchemaField('producer_epoch', 'INT16'),
KafkaSchemaField('tagged_fields', 'TAG_BUFFER'),
)
INIT_PRODUCER_ID_RESPONSE_V4 = (
KafkaSchemaField('throttle_time_ms', 'INT32'),
KafkaSchemaField('error_code', 'INT16'),
KafkaSchemaField('producer_id', 'INT64'),
KafkaSchemaField('producer_epoch', 'INT16'),
KafkaSchemaField('tagged_fields', 'TAG_BUFFER'),
)
PRODUCE_REQUEST_V9 = (
KafkaSchemaField('transactional_id', 'COMPACT_NULLABLE_STRING'),
KafkaSchemaField('acks', 'INT16'),
KafkaSchemaField('timeout_ms', 'INT32'),
KafkaSchemaField('topic_data', 'COMPACT_ARRAY', (
KafkaSchemaField('name', 'COMPACT_STRING'),
KafkaSchemaField('partition_data', 'COMPACT_ARRAY', (
KafkaSchemaField('index', 'INT32'),
KafkaSchemaField('records', 'COMPACT_NULLABLE_BYTES'),
KafkaSchemaField('tagged_fields', 'TAG_BUFFER'),
)),
KafkaSchemaField('tagged_fields', 'TAG_BUFFER'),
)),
KafkaSchemaField('tagged_fields', 'TAG_BUFFER'),
)
PRODUCE_RESPONSE_V9 = (
KafkaSchemaField('responses', 'COMPACT_ARRAY', (
KafkaSchemaField('name', 'COMPACT_STRING'),
KafkaSchemaField('partition_responses', 'COMPACT_ARRAY', (
KafkaSchemaField('index', 'INT32'),
KafkaSchemaField('error_code', 'INT16'),
KafkaSchemaField('base_offset', 'INT64'),
KafkaSchemaField('log_append_time_ms', 'INT64'),
KafkaSchemaField('log_start_offset', 'INT64'),
KafkaSchemaField('record_errors', 'COMPACT_ARRAY', (
KafkaSchemaField('batch_index', 'INT32'),
KafkaSchemaField('batch_index_error_message', 'COMPACT_NULLABLE_STRING'),
KafkaSchemaField('tagged_fields', 'TAG_BUFFER'),
)),
KafkaSchemaField('error_message', 'COMPACT_NULLABLE_STRING'),
KafkaSchemaField('tagged_fields', 'TAG_BUFFER'),
)),
KafkaSchemaField('tagged_fields', 'TAG_BUFFER'),
)),
KafkaSchemaField('throttle_time_ms', 'INT32'),
KafkaSchemaField('tagged_fields', 'TAG_BUFFER'),
)
RECORD_BATCH = (
KafkaSchemaField('base_offset', 'INT64'),
KafkaSchemaField('base_length', 'INT32'),
KafkaSchemaField('partition_leader_epoch', 'INT32'),
KafkaSchemaField('magic', 'INT8'),
KafkaSchemaField('crc', 'INT32'),
KafkaSchemaField('attributes', 'INT16'),
KafkaSchemaField('last_offset_delta', 'INT32'),
KafkaSchemaField('base_timestamp', 'INT64'),
KafkaSchemaField('max_timestamp', 'INT64'),
KafkaSchemaField('producer_id', 'INT64'),
KafkaSchemaField('producer_epoch', 'INT16'),
KafkaSchemaField('base_sequence', 'INT32'),
KafkaSchemaField('record_count', 'INT32'),
)
RECORD = (
KafkaSchemaField('length', 'VARINT'),
KafkaSchemaField('attributes', 'INT8'),
KafkaSchemaField('timestamp_delta', 'VARLONG'),
KafkaSchemaField('offset_delta', 'VARINT'),
KafkaSchemaField('key', 'VARINT_BYTES'),
KafkaSchemaField('value', 'VARINT_BYTES'),
KafkaSchemaField('header_count', 'VARINT'),
)
RECORD_HEADER = (
KafkaSchemaField('key', 'VARINT_STRING'),
KafkaSchemaField('value', 'VARINT_BYTES'),
)
COMPRESSION_TYPE = ['none', 'gzip', 'snappy', 'lz4', 'zstd']
def read_record_batch(data):
#print('-' * 80)
#print(data)
reader = KafkaBytesReader(data)
record_batch = reader.read_schema(RECORD_BATCH)
record_batch.data['compression'] = record_batch.attributes & 7
record_batch.data['timestamp_type'] = (record_batch.attributes >> 3) & 1
record_batch.data['is_transactional'] = ((record_batch.attributes >> 4) & 1) != 0
record_batch.data['is_control_batch'] = ((record_batch.attributes >> 5) & 1) != 0
record_batch.data['has_delete_horizion_ms'] = ((record_batch.attributes >> 6) & 1) != 0
match record_batch.compression:
case 1: reader = KafkaBytesReader(gzip.decompress(reader.read_remaining()))
#case 2: reader = KafkaBytesReader(snappy.decompress(reader.buffer.getbuffer()))
#case 3: reader = KafkaBytesReader(lz4.decompress(reader.buffer.getbuffer()))
#case 4: reader = KafkaBytesReader(zstd.decompress(reader.read_remaining()))
records = []
for _ in range(record_batch.record_count):
record = reader.read_schema(RECORD)
headers = [reader.read_schema(RECORD_HEADER) for _ in range(record.header_count)]
record.data['headers'] = headers
records.append(record)
record_batch.data['records'] = records
return record_batch
# ================================================================================
# Network related
# ================================================================================
async def read_frame(reader):
head = await reader.readexactly(4)
if not head:
raise EOFError('connection closed')
length = int.from_bytes(head, 'big')
frame = await reader.readexactly(length)
return KafkaBytesReader(frame)
async def write_frame(writer, data):
writer.write(len(data).to_bytes(4, 'big'))
writer.write(data)
#print('write frame', data)
await writer.drain()
async def kafka_handle(reader, writer):
try:
print('connected')
while True:
frame = await read_frame(reader)
#print('frame received', frame)
req_header = frame.read_schema(REQUEST_HEADER_V2)
print('-> request', req_header)
match req_header.request_api_key:
case 18: # API Versions
request = frame.read_schema(API_VERSION_REQUEST_V3)
print(' -> API Versions', request)
# inform the client about the api versions supported by the server
wbuffer = KafkaBytesWriter()
wbuffer.write_schema(RESPONSE_HEADER_V0, {'correlation_id': req_header.correlation_id})
wbuffer.write_schema(API_VERSION_RESPONSE_V3, {
'error_code': 0,
'api_keys': [
{'api_key': 18, 'min_version': 3, 'max_version': 3},
{'api_key': 3, 'min_version': 12, 'max_version': 12},
# producer
{'api_key': 22, 'min_version': 4, 'max_version': 4},
{'api_key': 0, 'min_version': 9, 'max_version': 9},
],
'throttle_time_ms': 123,
})
await write_frame(writer, wbuffer.get_bytes())
case 3: # Metadata
metadata = frame.read_schema(METADATA_REQUEST_V12)
print(' -> Metadata', metadata)
# single node information
topics = []
for topic_data in metadata.topics:
topics.append({
'error_code': 0,
'name': topic_data.name,
'topic_id': topic_data.topic_id,
'is_internal': 0,
'partitions': [{
'error_code': 0,
'partition_index': 0,
'leader_id': 0,
'leader_epoch': 1,
'replica_nodes': [0],
'isr_nodes': [0],
'offline_replicas': [],
}],
'topic_authorized_operations': -2147483648,
})
wbuffer = KafkaBytesWriter()
wbuffer.write_schema(RESPONSE_HEADER_V1, {'correlation_id': req_header.correlation_id})
wbuffer.write_schema(METADATA_RESPONSE_V12, {
'throttle_time_ms': 0,
'brokers': [
{ 'node_id': 0, 'host': '127.0.0.1', 'port': 9092, 'rack': None }
],
'cluster_id': 'test-cluster',
'controller_id': 0,
'topics': topics,
})
await write_frame(writer, wbuffer.get_bytes())
case 22: # Init Producer Id
init_producer_id = frame.read_schema(INIT_PRODUCER_ID_REQUEST_V4)
print(' -> init prducer id', init_producer_id)
wbuffer = KafkaBytesWriter()
wbuffer.write_schema(RESPONSE_HEADER_V1, {'correlation_id': req_header.correlation_id})
wbuffer.write_schema(INIT_PRODUCER_ID_RESPONSE_V4, {
'throttle_time_ms': 0,
'error_code': 0,
'producer_id': 1,
'producer_epoch': 0,
})
await write_frame(writer, wbuffer.get_bytes())
case 0: # Produce
produce = frame.read_schema(PRODUCE_REQUEST_V9)
print(' -> produce', produce)
responses = []
for topic_data in produce.topic_data:
print(' ---> produce topic data', topic_data.name)
partition_responses = []
responses.append({'name': topic_data.name, 'partition_responses': partition_responses})
for partition_data in topic_data.partition_data:
record_batch = read_record_batch(partition_data.records)
print(record_batch)
for r in record_batch.records:
# Use the record for your purpose
print(' -----> RECORD', r)
partition_responses.append({
'index': partition_data.index,
'error_code': 0,
'base_offset': 0,
'log_append_time_ms': -1,
'log_start_offset': -1,
'record_errors': [],
'error_message': None,
})
wbuffer = KafkaBytesWriter()
wbuffer.write_schema(RESPONSE_HEADER_V1, {'correlation_id': req_header.correlation_id})
wbuffer.write_schema(PRODUCE_RESPONSE_V9, { 'throttle_time_ms': 0, 'responses': responses })
await write_frame(writer, wbuffer.get_bytes())
case _:
raise NotImplementedError
except EOFError:
pass
except Exception as e:
print(e.with_traceback())
finally:
print('Close the connection')
writer.close()
async def main():
server = await asyncio.start_server(kafka_handle, '127.0.0.1', 9092)
addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
print(f'Kafka Broker listening on {addrs}')
async with server:
await server.serve_forever()
if __name__ == '__main__':
asyncio.run(main())
import java.net.InetAddress;
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class DemoKafkaProducer {
public static void main(final String[] args) throws Exception {
final Properties config = new Properties();
config.put("client.id", InetAddress.getLocalHost().getHostName());
config.put("bootstrap.servers", "127.0.0.1:9092");
config.put("compression.type", "none");
config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
try (KafkaProducer<String, String> producer = new KafkaProducer<>(config)) {
final ProducerRecord<String, String> p = new ProducerRecord<>("xtopic", "kay1", "val1");
p.headers().add("he1", "heval1".getBytes());
p.headers().add("he2", "heval2".getBytes());
producer.send(p);
producer.send(new ProducerRecord<>("xtopic", "kay2", "val2"));
producer.flush();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment