Skip to content

Instantly share code, notes, and snippets.

@vasilkosturski
Created November 7, 2022 06:22
Show Gist options
  • Save vasilkosturski/73452b09d672fd8f09700c61ec98cc40 to your computer and use it in GitHub Desktop.
Save vasilkosturski/73452b09d672fd8f09700c61ec98cc40 to your computer and use it in GitHub Desktop.
Source code for the blog post "Kafka Networking via Wireshark"
using Confluent.Kafka;
using Confluent.Kafka.Admin;
namespace KafkaConnectivity;
/// <summary>
/// Console demo that creates a Kafka topic, produces one message to each of its two
/// partitions, and then reads the topic back from the earliest offset until every
/// partition has reported end-of-file.
/// </summary>
public static class Program
{
    private const int PartitionsCount = 2;
    private const string TopicName = "my_topic";
    private const string BootstrapServers = "localhost:9092,localhost:9093";

    /// <summary>
    /// Entry point: ensures the topic exists, produces two messages, then consumes them.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    public static async Task Main(string[] args)
    {
        await CreateKafkaTopic();

        var config = new ProducerConfig
        {
            BootstrapServers = BootstrapServers
        };

        using (var producer = new ProducerBuilder<Null, string>(config).Build())
        {
            await ProduceMessage(producer, "First message", 0);
            await ProduceMessage(producer, "Second message", 1);
        }

        var consumerConfig = new ConsumerConfig
        {
            BootstrapServers = BootstrapServers,
            // A fresh group id on every run means there are no committed offsets,
            // so AutoOffsetReset.Earliest makes the consumer start from the beginning.
            GroupId = Guid.NewGuid().ToString(),
            AutoOffsetReset = AutoOffsetReset.Earliest,
            // Required so that Consume() surfaces end-of-partition events, which
            // the consume loop uses as its termination signal.
            EnablePartitionEof = true
        };

        using (var consumer = new ConsumerBuilder<Null, string>(consumerConfig).Build())
        {
            consumer.Subscribe(new[] { TopicName });
            ConsumeUntilEndOfPartitions(consumer);
        }
    }

    /// <summary>
    /// Prints every consumed message until <see cref="PartitionsCount"/> distinct partitions
    /// have reported end-of-file, then closes the consumer.
    /// </summary>
    /// <param name="consumer">A consumer that is already subscribed to the topic.</param>
    private static void ConsumeUntilEndOfPartitions(IConsumer<Null, string> consumer)
    {
        // Track partition ids rather than a bare counter: an EOF event can be reported
        // more than once for the same partition (e.g. after a rebalance or after new
        // data arrives and is drained), and counting it twice would end the loop
        // before the other partition has been read.
        var eofPartitions = new HashSet<int>();

        try
        {
            while (eofPartitions.Count < PartitionsCount)
            {
                var consumeResult = consumer.Consume();
                if (consumeResult.IsPartitionEOF)
                {
                    eofPartitions.Add(consumeResult.Partition.Value);
                    continue;
                }

                Console.WriteLine("Consumed Message:");
                Console.WriteLine($"Value: {consumeResult.Message.Value}. " +
                                  $"Partition: {consumeResult.Partition}. " +
                                  $"Offset: {consumeResult.Offset}.");
            }
        }
        finally
        {
            // Close even when Consume() throws so the consumer leaves the group cleanly
            // instead of waiting for the session to time out on the broker side.
            consumer.Close();
        }
    }

    /// <summary>
    /// Produces a single message to an explicit partition of the topic and prints the delivery result.
    /// </summary>
    /// <param name="producer">Producer used to send the message.</param>
    /// <param name="value">Message payload.</param>
    /// <param name="partition">Target partition index.</param>
    private static async Task ProduceMessage(IProducer<Null, string> producer, string value, int partition)
    {
        var produceResult = await producer.ProduceAsync(
            new TopicPartition(TopicName, new Partition(partition)),
            new Message<Null, string> { Value = value });

        Console.WriteLine("Produced Message:");
        Console.WriteLine($"Value: {produceResult.Message.Value}. " +
                          $"Partition: {produceResult.Partition}. " +
                          $"Offset: {produceResult.Offset}.");
    }

    /// <summary>
    /// Creates the demo topic with <see cref="PartitionsCount"/> partitions.
    /// An already existing topic is tolerated so the program can be run repeatedly.
    /// </summary>
    /// <exception cref="CreateTopicsException">
    /// Topic creation failed for a reason other than the topic already existing.
    /// </exception>
    private static async Task CreateKafkaTopic()
    {
        using var adminClient =
            new AdminClientBuilder(new AdminClientConfig { BootstrapServers = BootstrapServers }).Build();

        try
        {
            await adminClient.CreateTopicsAsync(new TopicSpecification[]
            {
                new() { Name = TopicName, NumPartitions = PartitionsCount }
            });
        }
        catch (CreateTopicsException ex) when (ex.Results.All(report =>
                   report.Error.Code is ErrorCode.NoError or ErrorCode.TopicAlreadyExists))
        {
            // The topic is left over from a previous run; nothing to do.
            // NOTE(review): a pre-existing topic is not checked for its partition count —
            // the demo assumes it was created by this program with PartitionsCount partitions.
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment