Created
November 11, 2019 17:21
-
-
Save joperezr/de0c963213c69b870f2e65b92c5406a4 to your computer and use it in GitHub Desktop.
Structured Streaming from Azure Event Hubs using .NET for Apache Spark
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Text.Json; | |
using System.Text.Json.Serialization; | |
namespace Sample
{
    /// <summary>
    /// Starting position for one Event Hub partition. Instances are serialized to JSON
    /// (with camelCase property names) as the values of the
    /// <c>eventhubs.startingPositions</c> option.
    /// </summary>
    public class EventHubPosition
    {
        /// <summary>Offset to start reading from, expressed as a string.</summary>
        public string Offset { get; set; }

        /// <summary>Sequence number to start from; this sample always sets it to -1.</summary>
        public long SeqNo { get; set; }

        /// <summary>Enqueued time to start from; left unset (null) by this sample.</summary>
        public string EnqueuedTime { get; set; }

        /// <summary>Left at its default (<c>false</c>) by this sample.</summary>
        public bool IsInclusive { get; set; }
    }

    /// <summary>
    /// Identifies one partition of an Event Hub. Its JSON form
    /// (<c>{"ehName":...,"partitionId":...}</c>) is used as the dictionary key of the
    /// starting-positions map.
    /// </summary>
    public class PositionKey
    {
        [JsonPropertyName("ehName")]
        public string EventHubName { get; set; }

        [JsonPropertyName("partitionId")]
        public int PartitionId { get; set; }
    }

    /// <summary>
    /// Builds the option dictionary passed to the "eventhubs" streaming source.
    /// </summary>
    public static class EventHubConnection
    {
        // Environment variable the connection string is read from. You can obtain the
        // connection string differently if you prefer.
        private const string ConnectionStringVariable = "YOUR_EVENTHUB_CONNECTION_STRING";

        /// <summary>
        /// Creates the <c>eventhubs.*</c> options: connection string, consumer group and one
        /// starting position per partition.
        /// </summary>
        /// <param name="eventHubPartitionCount">Number of partitions that your Event Hub has.</param>
        /// <param name="eventHubName">Name of your Event Hub.</param>
        /// <param name="consumerGroup">Name of the consumer group to read with.</param>
        /// <returns>
        /// A dictionary with the keys <c>eventhubs.connectionString</c>,
        /// <c>eventhubs.consumerGroup</c> and <c>eventhubs.startingPositions</c>.
        /// </returns>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="eventHubPartitionCount"/> is negative.
        /// </exception>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="eventHubName"/> or <paramref name="consumerGroup"/> is null.
        /// </exception>
        /// <exception cref="InvalidOperationException">
        /// The connection-string environment variable is not set or is empty.
        /// </exception>
        public static Dictionary<string, string> GetEventHubConnectionSettings(
            int eventHubPartitionCount,
            string eventHubName = "nameOfYourEventHub",
            string consumerGroup = "$Default")
        {
            if (eventHubPartitionCount < 0)
            {
                throw new ArgumentOutOfRangeException(
                    nameof(eventHubPartitionCount),
                    eventHubPartitionCount,
                    "The partition count cannot be negative.");
            }

            if (eventHubName == null)
            {
                throw new ArgumentNullException(nameof(eventHubName));
            }

            if (consumerGroup == null)
            {
                throw new ArgumentNullException(nameof(consumerGroup));
            }

            // Fail fast here instead of handing a null option value to the streaming source.
            string connectionString = Environment.GetEnvironmentVariable(ConnectionStringVariable);
            if (string.IsNullOrEmpty(connectionString))
            {
                throw new InvalidOperationException(
                    "The environment variable '" + ConnectionStringVariable + "' is not set.");
            }

            // Always start from offset 0 so everything on the consumer group is re-processed;
            // use a different offset if you want to resume from somewhere else.
            // NOTE(review): IsInclusive stays false here - confirm with the connector docs that
            // offset "0" (rather than a start-of-stream sentinel) does not skip the first event.
            var startingPosition = new EventHubPosition
            {
                Offset = "0",
                SeqNo = -1
            };

            // Every partition shares the same starting position; the key is the JSON form of
            // the (event hub name, partition id) pair.
            var startingPositions = new Dictionary<string, EventHubPosition>(eventHubPartitionCount);
            for (int partitionId = 0; partitionId < eventHubPartitionCount; partitionId++)
            {
                var key = new PositionKey { EventHubName = eventHubName, PartitionId = partitionId };
                startingPositions.Add(JsonSerializer.Serialize<PositionKey>(key), startingPosition);
            }

            // camelCase applies to the EventHubPosition property names (offset, seqNo, ...);
            // the dictionary keys are written as-is.
            var options = new JsonSerializerOptions
            {
                PropertyNamingPolicy = JsonNamingPolicy.CamelCase
            };

            return new Dictionary<string, string>
            {
                { "eventhubs.connectionString", connectionString },
                { "eventhubs.consumerGroup", consumerGroup },
                {
                    "eventhubs.startingPositions",
                    JsonSerializer.Serialize<Dictionary<string, EventHubPosition>>(startingPositions, options)
                }
            };
        }
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Microsoft.Spark.Sql; | |
using Microsoft.Spark.Sql.Streaming; | |
using Microsoft.Spark.Sql.Types; | |
using System.Diagnostics; | |
using System.IO; | |
using System.Text; | |
namespace Sample
{
    /// <summary>
    /// Entry point of the sample: reads a stream from the "eventhubs" source, parses each
    /// message body as JSON, exposes the result as the temp view "Messages" and prints it
    /// to the console.
    /// </summary>
    class Program
    {
        // Number of partitions passed to the connection-settings helper; change it to match
        // your Event Hub.
        private const int EventHubPartitionCount = 5;

        static void Main(string[] args)
        {
            SparkSession spark = SparkSession
                .Builder()
                .AppName("VS Telemetry processing using Spark.NET")
                .GetOrCreate();

            DataFrame events = spark
                .ReadStream()
                .Format("eventhubs")
                .Options(EventHubConnection.GetEventHubConnectionSettings(EventHubPartitionCount))
                .Load();

            // FromJson and Col live on the static Functions class; they are qualified here
            // because the file has no `using static Microsoft.Spark.Sql.Functions;`.
            events
                .Select(
                    // The body column is cast to string and parsed with the schema "BlobPath String".
                    Functions.FromJson(Functions.Col("body").Cast("string"), "BlobPath String").Alias("Raw"),
                    Functions.Col("properties"),
                    Functions.Col("enqueuedTime"))
                // Flatten the parsed struct so its fields become top-level columns.
                .SelectExpr("Raw.*", "properties", "enqueuedTime")
                .WithWatermark("enqueuedTime", "60 seconds")
                .CreateOrReplaceTempView("Messages");

            StreamingQuery query = spark
                .Sql("SELECT * FROM Messages")
                .WriteStream()
                .OutputMode("append")
                .Format("console")
                .Start();

            // Block until the streaming query stops.
            query.AwaitTermination();
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment