Skip to content

Instantly share code, notes, and snippets.

Created November 11, 2019 17:21
Show Gist options
  • Save joperezr/de0c963213c69b870f2e65b92c5406a4 to your computer and use it in GitHub Desktop.
Save joperezr/de0c963213c69b870f2e65b92c5406a4 to your computer and use it in GitHub Desktop.
Structured Streaming from Azure EventHub using .NET for Apache Spark
using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Text.Json.Serialization;
namespace Sample
{
    /// <summary>
    /// Starting position for reading one Event Hub partition. Instances are serialized to JSON
    /// (with camelCase property names) as the values of the "eventhubs.startingPositions" setting.
    /// </summary>
    public class EventHubPosition
    {
        /// <summary>Offset to start from, as a string.</summary>
        public string Offset { get; set; }

        /// <summary>Sequence number to start from.</summary>
        public long SeqNo { get; set; }

        /// <summary>Enqueued time to start from; left null by <see cref="EventHubConnection"/>.</summary>
        public string EnqueuedTime { get; set; }

        /// <summary>Inclusive flag for the position; left false by <see cref="EventHubConnection"/>.</summary>
        public bool IsInclusive { get; set; }
    }

    /// <summary>
    /// Identifies one partition of one Event Hub. Its JSON form is used as the dictionary key
    /// for that partition's <see cref="EventHubPosition"/>.
    /// </summary>
    public class PositionKey
    {
        /// <summary>Name of the Event Hub.</summary>
        public string EventHubName { get; set; }

        /// <summary>Zero-based partition index.</summary>
        public int PartitionId { get; set; }
    }

    /// <summary>
    /// Builds the option dictionary used to configure an "eventhubs" streaming source.
    /// </summary>
    public static class EventHubConnection
    {
        /// <summary>
        /// Creates the Event Hub connection settings, with one starting position per partition.
        /// </summary>
        /// <param name="eventHubPartitionCount">
        /// The number of partitions your Event Hub has. Partitions 0..count-1 each get an entry;
        /// a value of zero or less produces an empty starting-positions map.
        /// </param>
        /// <returns>
        /// A dictionary with the keys "eventhubs.connectionString", "eventhubs.consumerGroup"
        /// and "eventhubs.startingPositions" (the latter as a JSON string).
        /// </returns>
        public static Dictionary<string, string> GetEventHubConnectionSettings(int eventHubPartitionCount)
        {
            // Change this to the name of your Event Hub.
            string eventHubName = "nameOfYourEventHub";

            var eventHubStartingPosition = new EventHubPosition
            {
                // Always default to offset 0 so that everything on the consumer group is
                // re-processed on every run; you might want to start at a different offset.
                Offset = "0",
                SeqNo = -1
            };

            // Every partition shares the same starting position instance.
            var startingPositions = new Dictionary<string, EventHubPosition>();
            for (int i = 0; i < eventHubPartitionCount; i++)
            {
                // NOTE(review): the key is serialized with default options, i.e. PascalCase
                // property names ("EventHubName", "PartitionId") - confirm this is the key
                // format the Event Hubs connector expects.
                startingPositions.Add(
                    JsonSerializer.Serialize<PositionKey>(new PositionKey { EventHubName = eventHubName, PartitionId = i }),
                    eventHubStartingPosition);
            }

            // camelCase applies to the EventHubPosition property names, not to the dictionary keys.
            JsonSerializerOptions options = new JsonSerializerOptions
            {
                PropertyNamingPolicy = JsonNamingPolicy.CamelCase
            };

            return new Dictionary<string, string>
            {
                // Pass in your Event Hub connection string here. It is read from an environment
                // variable (null when the variable is not set), but you can supply it differently.
                {"eventhubs.connectionString", Environment.GetEnvironmentVariable("YOUR_EVENTHUB_CONNECTION_STRING") },
                // Change this to the name of your consumer group.
                {"eventhubs.consumerGroup", "$Default" },
                {"eventhubs.startingPositions", JsonSerializer.Serialize<Dictionary<string, EventHubPosition>>(startingPositions, options) }
            };
        }
    }
}
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Streaming;
using Microsoft.Spark.Sql.Types;
using System.Diagnostics;
using System.IO;
using System.Text;
namespace Sample
// Sample entry point that sets up Spark structured-streaming processing.
// NOTE(review): this listing appears to have lost lines during extraction (braces and
// several calls in each fluent chain are missing), so the comments below describe only
// the fragments that are actually visible - restore the missing calls from the original gist.
class Program
static void Main(string[] args)
// Start of the SparkSession construction; only the AppName(...) step of the chain is visible.
SparkSession spark = SparkSession
.AppName("VS Telemetry processing using Spark.NET")
// Start of the `events` DataFrame pipeline; the calls between `spark` and FromJson are not visible here.
DataFrame events = spark
// Parse the "body" column (cast to string) as JSON with the schema "BlobPath String", aliased as "Raw".
FromJson(Col("body").Cast("string"), "BlobPath String").Alias("Raw"),
// Select Raw.* together with the "properties" and "enqueuedTime" columns.
.SelectExpr("Raw.*", "properties", "enqueuedTime")
// Declare a watermark on "enqueuedTime" with a threshold of "60 seconds".
.WithWatermark("enqueuedTime", "60 seconds")
// Start of the StreamingQuery; only the SQL selection from "Messages" is visible
// (presumably a temp view registered on `events` - TODO confirm against the original).
StreamingQuery query = spark
.Sql("SELECT * FROM Messages")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment