Skip to content

Instantly share code, notes, and snippets.

@jrichardsz
Last active August 13, 2024 20:45
Show Gist options
  • Save jrichardsz/a7d25a0492dff895b5a22ce578d9c33e to your computer and use it in GitHub Desktop.
Save jrichardsz/a7d25a0492dff895b5a22ce578d9c33e to your computer and use it in GitHub Desktop.
azure event hub snippets
using Azure.Messaging.EventHubs.Producer;
using Azure.Messaging.EventHubs;
using System.Net.Security;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using Microsoft.Extensions.Logging;

using var loggerFactory = LoggerFactory.Create(builder =>
{
    builder
        .AddFilter("Microsoft", LogLevel.Warning)
        .AddFilter("System", LogLevel.Warning)
        .AddFilter("NonHostConsoleApp.Program", LogLevel.Debug)
        .AddSimpleConsole(options =>
        {
            options.IncludeScopes = true;
            options.SingleLine = true;
            options.TimestampFormat = "HH:mm:ss ";
        });
});
ILogger logger = loggerFactory.CreateLogger<Program>();

logger.LogInformation("Starting");

static bool ValidateServerCertificate(
                   object sender,
                   X509Certificate certificate,
                   X509Chain chain,
                   SslPolicyErrors sslPolicyErrors)
{
   Console.WriteLine(certificate.ToString());
   if ((sslPolicyErrors == SslPolicyErrors.None)
       || (certificate.GetSerialNumberString() == "*********"))
   {
       Console.WriteLine("cert is an allowed self signed certificate");
       return true;
   }
   // Do not allow communication with unauthorized servers.
   return false;
}

var producerOptions = new EventHubProducerClientOptions();
producerOptions.ConnectionOptions.CertificateValidationCallback = ValidateServerCertificate;

try
{
    //UseDevelopmentEmulator=true;
    EventHubProducerClient producerClient = new EventHubProducerClient(
    "Endpoint=sb://localhost;SharedAccessKeyName=foo;SharedAccessKey=bar;UseDevelopmentEmulator=true;", "foo-event-hub");

    // Create a batch of events 
    using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();

    if (!eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes($"Event Hello"))))
    {
        // if it is too large for the batch
        throw new Exception($"Event is too large for the batch and cannot be sent.");
    }
    await producerClient.SendAsync(eventBatch);
    logger.LogInformation("event published");

}
catch (Exception ex)
{
    throw new Exception("Failed to send event", ex);
}

nginx

user  nginx;
worker_processes  1;
error_log  /var/log/nginx/error.log warn;
pid        /var/run/nginx.pid;
events {
    worker_connections  1024;
}

stream {

  upstream api_servers {
      server 10.250.193.3:5672;
  }

  server {
      listen      5671 ssl;

      ssl_protocols           TLSv1.2 TLSv1.1 TLSv1;
      ssl_ciphers             RC4:HIGH:!aNULL:!MD5;
      ssl_handshake_timeout   30s;

      ssl_certificate /etc/nginx/cert/nginx-selfsigned.crt;
      ssl_certificate_key /etc/nginx/cert/nginx-selfsigned.key;

      proxy_connect_timeout 1s;
      proxy_pass api_servers;
  }
}

docker-compose

  nginx:
    image: nginx:1.19.4
    depends_on:
      - azure-event-hub-mock
    volumes:
      - ./nginx/config/nginx.conf:/etc/nginx/nginx.conf
      - ./nginx/cert:/etc/nginx/cert
    ports:
      - '5671:5671'
    networks:
      - docker-mocks-starter
    extra_hosts:
      - "host.docker.internal:host-gateway"

  azure-event-hub-mock:
    container_name: azure-event-hub-mock
    image: "mcr.microsoft.com/azure-messaging/eventhubs-emulator:1.2.4-preview-amd64"
    volumes:
      - "./.eventhub-emulator/config.json:/Eventhubs_Emulator/ConfigFiles/Config.json"
    ports:
      - "5672:5672"
    environment:
      BLOB_SERVER: azurite
      METADATA_SERVER: azurite
      ACCEPT_EULA: Y
    depends_on:
      - azurite
    networks:
      - docker-mocks-starter
    extra_hosts:
      - "host.docker.internal:host-gateway"
      
  azurite:
    image: mcr.microsoft.com/azure-storage/azurite
    container_name: azurite
    ports:
     - "10000:10000"    
     - "10001:10001"
     - "10002:10002"
    environment:
      TZ: America/Newyork               
    networks:
      docker-mocks-starter:
        aliases:
          - "azurite"
    extra_hosts:
      - "host.docker.internal:host-gateway"       

event hub emulator config

{
    "UserConfig": {
        "NamespaceConfig": [
            {
                "Type": "EventHub",
                "Name": "emulatorNs1",
                "Entities": [
                    {
                        "Name": "source",
                        "PartitionCount": "2",
                        "ConsumerGroups": [
                            {
                                "Name": "source-consumer-group"
                            }
                        ]
                    },
                    {
                        "Name": "target",
                        "PartitionCount": "2",
                        "ConsumerGroups": [
                            {
                                "Name": "source-consumer-group"
                            }
                        ]
                    },
                    {
                        "Name": "foo",
                        "PartitionCount": "2",
                        "ConsumerGroups": [
                            {
                                "Name": "bar"
                            }
                        ]
                    },
                    {
                        "Name": "minimal-publisher",
                        "PartitionCount": "2",
                        "ConsumerGroups": [
                            {
                                "Name": "minimal-subscriber"
                            }
                        ]
                    }
                ]
            }
        ],
        "LoggingConfig": {
            "Type": "File"
        }
    }
}

docker compose

  azurite:
    image: mcr.microsoft.com/azure-storage/azurite
    container_name: azurite
    ports:
     - "10000:10000"    
     - "10001:10001"
     - "10002:10002"
    environment:
      TZ: America/Newyork               
    networks:
      docker-mocks-starter:
        aliases:
          - "azurite"
    extra_hosts:
      - "host.docker.internal:host-gateway" 

  azure-event-hub-mock:
    container_name: azure-event-hub-mock
    image: "mcr.microsoft.com/azure-messaging/eventhubs-emulator:1.2.4-preview-amd64"
    volumes:
      - "./.eventhub-emulator/config.json:/Eventhubs_Emulator/ConfigFiles/Config.json"
    ports:
      - "5672:5672"
    environment:
      BLOB_SERVER: azurite
      METADATA_SERVER: azurite
      ACCEPT_EULA: Y
    depends_on:
      - azurite
    networks:
      - docker-mocks-starter
    extra_hosts:
      - "host.docker.internal:host-gateway"

config.json

{
    "UserConfig": {
        "NamespaceConfig": [
            {
                "Type": "EventHub",
                "Name": "emulatorNs1",
                "Entities": [
                    {
                        "Name": "source",
                        "PartitionCount": "2",
                        "ConsumerGroups": [
                            {
                                "Name": "source-consumer-group"
                            }
                        ]
                    },
                    {
                        "Name": "target",
                        "PartitionCount": "2",
                        "ConsumerGroups": [
                            {
                                "Name": "source-consumer-group"
                            }
                        ]
                    },
                    {
                        "Name": "foo-event-hub",
                        "PartitionCount": "2",
                        "ConsumerGroups": [
                            {
                                "Name": "bar-event-hub"
                            }
                        ]
                    }
                ]
            }
        ],
        "LoggingConfig": {
            "Type": "File"
        }
    }
}

Program.cs

using Azure.Messaging.EventHubs.Producer;
using Azure.Messaging.EventHubs;
using System.Net.Security;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using Microsoft.Extensions.Logging;

using var loggerFactory = LoggerFactory.Create(builder =>
{
    builder
        .AddFilter("Microsoft", LogLevel.Warning)
        .AddFilter("System", LogLevel.Warning)
        .AddFilter("NonHostConsoleApp.Program", LogLevel.Debug)
        .AddSimpleConsole(options =>
        {
            options.IncludeScopes = true;
            options.SingleLine = true;
            options.TimestampFormat = "HH:mm:ss ";
        });
});
ILogger logger = loggerFactory.CreateLogger<Program>();

logger.LogInformation("Starting");

static bool ValidateServerCertificate(
                   object sender,
                   X509Certificate certificate,
                   X509Chain chain,
                   SslPolicyErrors sslPolicyErrors)
{
   Console.WriteLine(certificate.ToString());
   if ((sslPolicyErrors == SslPolicyErrors.None)
       || (certificate.GetSerialNumberString() == "*********"))
   {
       Console.WriteLine("cert is an allowed self signed certificate");
       return true;
   }
   // Do not allow communication with unauthorized servers.
   return false;
}

var producerOptions = new EventHubProducerClientOptions();
producerOptions.ConnectionOptions.CertificateValidationCallback = ValidateServerCertificate;

try
{
    //UseDevelopmentEmulator=true;
    EventHubProducerClient producerClient = new EventHubProducerClient(
    "Endpoint=sb://localhost;SharedAccessKeyName=foo;SharedAccessKey=bar;UseDevelopmentEmulator=true;", "foo");

    // Create a batch of events 
    using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();

    if (!eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes($"Event Hello"))))
    {
        // if it is too large for the batch
        throw new Exception($"Event is too large for the batch and cannot be sent.");
    }
    await producerClient.SendAsync(eventBatch);
    logger.LogInformation("event published");

}
catch (Exception ex)
{
    throw new Exception("Failed to send event", ex);
}
EventHubConsumerClient consumerClient = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName);
string firstPartition = (await consumerClient.GetPartitionIdsAsync()).First();
PartitionProperties properties = await consumerClient.GetPartitionPropertiesAsync(firstPartition);
logger.LogInformation(""+properties.LastEnqueuedTime);
EventPosition startingPosition = EventPosition.FromSequenceNumber(properties.LastEnqueuedSequenceNumber);
logger.LogInformation($"startingPosition: {startingPosition}");
await foreach (PartitionEvent partitionEvent in consumerClient.ReadEventsFromPartitionAsync(firstPartition, startingPosition))
{
string text = Encoding.UTF8.GetString(partitionEvent.Data.Body.ToArray());
logger.LogInformation($"event received: seq: {partitionEvent.Data.SequenceNumber} message: {text}");
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment