Last active
March 26, 2018 03:48
-
-
Save aramkoukia/609bdb67a4b7b6200a79e5846c49f1f8 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using Confluent.Kafka; | |
| using Confluent.Kafka.Serialization; | |
| using System; | |
| using System.Collections.Generic; | |
| using System.Text; | |
namespace kafka_consumer
{
    /// <summary>
    /// Console sample that subscribes to a Kafka topic and prints the value of
    /// every message it receives until the user presses Ctrl-C.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Upper bound on how long a single <c>Poll</c> call may block.
        /// Kept short because the cancellation flag is only re-checked between
        /// polls; a long timeout would make Ctrl-C appear to hang on an idle topic.
        /// </summary>
        private static readonly TimeSpan PollTimeout = TimeSpan.FromMilliseconds(100);

        /// <summary>
        /// Program entry point: builds the consumer, wires up its event handlers,
        /// subscribes to the topic and polls until cancellation is requested.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            // The Kafka endpoint address
            string kafkaEndpoint = "your_kafka_endpoint_from_cluster-dashboard";

            // The Kafka topic we'll be using
            string kafkaTopic = "testtopic";

            // Create the consumer configuration
            var consumerConfig = new Dictionary<string, object>
            {
                { "group.id", "myconsumer" },
                { "bootstrap.servers", kafkaEndpoint },
            };

            // Create the consumer. Keys are not used (Null / no key deserializer);
            // values are decoded as UTF-8 strings.
            using (var consumer = new Consumer<Null, string>(consumerConfig, null, new StringDeserializer(Encoding.UTF8)))
            {
                // Print the payload of each successfully consumed message.
                consumer.OnMessage += (obj, msg) =>
                {
                    Console.WriteLine($"Received: {msg.Value}");
                };

                // Surface client/broker level errors instead of dropping them silently
                // (e.g. an unreachable endpoint would otherwise look like an idle topic).
                consumer.OnError += (_, error) =>
                {
                    Console.Error.WriteLine($"Error: {error}");
                };

                // Surface per-message failures (for example a payload that cannot be deserialized).
                consumer.OnConsumeError += (_, msg) =>
                {
                    Console.Error.WriteLine(
                        $"Error consuming from topic/partition/offset {msg.Topic}/{msg.Partition}/{msg.Offset}: {msg.Error}");
                };

                // Subscribe to the Kafka topic
                consumer.Subscribe(new List<string>() { kafkaTopic });

                // Handle Cancel Keypress
                var cancelled = false;
                Console.CancelKeyPress += (_, e) =>
                {
                    e.Cancel = true; // prevent the process from terminating.
                    cancelled = true;
                };

                Console.WriteLine("Ctrl-C to exit.");

                // Poll for messages; the short timeout bounds how long we wait
                // before noticing that Ctrl-C was pressed.
                while (!cancelled)
                {
                    consumer.Poll(PollTimeout);
                }
            }
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment