Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save MahdiKarimipour/9f50380d09fc37339047315779a125a3 to your computer and use it in GitHub Desktop.
Save MahdiKarimipour/9f50380d09fc37339047315779a125a3 to your computer and use it in GitHub Desktop.
Kafka Message Consumer
/// <summary>
/// Background service that subscribes to the Kafka topic named by
/// <c>appSettings.KafkaSettings.SendEmailToCustomer</c>, deserializes each non-empty
/// message value into an <c>EmailRequest</c>, and resolves an <c>ICommsService</c>
/// from a fresh DI scope to process it.
/// </summary>
/// <remarks>
/// A single consumer instance is created for the lifetime of the service and is
/// closed when the host requests cancellation.
/// </remarks>
public class KafkaHostedService : MyBackgroundService
{
    // Consumer group shared by all instances of this service.
    private const string ConsumerGroupId = "Ecosystem-MessagingApi";

    private readonly ILogger<KafkaHostedService> logger;
    private readonly AppSettings appSettings;
    private readonly IConfiguration configuration;
    private readonly IServiceScopeFactory scopeFactory;

    /// <summary>
    /// Creates the service with its logging, settings, configuration and DI-scope dependencies.
    /// </summary>
    /// <param name="logger">Logger used for lifecycle and error messages.</param>
    /// <param name="appSettings">Application settings; supplies the Kafka server and topic name.</param>
    /// <param name="configuration">Configuration root that is bound to <c>AppSecrets</c> for the SASL credentials.</param>
    /// <param name="scopeFactory">Factory used to create one DI scope per handled message.</param>
    public KafkaHostedService(
        ILogger<KafkaHostedService> logger,
        AppSettings appSettings,
        IConfiguration configuration,
        IServiceScopeFactory scopeFactory)
    {
        this.appSettings = appSettings;
        this.logger = logger;
        this.configuration = configuration;
        this.scopeFactory = scopeFactory;
    }

    /// <summary>
    /// Runs the Kafka consume loop until <paramref name="cancellationToken"/> is cancelled.
    /// </summary>
    /// <param name="cancellationToken">Signals that the service should stop consuming.</param>
    /// <returns>A task that completes when the consume loop has exited.</returns>
    protected override async Task ExecuteAsync(CancellationToken cancellationToken)
    {
        logger.LogInformation("Kafka background task is starting.");

        // Dispose the registration once the loop has finished so the callback
        // does not stay attached to the token.
        using (cancellationToken.Register(() =>
            logger.LogInformation(" Kafka background task is stopping.")))
        {
            // Consume() is a blocking call, so the loop is moved to a thread-pool
            // thread instead of running on the caller of ExecuteAsync.
            await Task.Run(() => ConsumeLoop(cancellationToken));
        }
    }

    /// <summary>
    /// Builds one consumer, subscribes it to the configured topic and processes
    /// messages until cancellation is requested.
    /// </summary>
    private void ConsumeLoop(CancellationToken cancellationToken)
    {
        var builder = new ConsumerBuilder<string, string>(CreateConsumerConfig());

        // The consumer is created once and reused for every message; rebuilding it
        // per message would reconnect and rejoin the consumer group each time.
        using (var consumer = builder.Build())
        {
            consumer.Subscribe(appSettings.KafkaSettings.SendEmailToCustomer);

            try
            {
                while (!cancellationToken.IsCancellationRequested)
                {
                    logger.LogInformation("Kafka task is doing the work.");

                    try
                    {
                        var cr = consumer.Consume(cancellationToken);
                        if (!cr.Message.Value.IsEmpty())
                        {
                            HandleMessage(cr.Message.Value);
                        }
                    }
                    catch (ConsumeException ex) when (!ex.Error.IsFatal)
                    {
                        // Non-fatal client errors should not stop the service; fatal
                        // ones are left unhandled so they surface to the host.
                        logger.LogError(ex, "Kafka consume error: {Reason}", ex.Error.Reason);
                    }
                    catch (Newtonsoft.Json.JsonException ex)
                    {
                        // A single malformed payload must not take the consumer down.
                        logger.LogError(ex, "Skipping Kafka message that could not be deserialized.");
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Expected when the token is cancelled while Consume() is blocking.
            }

            // Leave the consumer group cleanly so partitions are reassigned
            // without waiting for the session timeout.
            consumer.Close();
        }
    }

    /// <summary>
    /// Builds the SASL/SSL consumer configuration from application settings and secrets.
    /// </summary>
    private ConsumerConfig CreateConsumerConfig()
    {
        var appSecrets = configuration.Get<AppSecrets>();

        return new ConsumerConfig
        {
            BootstrapServers = appSettings.KafkaSettings.Server,
            SecurityProtocol = SecurityProtocol.SaslSsl,
            SaslMechanism = SaslMechanism.Plain,
            SaslUsername = appSecrets.KafkaSaslKey,
            SaslPassword = appSecrets.KafkaSaslSecret,
            GroupId = ConsumerGroupId
        };
    }

    /// <summary>
    /// Deserializes one message payload and resolves the service that processes it.
    /// </summary>
    /// <param name="payload">The raw JSON value of the Kafka message.</param>
    private void HandleMessage(string payload)
    {
        //Here is the actual message
        var emailRequest = JsonConvert.DeserializeObject<EmailRequest>(payload);

        // A new scope per message so scoped dependencies are not shared between messages.
        using (var scope = scopeFactory.CreateScope())
        {
            var commsService = scope.ServiceProvider.GetRequiredService<ICommsService>();
            //Do the actual work
            //i.e. Send the Email
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment