Last active
April 17, 2024 17:51
-
-
Save jonuwz/ce5662093852224d3a22fbefb9351df9 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -------------------------------------------------------------------------------------------- | |
# Copyright (c) Microsoft Corporation. All rights reserved. | |
# Licensed under the MIT License. See License.txt in the project root for license information. | |
# -------------------------------------------------------------------------------------------- | |
""" | |
An example to show receiving events from an Event Hub. | |
Create a .env file with the content : | |
EVENTHUB_FQDN="XXXXXXXX.servicebus.windows.net" | |
EVENTHUB_NAME="your_event_hub_name" | |
CONSUMER_GROUP='$Default' # make sure you change this if it'll affect other consumers
AZURE_TENANT_ID="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" | |
AZURE_CLIENT_ID="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" | |
AZURE_CLIENT_SECRET="the_secret" | |
""" | |
import os | |
from azure.eventhub import EventHubConsumerClient | |
from azure.identity import EnvironmentCredential | |
from dotenv import load_dotenv | |
load_dotenv(".env") | |
def on_event(partition_context, event):
    """Callback passed to ``receive`` as ``on_event``; prints the event.

    :param partition_context: object whose ``partition_id`` attribute names
        the partition the event came from.
    :param event: the received event; printed as-is.
    """
    # Put your code here.
    # If the operation is i/o intensive, multi-thread will have better performance.
    source_partition = partition_context.partition_id
    announcement = "Received event from partition: {}.".format(source_partition)
    print(announcement)
    print(event)
def on_partition_initialize(partition_context):
    """Callback passed to ``receive`` as ``on_partition_initialize``.

    Prints a one-line notice naming the partition.

    :param partition_context: object exposing the ``partition_id`` to report.
    """
    # Put your code here.
    notice = "Partition: {} has been initialized."
    print(notice.format(partition_context.partition_id))
def on_partition_close(partition_context, reason):
    """Callback passed to ``receive`` as ``on_partition_close``.

    Prints the partition id together with the reason it was closed.

    :param partition_context: object exposing the ``partition_id`` to report.
    :param reason: the close reason; interpolated into the message as-is.
    """
    # Put your code here.
    closed_partition = partition_context.partition_id
    notice = "Partition: {} has been closed, reason for closing: {}."
    print(notice.format(closed_partition, reason))
def on_error(partition_context, error):
    """Callback passed to ``receive`` as ``on_error``; prints the failure.

    :param partition_context: object exposing ``partition_id``, or ``None``
        when the error is not tied to a specific partition.
    :param error: the exception (or error value) to report.
    """
    # Put your code here. partition_context can be None in the on_error callback.
    if partition_context is None:
        print("An exception: {} occurred during the load balance process.".format(error))
        return

    # Argument order must follow the placeholders: the exception first, then
    # the partition id. (They were previously passed the other way round,
    # which produced a message with the two values swapped.)
    print("An exception: {} occurred during receiving from Partition: {}.".format(
        error,
        partition_context.partition_id
    ))
if __name__ == '__main__':
    # Script entry point: build a consumer client from environment settings
    # and receive events until interrupted with Ctrl+C.

    # Presumably picks up the AZURE_TENANT_ID / AZURE_CLIENT_ID /
    # AZURE_CLIENT_SECRET values described in the module docstring.
    credential = EnvironmentCredential()

    env = os.environ
    client_settings = {
        "fully_qualified_namespace": env["EVENTHUB_FQDN"],
        "consumer_group": env["CONSUMER_GROUP"],
        "eventhub_name": env["EVENTHUB_NAME"],
        "credential": credential,
    }
    consumer_client = EventHubConsumerClient(**client_settings)

    # The callbacks defined above, keyed by the keyword names ``receive`` expects.
    callbacks = {
        "on_event": on_event,
        "on_partition_initialize": on_partition_initialize,
        "on_partition_close": on_partition_close,
        "on_error": on_error,
    }

    try:
        # The ``with`` block closes the client when receiving stops.
        with consumer_client:
            consumer_client.receive(starting_position="@latest", **callbacks)
    except KeyboardInterrupt:
        print('Stopped receiving.')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment