Skip to content

Instantly share code, notes, and snippets.

@badsyntax
Last active June 22, 2025 04:52
Show Gist options
  • Save badsyntax/96ab6c76a35822b5ade5f46ae86494f6 to your computer and use it in GitHub Desktop.
Save badsyntax/96ab6c76a35822b5ade5f46ae86494f6 to your computer and use it in GitHub Desktop.
Download and process massive CSV files from Blob storage using IAsyncEnumerator in Elsa 3.4.0
using Azure.Storage.Blobs;
using Elsa.Extensions;
using Elsa.Workflows;
using Elsa.Workflows.Activities;
using Elsa.Workflows.Memory;
using Elsa.Workflows.Models;
using Sylvan.Data.Csv;
namespace ElsaConsole
{
/// <summary>
/// Sample that wires up an Elsa workflow which streams a CSV blob and prints
/// one value per CSV row to the console.
/// </summary>
public class ElsaExample
{
    /// <summary>
    /// Builds a three-step workflow (open blob stream, wrap it in a CSV row
    /// enumerator, loop over the enumerator) and runs it to completion.
    /// </summary>
    /// <param name="workflowBuilderFactory">Factory used to create the workflow builder.</param>
    /// <param name="workflowRunner">Runner that executes the built workflow.</param>
    public static async Task Run_Example1(
        IWorkflowBuilderFactory workflowBuilderFactory,
        IWorkflowRunner workflowRunner)
    {
        var builder = workflowBuilderFactory.CreateBuilder();

        // Workflow variables used to hand values from one activity to the next.
        var blobStreamVariable = builder.WithVariable<Stream>();
        var rowsVariable = builder.WithVariable<IAsyncEnumerator<string>>();

        var openBlob = new BlobActivity(blobStreamVariable);
        var readCsv = new CSVActivity(blobStreamVariable, rowsVariable);

        // The loop condition advances the enumerator; the body then reads
        // whatever value the enumerator is currently positioned on.
        var printEachRow = new While(async conditionContext =>
            await conditionContext.Get<IAsyncEnumerator<string>>(rowsVariable)!.MoveNextAsync())
        {
            Body = new Inline(bodyContext =>
            {
                var row = bodyContext.Get<IAsyncEnumerator<string>>(rowsVariable)!.Current;
                Console.WriteLine(row);
            })
        };

        builder.Root = new Sequence
        {
            Activities = [openBlob, readCsv, printEachRow]
        };

        var workflow = await builder.BuildWorkflowAsync();

        // The run result is not inspected by this sample.
        await workflowRunner.RunAsync(workflow);
    }
}
/// <summary>
/// Activity that opens a read stream over a blob in Azure Blob Storage and
/// sets that stream as the activity result.
/// </summary>
public class BlobActivity : CodeActivity<Stream>
{
    /// <summary>Creates the activity and binds its result to <paramref name="result"/>.</summary>
    /// <param name="result">Workflow variable that receives the opened stream.</param>
    public BlobActivity(Variable<Stream> result) => Result = new(result);

    /// <summary>Opens the blob stream and publishes it as the activity result.</summary>
    protected override async ValueTask ExecuteAsync(ActivityExecutionContext context)
    {
        var blobStream = await GetStream();
        context.SetResult(blobStream);
    }

    /// <summary>
    /// Opens a read stream for the sample blob. The account, container, blob
    /// name and SAS token are hard-coded placeholder values.
    /// </summary>
    /// <returns>A readable stream over the blob's content.</returns>
    private static async Task<Stream> GetStream()
    {
        const string storageAccountName = "storageaccount";
        const string container = "container";
        const string blob = "example.csv";
        const string sas = "TOKEN";

        // The SAS token is carried as the query string of the account endpoint.
        var endpoint = new UriBuilder($"https://{storageAccountName}.blob.core.windows.net");
        endpoint.Query = sas;

        var client = new BlobServiceClient(endpoint.Uri)
            .GetBlobContainerClient(container)
            .GetBlobClient(blob);

        return await client.OpenReadAsync();
    }
}
/// <summary>
/// Activity that exposes one column of a CSV stream as a lazily evaluated
/// <see cref="IAsyncEnumerator{T}"/>, so rows are parsed only as the consumer
/// advances the enumerator.
/// </summary>
public class CSVActivity : CodeActivity<IAsyncEnumerator<string>>
{
    /// <summary>Zero-based ordinal of the CSV column whose value is yielded for each row.</summary>
    private const int ValueColumnOrdinal = 1;

    /// <summary>The stream containing the CSV text to read.</summary>
    public Input<Stream> Input { get; set; }

    /// <summary>Creates the activity, binding its input and result to workflow variables.</summary>
    /// <param name="input">Workflow variable holding the CSV stream.</param>
    /// <param name="result">Workflow variable that receives the row enumerator.</param>
    public CSVActivity(Variable<Stream> input, Variable<IAsyncEnumerator<string>> result)
    {
        Input = new(input);
        Result = new(result);
    }

    /// <summary>
    /// Resolves the input stream and publishes an enumerator over its rows as
    /// the activity result. No CSV data is read until the enumerator is advanced.
    /// </summary>
    /// <exception cref="InvalidOperationException">The input stream is not set.</exception>
    protected override ValueTask ExecuteAsync(ActivityExecutionContext context)
    {
        // Resolve the stream now, while this activity is executing, instead of
        // capturing the execution context inside the iterator and reading it
        // later from an activity that has already finished.
        var stream = context.Get<Stream>(Input)
            ?? throw new InvalidOperationException(
                $"{nameof(CSVActivity)} requires a non-null {nameof(Input)} stream.");

        context.SetResult(GetEnumerator(stream));
        return ValueTask.CompletedTask;
    }

    /// <summary>
    /// Reads the CSV stream row by row and yields the value of the column at
    /// <see cref="ValueColumnOrdinal"/>.
    /// </summary>
    /// <remarks>
    /// The enumerator takes ownership of <paramref name="stream"/>: the CSV
    /// reader, the text reader and the stream are disposed when enumeration
    /// runs to completion or when the enumerator is disposed part-way through.
    /// If the enumerator is never advanced, nothing is opened or disposed here.
    /// </remarks>
    private static async IAsyncEnumerator<string> GetEnumerator(Stream stream)
    {
        // Declared in this order so the CSV reader is disposed before the
        // text reader (and with it the underlying stream).
        using var reader = new StreamReader(stream);
        await using var csv = CsvDataReader.Create(reader);

        while (await csv.ReadAsync())
        {
            yield return csv.GetString(ValueColumnOrdinal);
        }
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment