Skip to content

Instantly share code, notes, and snippets.

@mhowlett
Last active December 7, 2016 21:28

Revisions

  1. mhowlett revised this gist Dec 7, 2016. No changes.
  2. mhowlett created this gist Dec 7, 2016.
    90 changes: 90 additions & 0 deletions gistfile1.txt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,90 @@
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using RdKafka;

    namespace AdvancedConsumer
    {
    public class Program
    {
    public static void Run(string brokerList, List<string> topics)
    {
    bool enableAutoCommit = false;

    var config = new Config()
    {
    GroupId = "advanced-csharp-consumer",
    EnableAutoCommit = enableAutoCommit,
    StatisticsInterval = TimeSpan.FromSeconds(60)
    };

    using (var consumer = new EventConsumer(config, brokerList))
    {
    consumer.OnMessage += (obj, msg) => {
    string text = Encoding.UTF8.GetString(msg.Payload, 0, msg.Payload.Length);
    Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {text}");

    if (!enableAutoCommit && msg.Offset % 10 == 0)
    {
    Console.WriteLine($"Committing offset");
    consumer.Commit(msg).Wait();
    Console.WriteLine($"Committed offset");
    }
    };

    consumer.OnConsumerError += (obj, errorCode) =>
    {
    Console.WriteLine($"Consumer Error: {errorCode}");
    };

    consumer.OnEndReached += (obj, end) => {
    Console.WriteLine($"Reached end of topic {end.Topic} partition {end.Partition}, next message will be at offset {end.Offset}");
    };

    consumer.OnError += (obj, error) => {
    Console.WriteLine($"Error: {error.ErrorCode} {error.Reason}");
    };

    if (enableAutoCommit)
    {
    consumer.OnOffsetCommit += (obj, commit) => {
    if (commit.Error != ErrorCode.NO_ERROR)
    {
    Console.WriteLine($"Failed to commit offsets: {commit.Error}");
    }
    Console.WriteLine($"Successfully committed offsets: [{string.Join(", ", commit.Offsets)}]");
    };
    }

    consumer.OnPartitionsAssigned += (obj, partitions) => {
    Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}], member id: {consumer.MemberId}");
    consumer.Assign(partitions.Select(p => new TopicPartitionOffset(p.Topic, P.Partition, 2)).ToList());
    };

    consumer.OnPartitionsRevoked += (obj, partitions) => {
    Console.WriteLine($"Revoked partitions: [{string.Join(", ", partitions)}]");
    consumer.Unassign();
    };

    consumer.OnStatistics += (obj, json) => {
    Console.WriteLine($"Statistics: {json}");
    };

    consumer.Subscribe(topics);
    consumer.Start();

    Console.WriteLine($"Assigned to: [{string.Join(", ", consumer.Assignment)}]");
    Console.WriteLine($"Subscribed to: [{string.Join(", ", consumer.Subscription)}]");

    Console.WriteLine($"Started consumer, press enter to stop consuming");
    Console.ReadLine();
    }
    }

    public static void Main(string[] args)
    {
    Run(args[0], args.Skip(1).ToList());
    }
    }
    }