Created
December 4, 2020 08:36
-
-
Save TsuyoshiUshio/30049c785f5d14e267be7989e5b78598 to your computer and use it in GitHub Desktop.
Kafka Extension Example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{
  "version": "2.0",
  "logging": {
    "logLevel": {
      "Function.MyFunction": "Information",
      "default": "Information"
    }
  },
  "extensions": {
    "kafka": {
      "maxBatchSize": 2,
      "AutoCommitIntervalMs": 200,
      "SubscriberIntervalInSeconds": 3,
      "ExecutorChannelCapacity": 1,
      "ChannelFullRetryIntervalInMs": 50,
      "MaxPollIntervalMs": 300000,
      "LibkafkaDebug": "fetch"
    }
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Avro; | |
using Microsoft.Azure.WebJobs; | |
using Microsoft.Azure.WebJobs.Extensions.Kafka; | |
using Microsoft.Extensions.Logging; | |
using System; | |
using System.Collections.Generic; | |
using System.Runtime.CompilerServices; | |
using System.Text; | |
using System.Threading.Tasks; | |
namespace Kafka300
{
    /// <summary>
    /// Sample Kafka-triggered functions. All four functions subscribe to the same
    /// topic with the same consumer group and connection settings; they differ only
    /// in the shape of the trigger parameter (single event, batch of values, or
    /// batch of key/value events).
    /// </summary>
    public class SimpleKafkaTriggers
    {
        // Trigger settings shared by every function in this class. Attribute
        // arguments must be compile-time constants, hence `const`.
        private const string BrokerListSetting = "BrokerList";
        private const string Topic = "reproduce2";

        // %...% placeholders: expected to be resolved from app settings by the
        // Functions host rather than used literally.
        private const string UsernameSetting = "%ConfluentCloudUsername%";
        private const string PasswordSetting = "%ConfluentCloudPassword%";

        private const string CaCertificatePath = "confluent_cloud_cacert.pem";
        private const string ConsumerGroupName = "$Default";

        // Pause applied at the start of each invocation, before anything is logged.
        // NOTE(review): presumably simulates slow message processing - confirm.
        private static readonly TimeSpan ProcessingDelay = TimeSpan.FromSeconds(2);

        /// <summary>
        /// Handles one Kafka event per invocation: waits for the processing delay,
        /// then logs the partition the event was read from.
        /// </summary>
        /// <param name="kafkaEvent">The single event delivered by the trigger.</param>
        /// <param name="logger">Logger that receives the informational message.</param>
        /// <returns>A task that completes once the message has been logged.</returns>
        [FunctionName(nameof(SampleConsumerAsync))]
        public async Task SampleConsumerAsync(
            [KafkaTrigger(
                BrokerListSetting,
                Topic,
                Username = UsernameSetting,
                Password = PasswordSetting,
                Protocol = BrokerProtocol.SaslSsl,
                SslCaLocation = CaCertificatePath,
                ConsumerGroup = ConsumerGroupName,
                AuthenticationMode = BrokerAuthenticationMode.Plain)] KafkaEventData<string> kafkaEvent,
            ILogger logger)
        {
            await Task.Delay(ProcessingDelay);
            logger.LogInformation($"{nameof(SampleConsumerAsync)} partition: {kafkaEvent.Partition} : read 1 message");
        }

        /// <summary>
        /// Handles a batch of Kafka events: waits for the processing delay, logs the
        /// batch size, then logs the partition of every event in the batch.
        /// </summary>
        /// <param name="kafkaEvents">The batch of events delivered by the trigger.</param>
        /// <param name="logger">Logger that receives the informational messages.</param>
        /// <returns>A task that completes once every event has been logged.</returns>
        [FunctionName(nameof(SampleConsumerAsync1))]
        public async Task SampleConsumerAsync1(
            [KafkaTrigger(
                BrokerListSetting,
                Topic,
                Username = UsernameSetting,
                Password = PasswordSetting,
                Protocol = BrokerProtocol.SaslSsl,
                SslCaLocation = CaCertificatePath,
                ConsumerGroup = ConsumerGroupName,
                AuthenticationMode = BrokerAuthenticationMode.Plain)] KafkaEventData<string>[] kafkaEvents,
            ILogger logger)
        {
            await Task.Delay(ProcessingDelay);
            logger.LogInformation($"Count: {kafkaEvents.Length}");

            foreach (var kafkaEvent in kafkaEvents)
            {
                // Label the entry with this function's own name; the previous text said
                // "SampleConsumerAsync", which made these entries indistinguishable
                // from the single-event function's output.
                logger.LogInformation($"{nameof(SampleConsumerAsync1)} partition: {kafkaEvent.Partition} : read 1 message");
            }
        }

        /// <summary>
        /// Handles a batch of keyed Kafka events: waits for the processing delay, logs
        /// the batch size, then logs the partition and key of every event.
        /// </summary>
        /// <param name="kafkaEvents">The batch of key/value events delivered by the trigger.</param>
        /// <param name="logger">Logger that receives the informational messages.</param>
        /// <returns>A task that completes once every event has been logged.</returns>
        [FunctionName(nameof(SampleConsumerAsync2))]
        public async Task SampleConsumerAsync2(
            [KafkaTrigger(
                BrokerListSetting,
                Topic,
                Username = UsernameSetting,
                Password = PasswordSetting,
                Protocol = BrokerProtocol.SaslSsl,
                SslCaLocation = CaCertificatePath,
                ConsumerGroup = ConsumerGroupName,
                AuthenticationMode = BrokerAuthenticationMode.Plain)] KafkaEventData<string, string>[] kafkaEvents,
            ILogger logger)
        {
            await Task.Delay(ProcessingDelay);
            logger.LogInformation($"Count: {kafkaEvents.Length}");

            foreach (var kafkaEvent in kafkaEvents)
            {
                // kafkaEvent.Value is also available here if the payload is needed.
                logger.LogInformation($"{nameof(SampleConsumerAsync2)} partition: {kafkaEvent.Partition} :{kafkaEvent.Key} : : read 1 message");
            }
        }

        /// <summary>
        /// Handles a batch of Kafka events: waits for the processing delay, logs the
        /// batch size, then logs the partition of every event in the batch.
        /// </summary>
        /// <param name="kafkaEvents">The batch of events delivered by the trigger.</param>
        /// <param name="logger">Logger that receives the informational messages.</param>
        /// <returns>A task that completes once every event has been logged.</returns>
        [FunctionName(nameof(SampleConsumerAsync3))]
        public async Task SampleConsumerAsync3(
            [KafkaTrigger(
                BrokerListSetting,
                Topic,
                Username = UsernameSetting,
                Password = PasswordSetting,
                Protocol = BrokerProtocol.SaslSsl,
                SslCaLocation = CaCertificatePath,
                ConsumerGroup = ConsumerGroupName,
                AuthenticationMode = BrokerAuthenticationMode.Plain)] KafkaEventData<string>[] kafkaEvents,
            ILogger logger)
        {
            await Task.Delay(ProcessingDelay);
            logger.LogInformation($"Count: {kafkaEvents.Length}");

            foreach (var kafkaEvent in kafkaEvents)
            {
                logger.LogInformation($"{nameof(SampleConsumerAsync3)} partition: {kafkaEvent.Partition} : read 1 message");
            }
        }
    }
}
Hi Tsuyoshi Ushio,
Could you please provide an example of how to retrieve the username and password from configuration? I have tried using host.json and local.settings.json on my local machine, but I could not get it to work. An example would be great.
Also, can I configure the BrokerList in the config as well? The BrokerList will be different for dev, acceptance, and prod.
Thanks, Rajan
Hey Rajan,
To retrieve the username, password, and BrokerList from local.settings.json, you would need to set it up like this:
public async Task Run(
[KafkaTrigger("BrokerList",
"KafkaTriggerTopic",
Username = "KafkaUser",
Password = "KafkaPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default")] KafkaEventData<string>[] events,
ILogger log)
Then in your local.settings.json you would have it like this:
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"KafkaUser": "<CLUSTER_API_KEY>",
"BrokerList": "<BROKER_LIST>",
"KafkaPassword": "<CLUSTER_API_SECRET >"
}
}
When published to Azure, you would need to add those same key/value setting pairs in the Azure portal -> Function App -> Configuration -> App Settings.
Vinny
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi Tsuyoshi Ushio,
Could you please provide an example of how to retrieve the username and password from configuration? I have tried using host.json and local.settings.json on my local machine, but I could not get it to work. An example would be great.
Also, can I configure the BrokerList in the config as well? The BrokerList will be different for dev, acceptance, and prod.
Thanks,
Rajan