Created
December 4, 2020 08:36
-
-
Save TsuyoshiUshio/30049c785f5d14e267be7989e5b78598 to your computer and use it in GitHub Desktop.
Kafka Extension Example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"version": "2.0", | |
"logging": { | |
"logLevel": { | |
"Function.MyFunction": "Information", | |
"default": "Information" | |
} | |
}, | |
"extensions": { | |
"kafka": { | |
"maxBatchSize": 2, | |
"AutoCommitIntervalMs": 200, | |
"SubscriberIntervalInSeconds": 3, | |
"ExecutorChannelCapacity": 1, | |
"ChannelFullRetryIntervalInMs": 50, | |
"MaxPollIntervalMs": 300000, | |
"LibkafkaDebug": "fetch" | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Avro; | |
using Microsoft.Azure.WebJobs; | |
using Microsoft.Azure.WebJobs.Extensions.Kafka; | |
using Microsoft.Extensions.Logging; | |
using System; | |
using System.Collections.Generic; | |
using System.Runtime.CompilerServices; | |
using System.Text; | |
using System.Threading.Tasks; | |
namespace Kafka300 | |
{ | |
public class SimpleKafkaTriggers | |
{ | |
[FunctionName(nameof(SampleConsumerAsync))] | |
public async Task SampleConsumerAsync( | |
[KafkaTrigger( | |
"BrokerList", | |
"reproduce2", | |
Username = "%ConfluentCloudUsername%", | |
Password = "%ConfluentCloudPassword%", | |
Protocol = BrokerProtocol.SaslSsl, | |
SslCaLocation = "confluent_cloud_cacert.pem", | |
ConsumerGroup = "$Default", | |
AuthenticationMode = BrokerAuthenticationMode.Plain)] KafkaEventData<string> kafkaEvent, | |
ILogger logger) | |
{ | |
await Task.Delay(TimeSpan.FromSeconds(2)); | |
logger.LogInformation($"SampleConsumerAsync partition: {kafkaEvent.Partition} : read 1 message"); | |
} | |
[FunctionName(nameof(SampleConsumerAsync1))] | |
public async Task SampleConsumerAsync1( | |
[KafkaTrigger( | |
"BrokerList", | |
"reproduce2", | |
Username = "%ConfluentCloudUsername%", | |
Password = "%ConfluentCloudPassword%", | |
Protocol = BrokerProtocol.SaslSsl, | |
SslCaLocation = "confluent_cloud_cacert.pem", | |
ConsumerGroup = "$Default", | |
AuthenticationMode = BrokerAuthenticationMode.Plain)] KafkaEventData<string>[] kafkaEvents, | |
ILogger logger) | |
{ | |
await Task.Delay(TimeSpan.FromSeconds(2)); | |
logger.LogInformation($"Count: {kafkaEvents.Length}"); | |
foreach (var kafkaEvent in kafkaEvents) | |
{ | |
// logger.LogInformation($"partition: {kafkaEvent.Partition} :{kafkaEvent.Value.ToString()}"); | |
logger.LogInformation($"SampleConsumerAsync partition: {kafkaEvent.Partition} : read 1 message"); | |
} | |
} | |
[FunctionName(nameof(SampleConsumerAsync2))] | |
public async Task SampleConsumerAsync2( | |
[KafkaTrigger( | |
"BrokerList", | |
"reproduce2", | |
Username = "%ConfluentCloudUsername%", | |
Password = "%ConfluentCloudPassword%", | |
Protocol = BrokerProtocol.SaslSsl, | |
SslCaLocation = "confluent_cloud_cacert.pem", | |
ConsumerGroup = "$Default", | |
AuthenticationMode = BrokerAuthenticationMode.Plain)] KafkaEventData<string, string>[] kafkaEvents, | |
ILogger logger) | |
{ | |
await Task.Delay(TimeSpan.FromSeconds(2)); | |
logger.LogInformation($"Count: {kafkaEvents.Length}"); | |
foreach (var kafkaEvent in kafkaEvents) | |
{ | |
logger.LogInformation($"SampleConsumerAsync2 partition: {kafkaEvent.Partition} :{kafkaEvent.Key} : : read 1 message"); // You can call kafkaEvent.Value | |
} | |
} | |
[FunctionName(nameof(SampleConsumerAsync3))] | |
public async Task SampleConsumerAsync3( | |
[KafkaTrigger( | |
"BrokerList", | |
"reproduce2", | |
Username = "%ConfluentCloudUsername%", | |
Password = "%ConfluentCloudPassword%", | |
Protocol = BrokerProtocol.SaslSsl, | |
SslCaLocation = "confluent_cloud_cacert.pem", | |
ConsumerGroup = "$Default", | |
AuthenticationMode = BrokerAuthenticationMode.Plain)] KafkaEventData<string>[] kafkaEvents, | |
ILogger logger) | |
{ | |
await Task.Delay(TimeSpan.FromSeconds(2)); | |
logger.LogInformation($"Count: {kafkaEvents.Length}"); | |
foreach (var kafkaEvent in kafkaEvents) | |
{ | |
logger.LogInformation($"SampleConsumerAsync3 partition: {kafkaEvent.Partition} : read 1 message"); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hey Rajan,
For the retrieving the username, password and BrokerList from the local.settings.json you would need to set it up like this:
Then in your local.settings.json you would have it like this:
When published to Azure, you would need to add those same key/value settings pairs on the Azure portal -> Function App -> Configuration -> App Settings.
Vinny