Skip to content

Instantly share code, notes, and snippets.

@agrazh
Last active March 17, 2026 11:40
Show Gist options
  • Select an option

  • Save agrazh/418a9ff90261a2ba7e7e8632bd7efe0c to your computer and use it in GitHub Desktop.

Select an option

Save agrazh/418a9ff90261a2ba7e7e8632bd7efe0c to your computer and use it in GitHub Desktop.
import com.azure.identity.ClientSecretCredentialBuilder
import com.azure.messaging.eventhubs.EventProcessorClientBuilder
import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore
import com.azure.messaging.eventhubs.models.{EventContext, ErrorContext}
import com.azure.storage.blob.BlobContainerClientBuilder
object EventHubProcessor extends App {
val tenantId = "your-tenant-id"
val clientId = "your-client-id"
val clientSecret = "your-client-secret"
val namespace = "your-namespace.servicebus.windows.net"
val eventHubName = "your-eventhub-name"
val storageConnString = "your-storage-connection-string"
val containerName = "your-blob-container-name"
// ── Service Principal credential ───────────────────────────────────────────
val credential = new ClientSecretCredentialBuilder()
.tenantId(tenantId)
.clientId(clientId)
.clientSecret(clientSecret)
.build()
// ── Blob storage for checkpointing ─────────────────────────────────────────
val blobContainerClient = new BlobContainerClientBuilder()
.connectionString(storageConnString)
.containerName(containerName)
.buildClient()
// ── Build processor ────────────────────────────────────────────────────────
val processorClient = new EventProcessorClientBuilder()
.fullyQualifiedNamespace(namespace)
.eventHubName(eventHubName)
.consumerGroup("$Default")
.credential(credential)
.checkpointStore(new BlobCheckpointStore(blobContainerClient))
.processEvent((eventContext: EventContext) => {
// ── Print the message ──────────────────────────────────────────────────
val body = new String(eventContext.getEventData.getBody)
val seqNumber = eventContext.getEventData.getSequenceNumber
val partitionId = eventContext.getPartitionContext.getPartitionId
val enqueuedAt = eventContext.getEventData.getEnqueuedTime
println(s"Partition : $partitionId")
println(s"Sequence : $seqNumber")
println(s"Enqueued : $enqueuedAt")
println(s"Body : $body")
println("---")
// save checkpoint after printing
eventContext.updateCheckpoint()
})
.processError((errorContext: ErrorContext) => {
println(s"Error on partition ${errorContext.getPartitionContext.getPartitionId}")
println(s"Error message: ${errorContext.getThrowable.getMessage}")
})
.buildEventProcessorClient()
processorClient.start()
println("Processor started, listening for messages...")
Thread.sleep(30000) // listen for 30 seconds
processorClient.stop()
println("Processor stopped.")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment