Last active
March 17, 2026 11:40
-
-
Save agrazh/418a9ff90261a2ba7e7e8632bd7efe0c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import com.azure.identity.ClientSecretCredentialBuilder | |
| import com.azure.messaging.eventhubs.EventProcessorClientBuilder | |
| import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore | |
| import com.azure.messaging.eventhubs.models.{EventContext, ErrorContext} | |
| import com.azure.storage.blob.BlobContainerClientBuilder | |
/** Sample Azure Event Hubs consumer.
  *
  * Authenticates with a service-principal client secret, checkpoints to a blob
  * container, prints every received event to stdout, and stops after listening
  * for 30 seconds.
  *
  * All configuration values below are placeholders and must be replaced before
  * running. Do not commit real secrets in source.
  */
object EventHubProcessor extends App {
  import java.nio.charset.StandardCharsets

  val tenantId = "your-tenant-id"
  val clientId = "your-client-id"
  val clientSecret = "your-client-secret"
  val namespace = "your-namespace.servicebus.windows.net"
  val eventHubName = "your-eventhub-name"
  val storageConnString = "your-storage-connection-string"
  val containerName = "your-blob-container-name"

  // ── Service Principal credential ───────────────────────────────────────────
  val credential = new ClientSecretCredentialBuilder()
    .tenantId(tenantId)
    .clientId(clientId)
    .clientSecret(clientSecret)
    .build()

  // ── Blob storage for checkpointing ─────────────────────────────────────────
  // BlobCheckpointStore is constructed from a BlobContainerAsyncClient, so the
  // async client must be built here (buildClient() returns the sync variant,
  // which the checkpoint store constructor does not accept).
  val blobContainerClient = new BlobContainerClientBuilder()
    .connectionString(storageConnString)
    .containerName(containerName)
    .buildAsyncClient()

  // ── Build processor ────────────────────────────────────────────────────────
  val processorClient = new EventProcessorClientBuilder()
    .fullyQualifiedNamespace(namespace)
    .eventHubName(eventHubName)
    .consumerGroup("$Default")
    .credential(credential)
    .checkpointStore(new BlobCheckpointStore(blobContainerClient))
    .processEvent((eventContext: EventContext) => {
      // ── Print the message ──────────────────────────────────────────────────
      val eventData = eventContext.getEventData
      // Decode explicitly as UTF-8: new String(bytes) would use the JVM's
      // platform default charset and garble non-ASCII payloads on some hosts.
      val body = new String(eventData.getBody, StandardCharsets.UTF_8)
      val seqNumber = eventData.getSequenceNumber
      val partitionId = eventContext.getPartitionContext.getPartitionId
      val enqueuedAt = eventData.getEnqueuedTime
      println(s"Partition : $partitionId")
      println(s"Sequence : $seqNumber")
      println(s"Enqueued : $enqueuedAt")
      println(s"Body : $body")
      println("---")
      // save checkpoint after printing
      eventContext.updateCheckpoint()
    })
    .processError((errorContext: ErrorContext) => {
      // Report failures on stderr so they are not mixed into the message dump.
      Console.err.println(s"Error on partition ${errorContext.getPartitionContext.getPartitionId}")
      Console.err.println(s"Error message: ${errorContext.getThrowable.getMessage}")
    })
    .buildEventProcessorClient()

  processorClient.start()
  println("Processor started, listening for messages...")
  try {
    Thread.sleep(30000) // listen for 30 seconds
  } finally {
    // Always stop the processor, even if the sleep is interrupted, so that
    // partition ownership and background threads are released.
    processorClient.stop()
    println("Processor stopped.")
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment