Skip to content

Instantly share code, notes, and snippets.

@joperezr
Created November 11, 2019 17:21
Show Gist options
  • Save joperezr/de0c963213c69b870f2e65b92c5406a4 to your computer and use it in GitHub Desktop.
Save joperezr/de0c963213c69b870f2e65b92c5406a4 to your computer and use it in GitHub Desktop.
Structured Streaming from Azure EventHub using .NET for Apache Spark
using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Text.Json.Serialization;
namespace Sample
{
/// <summary>
/// Serializable description of a position within an EventHub partition.
/// Instances are used as the values of the starting-positions map that
/// <c>EventHubConnection</c> serializes to JSON with camel-cased property names.
/// </summary>
public class EventHubPosition
{
/// <summary>Offset to start reading from, expressed as a string (the sample uses "0").</summary>
public string Offset { get; set; }
/// <summary>
/// Sequence number of the position. The sample sets it to -1; presumably that means
/// "not specified" — confirm against the EventHubs connector documentation.
/// </summary>
public long SeqNo { get; set; }
/// <summary>
/// Enqueued time of the position as a string; the sample leaves it unset (null).
/// NOTE(review): the expected timestamp format is not visible in this file.
/// </summary>
public string EnqueuedTime { get; set; }
/// <summary>
/// Inclusivity flag; the sample leaves it at its default (false). Presumably controls
/// whether the event at the position itself is included — confirm with the connector docs.
/// </summary>
public bool IsInclusive { get; set; }
}
/// <summary>
/// Identifies a single partition of an EventHub. <c>EventHubConnection</c> serializes
/// an instance of this type to JSON and uses the resulting string as the key of its
/// starting-positions map.
/// </summary>
public class PositionKey
{
/// <summary>Name of the EventHub; written to JSON under the name "ehName".</summary>
[JsonPropertyName("ehName")]
public string EventHubName { get; set; }
/// <summary>Partition index within the EventHub; written to JSON under the name "partitionId".</summary>
[JsonPropertyName("partitionId")]
public int PartitionId { get; set; }
}
/// <summary>
/// Builds the option dictionary handed to the "eventhubs" streaming source.
/// </summary>
public static class EventHubConnection
{
    // Name of the environment variable the connection string is read from.
    // You can obtain the connection string differently if you prefer.
    private const string ConnectionStringVariable = "YOUR_EVENTHUB_CONNECTION_STRING";

    // Serializer options are cached: they never change, and creating a new
    // JsonSerializerOptions per call discards the serializer's metadata cache.
    private static readonly JsonSerializerOptions s_camelCaseOptions = new JsonSerializerOptions
    {
        PropertyNamingPolicy = JsonNamingPolicy.CamelCase
    };

    /// <summary>
    /// Creates the settings for reading from the EventHub: the connection string,
    /// the consumer group and one starting position per partition.
    /// </summary>
    /// <param name="eventHubPartitionCount">
    /// The number of partitions that your EventHub has. A starting position is emitted
    /// for every partition id in <c>[0, eventHubPartitionCount)</c>.
    /// </param>
    /// <returns>
    /// A dictionary with the keys "eventhubs.connectionString", "eventhubs.consumerGroup"
    /// and "eventhubs.startingPositions" (the latter holding a JSON document).
    /// </returns>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="eventHubPartitionCount"/> is negative.
    /// </exception>
    /// <exception cref="InvalidOperationException">
    /// The connection-string environment variable is not set or is empty.
    /// </exception>
    public static Dictionary<string, string> GetEventHubConnectionSettings(int eventHubPartitionCount)
    {
        if (eventHubPartitionCount < 0)
        {
            throw new ArgumentOutOfRangeException(
                nameof(eventHubPartitionCount),
                eventHubPartitionCount,
                "The partition count cannot be negative.");
        }

        // Fail fast with an actionable message instead of handing a null option value to Spark.
        string connectionString = Environment.GetEnvironmentVariable(ConnectionStringVariable);
        if (string.IsNullOrEmpty(connectionString))
        {
            throw new InvalidOperationException(
                $"The environment variable '{ConnectionStringVariable}' must contain the EventHub connection string.");
        }

        // Change this for the name of your EventHub
        string eventHubName = "nameOfYourEventHub";

        // Always start from offset 0 so everything on the consumer group is re-processed;
        // you might want to start at a different offset.
        var eventHubStartingPosition = new EventHubPosition
        {
            Offset = "0",
            SeqNo = -1
        };

        // Every partition shares the same starting position; the map key is the
        // JSON form of the (EventHub name, partition id) pair.
        var startingPositions = new Dictionary<string, EventHubPosition>(eventHubPartitionCount);
        for (int partitionId = 0; partitionId < eventHubPartitionCount; partitionId++)
        {
            var key = new PositionKey { EventHubName = eventHubName, PartitionId = partitionId };
            startingPositions.Add(JsonSerializer.Serialize(key), eventHubStartingPosition);
        }

        return new Dictionary<string, string>
        {
            { "eventhubs.connectionString", connectionString },
            // Change this for the name of your consumer Group.
            { "eventhubs.consumerGroup", "$Default" },
            { "eventhubs.startingPositions", JsonSerializer.Serialize(startingPositions, s_camelCaseOptions) }
        };
    }
}
}
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Streaming;
using Microsoft.Spark.Sql.Types;
using System.Diagnostics;
using System.IO;
using System.Text;
namespace Sample
{
/// <summary>
/// Entry point of the sample: streams events from an EventHub with Spark Structured
/// Streaming and prints them to the console.
/// </summary>
internal class Program
{
    // Number of partitions of the EventHub being read; change it to match your EventHub.
    private const int EventHubPartitionCount = 5;

    /// <summary>
    /// Creates the Spark session, reads the "eventhubs" stream, parses each message body
    /// as JSON, exposes the result as the temp view "Messages" and writes every row of
    /// that view to the console until the query terminates.
    /// </summary>
    /// <param name="args">Command-line arguments; not used.</param>
    private static void Main(string[] args)
    {
        SparkSession spark = SparkSession
            .Builder()
            .AppName("VS Telemetry processing using Spark.NET")
            .GetOrCreate();

        DataFrame events = spark
            .ReadStream()
            .Format("eventhubs")
            .Options(EventHubConnection.GetEventHubConnectionSettings(EventHubPartitionCount))
            .Load();

        // FromJson/Col are qualified with Functions because the file has no
        // "using static Microsoft.Spark.Sql.Functions;" directive.
        events
            .Select(
                // The body column is cast to string and parsed with the schema "BlobPath String".
                Functions.FromJson(Functions.Col("body").Cast("string"), "BlobPath String").Alias("Raw"),
                Functions.Col("properties"),
                Functions.Col("enqueuedTime"))
            // Flatten the parsed struct so its fields become top-level columns.
            .SelectExpr("Raw.*", "properties", "enqueuedTime")
            .WithWatermark("enqueuedTime", "60 seconds")
            .CreateOrReplaceTempView("Messages");

        StreamingQuery query = spark
            .Sql("SELECT * FROM Messages")
            .WriteStream()
            .OutputMode("append")
            .Format("console")
            .Start();

        // Block the driver until the streaming query stops.
        query.AwaitTermination();
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment