using Azure.Messaging.EventHubs.Producer;
using Azure.Messaging.EventHubs;
using System.Net.Security;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using Microsoft.Extensions.Logging;
// Build a console logger factory for this program; the using declaration disposes it
// when the top-level statements finish.
using var loggerFactory = LoggerFactory.Create(builder =>
{
    builder
        // Only warnings and above from the "Microsoft" and "System" categories.
        .AddFilter("Microsoft", LogLevel.Warning)
        .AddFilter("System", LogLevel.Warning)
        // With top-level statements the compiler-generated Program class is in the global
        // namespace, so CreateLogger<Program>() below uses the category "Program".
        // The previous filter ("NonHostConsoleApp.Program") never matched that category.
        .AddFilter("Program", LogLevel.Debug)
        .AddSimpleConsole(options =>
        {
            options.IncludeScopes = true;
            options.SingleLine = true;
            options.TimestampFormat = "HH:mm:ss ";
        });
});
ILogger logger = loggerFactory.CreateLogger<Program>();
logger.LogInformation("Starting");
// Certificate validation callback for the Event Hubs connection.
// Accepts the server certificate when the platform reported no policy errors, or when
// its serial number matches the single pinned self-signed certificate.
// Returns true to allow the connection, false to reject it.
static bool ValidateServerCertificate(
    object sender,
    X509Certificate certificate,
    X509Chain chain,
    SslPolicyErrors sslPolicyErrors)
{
    // Serial number of the one self-signed certificate that is trusted (placeholder value).
    const string AllowedSelfSignedSerialNumber = "*********";

    // The callback can be invoked without a certificate; reject instead of
    // failing with a NullReferenceException.
    if (certificate is null)
    {
        Console.WriteLine("server did not present a certificate");
        return false;
    }

    Console.WriteLine(certificate.ToString());

    // Normal validation succeeded: nothing else to check.
    if (sslPolicyErrors == SslPolicyErrors.None)
    {
        return true;
    }

    // Validation failed, but the certificate is the pinned self-signed one.
    if (string.Equals(certificate.GetSerialNumberString(), AllowedSelfSignedSerialNumber, StringComparison.Ordinal))
    {
        Console.WriteLine("cert is an allowed self signed certificate");
        return true;
    }

    // Do not allow communication with unauthorized servers.
    return false;
}
// Client options that route server-certificate validation through ValidateServerCertificate.
var producerOptions = new EventHubProducerClientOptions();
producerOptions.ConnectionOptions.CertificateValidationCallback = ValidateServerCertificate;
try
{
    // The connection string carries UseDevelopmentEmulator=true.
    // The options must be handed to the client; otherwise the callback configured above
    // is never used. "await using" closes the client's connection when the block exits.
    await using var producerClient = new EventHubProducerClient(
        "Endpoint=sb://localhost;SharedAccessKeyName=foo;SharedAccessKey=bar;UseDevelopmentEmulator=true;",
        "foo-event-hub",
        producerOptions);

    // Create a batch of events
    using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();
    if (!eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("Event Hello"))))
    {
        // if it is too large for the batch
        throw new InvalidOperationException("Event is too large for the batch and cannot be sent.");
    }

    await producerClient.SendAsync(eventBatch);
    logger.LogInformation("event published");
}
catch (Exception ex)
{
    // Wrap with context; the original failure stays available as InnerException.
    throw new InvalidOperationException("Failed to send event", ex);
}
Last active
August 13, 2024 20:45
-
-
Save jrichardsz/a7d25a0492dff895b5a22ce578d9c33e to your computer and use it in GitHub Desktop.
Azure Event Hubs snippets
# Main context: worker user, a single worker process, log and pid locations.
user nginx;
worker_processes 1;
error_log /var/log/nginx/error.log warn;
pid /var/run/nginx.pid;

events {
    worker_connections 1024;
}

# TCP (stream) proxy: accepts TLS connections on 5671 and forwards them to the
# upstream on port 5672.
stream {
    upstream api_servers {
        # NOTE(review): hard-coded address; presumably the host/container that
        # exposes the Event Hubs emulator on 5672 - confirm for your environment.
        server 10.250.193.3:5672;
    }

    server {
        listen 5671 ssl;

        # TLSv1 and TLSv1.1 are deprecated and RC4 is a broken cipher, so only
        # modern protocol versions and strong ciphers are offered.
        ssl_protocols TLSv1.2 TLSv1.3;
        ssl_ciphers HIGH:!aNULL:!MD5;
        ssl_handshake_timeout 30s;

        ssl_certificate /etc/nginx/cert/nginx-selfsigned.crt;
        ssl_certificate_key /etc/nginx/cert/nginx-selfsigned.key;

        proxy_connect_timeout 1s;
        proxy_pass api_servers;
    }
}
# nginx service: mounts the nginx config and certificate directory and publishes port 5671.
# NOTE(review): indentation was lost in this copy; presumably this entry belongs under a
# top-level `services:` key - confirm against the real compose file.
nginx:
  image: "nginx:1.19.4"
  depends_on: [azure-event-hub-mock]
  volumes:
    - "./nginx/config/nginx.conf:/etc/nginx/nginx.conf"
    - "./nginx/cert:/etc/nginx/cert"
  ports: ["5671:5671"]
  networks: [docker-mocks-starter]
  extra_hosts: ["host.docker.internal:host-gateway"]
# Event Hubs emulator service: mounts the emulator config file, publishes port 5672
# and points its blob/metadata settings at the "azurite" service.
# NOTE(review): indentation was lost in this copy; presumably this entry belongs under a
# top-level `services:` key - confirm against the real compose file.
azure-event-hub-mock:
  container_name: azure-event-hub-mock
  image: "mcr.microsoft.com/azure-messaging/eventhubs-emulator:1.2.4-preview-amd64"
  volumes:
    - "./.eventhub-emulator/config.json:/Eventhubs_Emulator/ConfigFiles/Config.json"
  ports:
    - "5672:5672"
  environment:
    BLOB_SERVER: azurite
    METADATA_SERVER: azurite
    # Quoted so that no YAML parser can read the bare Y as a boolean.
    ACCEPT_EULA: "Y"
  depends_on:
    - azurite
  networks:
    - docker-mocks-starter
  extra_hosts:
    - "host.docker.internal:host-gateway"
# Azurite storage emulator service: publishes ports 10000-10002 and is reachable on the
# docker-mocks-starter network under the alias "azurite".
# NOTE(review): indentation was lost in this copy; presumably this entry belongs under a
# top-level `services:` key - confirm against the real compose file.
azurite:
  image: mcr.microsoft.com/azure-storage/azurite
  container_name: azurite
  ports:
    - "10000:10000"
    - "10001:10001"
    - "10002:10002"
  environment:
    # "America/Newyork" is not a valid tz database name; the zone is America/New_York.
    TZ: America/New_York
  networks:
    docker-mocks-starter:
      aliases:
        - "azurite"
  extra_hosts:
    - "host.docker.internal:host-gateway"
{
"UserConfig": {
"NamespaceConfig": [
{
"Type": "EventHub",
"Name": "emulatorNs1",
"Entities": [
{
"Name": "source",
"PartitionCount": "2",
"ConsumerGroups": [
{
"Name": "source-consumer-group"
}
]
},
{
"Name": "target",
"PartitionCount": "2",
"ConsumerGroups": [
{
"Name": "source-consumer-group"
}
]
},
{
"Name": "foo",
"PartitionCount": "2",
"ConsumerGroups": [
{
"Name": "bar"
}
]
},
{
"Name": "minimal-publisher",
"PartitionCount": "2",
"ConsumerGroups": [
{
"Name": "minimal-subscriber"
}
]
}
]
}
],
"LoggingConfig": {
"Type": "File"
}
}
}
# Azurite storage emulator service: publishes ports 10000-10002 and is reachable on the
# docker-mocks-starter network under the alias "azurite".
# NOTE(review): indentation was lost in this copy; presumably this entry belongs under a
# top-level `services:` key - confirm against the real compose file.
azurite:
  image: mcr.microsoft.com/azure-storage/azurite
  container_name: azurite
  ports:
    - "10000:10000"
    - "10001:10001"
    - "10002:10002"
  environment:
    # "America/Newyork" is not a valid tz database name; the zone is America/New_York.
    TZ: America/New_York
  networks:
    docker-mocks-starter:
      aliases:
        - "azurite"
  extra_hosts:
    - "host.docker.internal:host-gateway"
# Event Hubs emulator service: mounts the emulator config file, publishes port 5672
# and points its blob/metadata settings at the "azurite" service.
# NOTE(review): indentation was lost in this copy; presumably this entry belongs under a
# top-level `services:` key - confirm against the real compose file.
azure-event-hub-mock:
  container_name: azure-event-hub-mock
  image: "mcr.microsoft.com/azure-messaging/eventhubs-emulator:1.2.4-preview-amd64"
  volumes:
    - "./.eventhub-emulator/config.json:/Eventhubs_Emulator/ConfigFiles/Config.json"
  ports:
    - "5672:5672"
  environment:
    BLOB_SERVER: azurite
    METADATA_SERVER: azurite
    # Quoted so that no YAML parser can read the bare Y as a boolean.
    ACCEPT_EULA: "Y"
  depends_on:
    - azurite
  networks:
    - docker-mocks-starter
  extra_hosts:
    - "host.docker.internal:host-gateway"
{
"UserConfig": {
"NamespaceConfig": [
{
"Type": "EventHub",
"Name": "emulatorNs1",
"Entities": [
{
"Name": "source",
"PartitionCount": "2",
"ConsumerGroups": [
{
"Name": "source-consumer-group"
}
]
},
{
"Name": "target",
"PartitionCount": "2",
"ConsumerGroups": [
{
"Name": "source-consumer-group"
}
]
},
{
"Name": "foo-event-hub",
"PartitionCount": "2",
"ConsumerGroups": [
{
"Name": "bar-event-hub"
}
]
}
]
}
],
"LoggingConfig": {
"Type": "File"
}
}
}
using Azure.Messaging.EventHubs.Producer;
using Azure.Messaging.EventHubs;
using System.Net.Security;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using Microsoft.Extensions.Logging;
// Build a console logger factory for this program; the using declaration disposes it
// when the top-level statements finish.
using var loggerFactory = LoggerFactory.Create(builder =>
{
    builder
        // Only warnings and above from the "Microsoft" and "System" categories.
        .AddFilter("Microsoft", LogLevel.Warning)
        .AddFilter("System", LogLevel.Warning)
        // With top-level statements the compiler-generated Program class is in the global
        // namespace, so CreateLogger<Program>() below uses the category "Program".
        // The previous filter ("NonHostConsoleApp.Program") never matched that category.
        .AddFilter("Program", LogLevel.Debug)
        .AddSimpleConsole(options =>
        {
            options.IncludeScopes = true;
            options.SingleLine = true;
            options.TimestampFormat = "HH:mm:ss ";
        });
});
ILogger logger = loggerFactory.CreateLogger<Program>();
logger.LogInformation("Starting");
// Certificate validation callback for the Event Hubs connection.
// Accepts the server certificate when the platform reported no policy errors, or when
// its serial number matches the single pinned self-signed certificate.
// Returns true to allow the connection, false to reject it.
static bool ValidateServerCertificate(
    object sender,
    X509Certificate certificate,
    X509Chain chain,
    SslPolicyErrors sslPolicyErrors)
{
    // Serial number of the one self-signed certificate that is trusted (placeholder value).
    const string AllowedSelfSignedSerialNumber = "*********";

    // The callback can be invoked without a certificate; reject instead of
    // failing with a NullReferenceException.
    if (certificate is null)
    {
        Console.WriteLine("server did not present a certificate");
        return false;
    }

    Console.WriteLine(certificate.ToString());

    // Normal validation succeeded: nothing else to check.
    if (sslPolicyErrors == SslPolicyErrors.None)
    {
        return true;
    }

    // Validation failed, but the certificate is the pinned self-signed one.
    if (string.Equals(certificate.GetSerialNumberString(), AllowedSelfSignedSerialNumber, StringComparison.Ordinal))
    {
        Console.WriteLine("cert is an allowed self signed certificate");
        return true;
    }

    // Do not allow communication with unauthorized servers.
    return false;
}
// Client options that route server-certificate validation through ValidateServerCertificate.
var producerOptions = new EventHubProducerClientOptions();
producerOptions.ConnectionOptions.CertificateValidationCallback = ValidateServerCertificate;
try
{
    // The connection string carries UseDevelopmentEmulator=true.
    // The options must be handed to the client; otherwise the callback configured above
    // is never used. "await using" closes the client's connection when the block exits.
    await using var producerClient = new EventHubProducerClient(
        "Endpoint=sb://localhost;SharedAccessKeyName=foo;SharedAccessKey=bar;UseDevelopmentEmulator=true;",
        "foo",
        producerOptions);

    // Create a batch of events
    using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();
    if (!eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("Event Hello"))))
    {
        // if it is too large for the batch
        throw new InvalidOperationException("Event is too large for the batch and cannot be sent.");
    }

    await producerClient.SendAsync(eventBatch);
    logger.LogInformation("event published");
}
catch (Exception ex)
{
    // Wrap with context; the original failure stays available as InnerException.
    throw new InvalidOperationException("Failed to send event", ex);
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Reads events from the first partition of the event hub, starting at the partition's
// last enqueued sequence number, and logs each event body decoded as UTF-8 text.
// consumerGroup, connectionString, eventHubName and logger are expected to be defined
// by the surrounding code.
await using var consumerClient = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName);

string firstPartition = (await consumerClient.GetPartitionIdsAsync()).First();
PartitionProperties properties = await consumerClient.GetPartitionPropertiesAsync(firstPartition);
logger.LogInformation("{LastEnqueuedTime}", properties.LastEnqueuedTime);

EventPosition startingPosition = EventPosition.FromSequenceNumber(properties.LastEnqueuedSequenceNumber);
logger.LogInformation("startingPosition: {StartingPosition}", startingPosition);

// No cancellation token is passed, so this loop keeps waiting for new events.
await foreach (PartitionEvent partitionEvent in consumerClient.ReadEventsFromPartitionAsync(firstPartition, startingPosition))
{
    string text = Encoding.UTF8.GetString(partitionEvent.Data.Body.ToArray());
    logger.LogInformation(
        "event received: seq: {SequenceNumber} message: {Message}",
        partitionEvent.Data.SequenceNumber,
        text);
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment