Skip to content

Instantly share code, notes, and snippets.

@aramkoukia
Last active March 26, 2018 03:48
Show Gist options
  • Select an option

  • Save aramkoukia/609bdb67a4b7b6200a79e5846c49f1f8 to your computer and use it in GitHub Desktop.

Select an option

Save aramkoukia/609bdb67a4b7b6200a79e5846c49f1f8 to your computer and use it in GitHub Desktop.
using Confluent.Kafka;
using Confluent.Kafka.Serialization;
using System;
using System.Collections.Generic;
using System.Text;
namespace kafka_consumer
{
class Program
{
static void Main(string[] args)
{
// The Kafka endpoint address
string kafkaEndpoint = "your_kafka_endpoint_from_cluster-dashboard";
// The Kafka topic we'll be using
string kafkaTopic = "testtopic";
// Create the consumer configuration
var consumerConfig = new Dictionary<string, object>
{
{ "group.id", "myconsumer" },
{ "bootstrap.servers", kafkaEndpoint },
};
// Create the consumer
using (var consumer = new Consumer<Null, string>(consumerConfig, null, new StringDeserializer(Encoding.UTF8)))
{
// Subscribe to the OnMessage event
consumer.OnMessage += (obj, msg) =>
{
Console.WriteLine($"Received: {msg.Value}");
};
// Subscribe to the Kafka topic
consumer.Subscribe(new List<string>() { kafkaTopic });
// Handle Cancel Keypress
var cancelled = false;
Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true; // prevent the process from terminating.
cancelled = true;
};
Console.WriteLine("Ctrl-C to exit.");
// Poll for messages
while (!cancelled)
{
consumer.Poll(TimeSpan.FromMinutes(1));
}
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment