Last active
June 22, 2025 04:52
-
-
Save badsyntax/96ab6c76a35822b5ade5f46ae86494f6 to your computer and use it in GitHub Desktop.
Download and process massive CSV files from Blob storage using IAsyncEnumerator in Elsa 3.4.0
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Azure.Storage.Blobs; | |
using Elsa.Extensions; | |
using Elsa.Workflows; | |
using Elsa.Workflows.Activities; | |
using Elsa.Workflows.Memory; | |
using Elsa.Workflows.Models; | |
using Sylvan.Data.Csv; | |
namespace ElsaConsole | |
{ | |
/// <summary>
/// Example showing how to stream a CSV blob through an Elsa workflow using an
/// <see cref="IAsyncEnumerator{T}"/> held in a workflow variable.
/// </summary>
public class ElsaExample
{
    /// <summary>
    /// Builds and runs a workflow that opens the blob stream, wraps it in a CSV enumerator,
    /// writes every yielded value to the console and finally releases the enumerator and stream.
    /// </summary>
    /// <param name="workflowBuilderFactory">Factory used to create the workflow builder.</param>
    /// <param name="workflowRunner">Runner used to execute the built workflow.</param>
    public static async Task Run_Example1(
        IWorkflowBuilderFactory workflowBuilderFactory,
        IWorkflowRunner workflowRunner)
    {
        var workflowBuilder = workflowBuilderFactory.CreateBuilder();

        // Workflow variables used to hand the stream and the enumerator from one activity to the next.
        var fileStreamOutput = workflowBuilder.WithVariable<Stream>();
        var enumerableOutput = workflowBuilder.WithVariable<IAsyncEnumerator<string>>();

        var blobActivity = new BlobActivity(fileStreamOutput);
        var csvActivity = new CSVActivity(fileStreamOutput, enumerableOutput);

        // The loop condition advances the enumerator; the body only reads Current.
        var whileActivity = new While(async (context) =>
        {
            var enumerator = context.Get<IAsyncEnumerator<string>>(enumerableOutput)!;
            return await enumerator.MoveNextAsync();
        })
        {
            Body = new Inline((context) =>
            {
                var enumerator = context.Get<IAsyncEnumerator<string>>(enumerableOutput)!;
                var currentValue = enumerator.Current;
                Console.WriteLine(currentValue);
            })
        };

        // Release the enumerator and the blob stream once the loop has finished so that
        // neither the CSV reader nor the network stream is left open.
        var cleanupActivity = new Inline(async (context) =>
        {
            var enumerator = context.Get<IAsyncEnumerator<string>>(enumerableOutput);
            if (enumerator is not null)
            {
                await enumerator.DisposeAsync();
            }

            var stream = context.Get<Stream>(fileStreamOutput);
            if (stream is not null)
            {
                // Disposing a stream that is already closed is a no-op.
                await stream.DisposeAsync();
            }
        });

        workflowBuilder.Root = new Sequence
        {
            Activities = [
                blobActivity,
                csvActivity,
                whileActivity,
                cleanupActivity
            ]
        };

        var workflow = await workflowBuilder.BuildWorkflowAsync();
        await workflowRunner.RunAsync(workflow);
    }
}
/// <summary>
/// Code activity that opens a read stream on a blob and stores it as the activity result.
/// </summary>
public class BlobActivity : CodeActivity<Stream>
{
    /// <summary>Creates the activity and binds its result to the given variable.</summary>
    /// <param name="result">Variable that receives the opened blob stream.</param>
    public BlobActivity(Variable<Stream> result) => Result = new(result);

    /// <summary>Opens the blob stream and sets it as the activity result.</summary>
    protected override async ValueTask ExecuteAsync(ActivityExecutionContext context)
    {
        Stream blobContent = await GetStream();
        context.SetResult(blobContent);
    }

    /// <summary>
    /// Builds the blob service URI from the account name and SAS token, then opens the blob for reading.
    /// </summary>
    /// <returns>A readable stream over the blob content.</returns>
    private static async Task<Stream> GetStream()
    {
        // Placeholder values for the example.
        const string account = "storageaccount";
        const string container = "container";
        const string blob = "example.csv";
        const string sas = "TOKEN";

        var endpoint = new UriBuilder($"https://{account}.blob.core.windows.net");
        endpoint.Query = sas;

        var client = new BlobServiceClient(endpoint.Uri)
            .GetBlobContainerClient(container)
            .GetBlobClient(blob);

        return await client.OpenReadAsync();
    }
}
/// <summary>
/// Code activity that wraps the stream supplied through <see cref="Input"/> in a CSV reader and
/// sets, as its result, an <see cref="IAsyncEnumerator{T}"/> yielding one string per record read.
/// </summary>
public class CSVActivity : CodeActivity<IAsyncEnumerator<string>>
{
    /// <summary>Zero-based ordinal of the CSV column whose value is yielded for each record.</summary>
    private const int NameColumnOrdinal = 1;

    /// <summary>The stream containing the CSV text to read.</summary>
    public Input<Stream> Input { get; set; }

    /// <summary>Creates the activity, binding its input and result to the given variables.</summary>
    /// <param name="input">Variable holding the stream to read.</param>
    /// <param name="result">Variable that receives the enumerator.</param>
    public CSVActivity(Variable<Stream> input, Variable<IAsyncEnumerator<string>> result)
    {
        Input = new(input);
        Result = new(result);
    }

    /// <summary>
    /// Resolves the input stream and sets the enumerator over it as the activity result.
    /// No CSV data is read here; reading starts on the first <c>MoveNextAsync</c> call.
    /// </summary>
    /// <exception cref="InvalidOperationException">The input stream is not set.</exception>
    protected override ValueTask ExecuteAsync(ActivityExecutionContext context)
    {
        // Resolve the stream now, while this activity is executing, so the iterator does not
        // hold on to the execution context after the activity has completed.
        var stream = context.Get<Stream>(Input)
            ?? throw new InvalidOperationException("The CSV input stream has not been set.");

        context.SetResult(GetEnumerator(stream));
        return ValueTask.CompletedTask;
    }

    /// <summary>
    /// Reads the CSV text from <paramref name="stream"/> and yields the value of the column at
    /// <see cref="NameColumnOrdinal"/> for every record.
    /// </summary>
    /// <remarks>
    /// The text reader and CSV reader are disposed when enumeration runs to completion or when
    /// the enumerator is disposed after it has started; disposing the text reader closes the stream.
    /// </remarks>
    private static async IAsyncEnumerator<string> GetEnumerator(Stream stream)
    {
        using var reader = new StreamReader(stream);

        // CreateAsync performs the initial read asynchronously, avoiding a blocking read on a network stream.
        await using var csv = await CsvDataReader.CreateAsync(reader);

        while (await csv.ReadAsync())
        {
            yield return csv.GetString(NameColumnOrdinal);
        }
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment