Last active
January 29, 2023 19:23
-
-
Save chrisfcarroll/ab3b8b28252a3811f71a3f2c0374711f to your computer and use it in GitHub Desktop.
Azure Blob Storage: parallel downloads of 1000s of files
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//
// Dependency: dotnet add package Azure.Storage.Blobs
// #r "nuget:Azure.Storage.Blobs"
//
// Parallel download blobs from an Azure Blob Storage container, and report speed and bandwidth metrics.
// As is, this code searches for blobs by index tag. To download by virtual folder instead,
// replace `var blobItems = containerClient.FindBlobsByTags(indexTagFilter)` with
// `var blobItems = containerClient.GetBlobs()` and read `blob.Name` instead of `blob.BlobName`.
//
using System.Diagnostics; | |
using System.Dynamic; | |
using System.Text.Json; | |
using Azure.Storage.Blobs; | |
using Azure.Storage.Blobs.Models; | |
// ---------------------------------------------------------------------------------------------------
// Timeboxed parallel download of tagged blobs, reporting throughput.
//
// args[0] = absolute storage URL including SAS, args[1] = container name,
// args[2] = optional timebox in seconds (default 5), args[3] = optional index tag filter.
// ---------------------------------------------------------------------------------------------------
if (args.Length < 2 || !Uri.TryCreate(args[0], UriKind.Absolute, out var speedTestUrl))
{
    Console.WriteLine("\nParallel download blobs from an Azure Blob Storage container, " +
                      "\nand report speed and bandwidth metrics." +
                      "\n\nUsage from project directory: " +
                      "\ndotnet run -- <absoluteStorageUrlWithSaS> <containerName>" +
                      " [<timeboxSeconds=5>]" +
                      " [<indexTagFilter=\\\"state\\\"='new']" +
                      "\n");
    // No arguments at all is treated as a help request (exit 0); bad arguments are an error (exit 1).
    Environment.Exit(args.Length == 0 ? 0 : 1);
    return;
}

var containerName = args[1];
var timeoutSecounds = args.Length >= 3 && int.TryParse(args[2], out var arg2) && arg2 > 0 ? arg2 : 5;
var indexTagFilter = args.Length >= 4 ? args[3] : "\"state\"='new'";
Console.WriteLine($"Container={containerName}, Timeboxed to {timeoutSecounds} seconds, Filtered on Index Tags {indexTagFilter}");

// Flip the constant to false to benchmark with the runtime's default connection/thread limits.
#pragma warning disable CS0162 //unreachable code
if (true)
{
    var parallelism = Environment.ProcessorCount * 8;
    Console.WriteLine($"{Environment.ProcessorCount} processors found, use parallelism={parallelism}");
    System.Net.ServicePointManager.DefaultConnectionLimit += parallelism;
    ThreadPool.SetMinThreads(parallelism, Environment.ProcessorCount);
}
else
{
    ThreadPool.GetMinThreads(out var workerThreads, out _);
    Console.WriteLine("Using DotNetCore defaults unchanged: " +
                      $"System.Net.ServicePointManager.DefaultConnectionLimit={System.Net.ServicePointManager.DefaultConnectionLimit}\n" +
                      $"ThreadPool.MinWorkerThreads={workerThreads}");
}

var throttleParallelDownloadsAt = 1000;
var semaphore = new SemaphoreSlim(throttleParallelDownloadsAt);
var blobServiceClient = new BlobServiceClient(speedTestUrl);
var containerClient = blobServiceClient.GetBlobContainerClient(containerName);

// Materialise the listing once. Counting the pageable and then iterating it again would list the
// container twice, and the two passes could disagree about how many blobs exist.
var blobItems = containerClient.FindBlobsByTags(indexTagFilter).ToList();
//var blobItems = containerClient.GetBlobs().ToList();
var blobCount = blobItems.Count;
Console.WriteLine($"{blobCount} blobs found");

// Download tasks enqueue from pool threads while the main thread dequeues, so the queue must be
// thread-safe. Fully qualified because this file does not import System.Collections.Concurrent.
var files = new System.Collections.Concurrent.ConcurrentQueue<string>();
var tasks = new List<Task>();
var timedout = 0;
var stopwatch = Stopwatch.StartNew();
var timeoutCancellation = new CancellationTokenSource(TimeSpan.FromSeconds(timeoutSecounds)).Token;

foreach (var blob in blobItems)
{
    // Stop launching once the timebox has expired; waiting for a slot must itself be cancellable,
    // otherwise the launch loop can block long after the timeout.
    if (timeoutCancellation.IsCancellationRequested)
    {
        break;
    }
    try
    {
        await semaphore.WaitAsync(timeoutCancellation);
    }
    catch (OperationCanceledException)
    {
        break;
    }

    // Deliberately NOT passing the token to Task.Run: a task cancelled before it starts never runs
    // its delegate, so the finally below would never release the slot acquired above.
    tasks.Add(Task.Run(async () =>
    {
        try
        {
            var blobClient = containerClient.GetBlobClient(blob.BlobName /*blob.Name*/);
            // Can do the stream reading manually, but it ain't any better
            // using var stream = new MemoryStream();
            // await blobClient.DownloadToAsync(stream, timeoutCancellation);
            // stream.Position = 0;
            // using var streamReader = new StreamReader(stream);
            // var result = await streamReader.ReadToEndAsync();
            var result = (await blobClient.DownloadContentAsync(timeoutCancellation))
                .Value.Content;
            if (result is not null)
            {
                files.Enqueue(result.ToString());
            }
        }
        catch (OperationCanceledException)
        {
            // TaskCanceledException derives from OperationCanceledException, so both are counted.
            Interlocked.Increment(ref timedout);
        }
        finally
        {
            semaphore.Release();
        }
    }));
}

int filesSeen = 0;
long totalSize = 0; // sum of content lengths in characters; long so large runs cannot overflow
double time99;
var _99thPercentile = blobCount * 99 / 100;
while (filesSeen < blobCount && !timeoutCancellation.IsCancellationRequested)
{
    try
    {
        if (files.TryDequeue(out var file))
        {
            //var obj = JsonSerializer.Deserialize<ExpandoObject>(file)??new ExpandoObject();
            //var isJsonDeserializable = obj.Count()>0;
            Console.Write(
                $"{++filesSeen}.Length={1.0 / 1024 * file.Length:F1}KB," +
                // $"{(isJsonDeserializable?(obj.Count() + " json properties"):"")}," +
                $"{stopwatch.Elapsed.TotalSeconds:F1}secs, ");
            totalSize += file.Length;
            if (filesSeen == _99thPercentile)
            {
                time99 = stopwatch.Elapsed.TotalSeconds;
                // Floating-point division throughout: integer division truncated sub-megabyte
                // totals to 0 before the rate was computed.
                Console.WriteLine($"{filesSeen} in {time99:F2}secs. {filesSeen / time99:F0} Events per second," +
                                  $" {totalSize / 1024.0 / 1024.0 / time99:F2} MBytes per second, " +
                                  $"average filesize {totalSize / 1024.0 / filesSeen:F0}KB");
            }
        }
    }
    catch (Exception e)
    {
        Console.WriteLine($"{e.Message} at file {filesSeen} {e.StackTrace?.Replace("\n", " | ")}");
    }
    Thread.Yield();
}

var time = stopwatch.Elapsed.TotalSeconds;
Console.WriteLine("\n");
if (timeoutCancellation.IsCancellationRequested)
{
    Console.WriteLine($"Timed out at {time:F2} seconds");
}
if (filesSeen == 0)
{
    Console.WriteLine("Got nothing.");
}
else
{
    Console.WriteLine($"{filesSeen} in {time:F2}secs. {filesSeen / time:F0} files per second." +
                      $" {totalSize / 1024.0 / 1024.0 / time:F2} MBytes per second, " +
                      $"average filesize {totalSize / 1024.0 / filesSeen:F0}KB");
}
if (0 < filesSeen && filesSeen < blobCount)
{
    var abandoned = Volatile.Read(ref timedout);
    Console.WriteLine(
        $"Got {100 * filesSeen / blobCount}% of {blobCount} before timeboxing at {timeoutSecounds} seconds." +
        (abandoned > 0 ? $" {abandoned} downloads abandoned." : ""));
}

// Cancellation is handled inside each task, so this only surfaces genuine download failures.
await Task.WhenAll(tasks);
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//
// Parallel download blobs from an Azure Blob Storage container, and report speed and bandwidth metrics.
// As is, this code searches for blobs by index tag. To download by virtual folder instead,
// replace `var blobItems = containerClient.FindBlobsByTags(indexTagFilter)` with
// `var blobItems = containerClient.GetBlobs()` and read `blob.Name` instead of `blob.BlobName`.
//
using System.Collections.Concurrent; | |
using System.Diagnostics; | |
using System.Dynamic; | |
using System.Globalization; | |
using System.Text.Json; | |
using System.Threading.Tasks.Sources; | |
using Azure.Storage.Blobs; | |
// ---------------------------------------------------------------------------------------------------
// Timeboxed parallel download of tagged blobs, reporting throughput and a phase-by-phase time log.
//
// args[0] = absolute storage URL including SAS, args[1] = container name,
// args[2] = optional timebox in seconds (default 5), args[3] = optional index tag filter.
// ---------------------------------------------------------------------------------------------------
var timeLog = new RecordingStopwatch().SetVerbose(Console.Out).ClearAndStart().AddThenIndent("Start");

//--Parse Args---------------------------------------------------------------------------------------
if (args.Length < 2 || !Uri.TryCreate(args[0], UriKind.Absolute, out var speedTestUrl))
{
    Console.WriteLine("\nParallel download blobs from an Azure Blob Storage container, " +
                      "\nand report speed and bandwidth metrics." +
                      "\n\nUsage from project directory: " +
                      "\ndotnet run -- <absoluteStorageUrlWithSaS> <containerName>" +
                      " [<timeboxSeconds=5>]" +
                      " [<indexTagFilter=\\\"state\\\"='new']" +
                      "\n");
    // No arguments at all is treated as a help request (exit 0); bad arguments are an error (exit 1).
    Environment.Exit(args.Length == 0 ? 0 : 1);
    return;
}
var containerName = args[1];
var timeoutSecounds = args.Length >= 3 && int.TryParse(args[2], out var arg2) && arg2 > 0 ? arg2 : 5;
var indexTagFilter = args.Length >= 4 ? args[3] : "\"state\"='new'";
Console.WriteLine($"Container={containerName}, Timeboxed to {timeoutSecounds} seconds, Filtered on Index Tags {indexTagFilter}");
timeLog.Add("ParsedArgs");
//---------------------------------------------------------------------------------------------------

//--Maybe set Connection and Thread limits ----------------------------------------------------------
var parallelism = Environment.ProcessorCount * 8;
// Flip the constant to false to benchmark with the runtime's default connection/thread limits.
#pragma warning disable CS0162 //unreachable code
if (true)
{
    Console.WriteLine($"{Environment.ProcessorCount} processors found, use parallelism={parallelism}");
    System.Net.ServicePointManager.DefaultConnectionLimit += parallelism;
    ThreadPool.SetMinThreads(parallelism, Environment.ProcessorCount);
}
else
{
    ThreadPool.GetMinThreads(out var workerThreads, out _);
    Console.WriteLine("Using DotNetCore defaults unchanged: " +
                      $"System.Net.ServicePointManager.DefaultConnectionLimit={System.Net.ServicePointManager.DefaultConnectionLimit}\n" +
                      $"ThreadPool.MinWorkerThreads={workerThreads}");
}
//---------------------------------------------------------------------------------------------------

//--Init workspace-----------------------------------------------------------------------------------
var files = new ConcurrentQueue<string>();
var downloadTaskSemaphore = new SemaphoreSlim(parallelism); //I think we get more Azure errors if we don't use the throttle
var downloadTasks = new List<Task>();
var blobCount = 0;
var timedout = 0;
//---------------------------------------------------------------------------------------------------

//--FindBlobs----------------------------------------------------------------------------------------
var blobServiceClient = new BlobServiceClient(speedTestUrl);
var containerClient = blobServiceClient.GetBlobContainerClient(containerName);
timeLog.Add("FindBlobsByTag:start");
var blobItems = containerClient.FindBlobsByTags(indexTagFilter);
//var blobItems = containerClient.GetBlobs();
timeLog.Add("FindBlobsByTag:end");
//---------------------------------------------------------------------------------------------------

//--Download Files in a loop-------------------------------------------------------------------------
var timeoutCancellation = new CancellationTokenSource(TimeSpan.FromSeconds(timeoutSecounds)).Token;
timeLog.Add("DownloadLoop:start");
foreach (var blob in blobItems)
{
    // Every listed blob is counted, even after the timebox, so the final report has the true total.
    blobCount++;
    if (timeoutCancellation.IsCancellationRequested)
    {
        continue;
    }
    try
    {
        // Waiting for a slot must be cancellable, otherwise the launch loop can block past the timeout.
        await downloadTaskSemaphore.WaitAsync(timeoutCancellation);
    }
    catch (OperationCanceledException)
    {
        continue;
    }

    // Deliberately NOT passing the token to Task.Run: a task cancelled before it starts never runs
    // its delegate, so the finally below would never release the slot acquired above and the
    // loop would eventually wait forever for a free slot.
    downloadTasks.Add(
        Task.Run(async () =>
        {
            try
            {
                var blobClient = containerClient.GetBlobClient(blob.BlobName /*blob.Name*/);
                // Can do the stream reading manually, but it ain't any better
                // using var stream = new MemoryStream();
                // await blobClient.DownloadToAsync(stream, timeoutCancellation);
                // stream.Position = 0;
                // using var streamReader = new StreamReader(stream);
                // var result = await streamReader.ReadToEndAsync();
                var result = (await blobClient.DownloadContentAsync(timeoutCancellation))
                    .Value.Content;
                if (result is not null)
                {
                    files.Enqueue(result.ToString());
                }
            }
            catch (OperationCanceledException)
            {
                // TaskCanceledException derives from OperationCanceledException, so both are counted.
                Interlocked.Increment(ref timedout);
            }
            finally
            {
                downloadTaskSemaphore.Release();
            }
        }));
}
timeLog.Add("DownloadLoop:startedall");
Console.WriteLine("Total Blobs Found=" + blobCount);
//---------------------------------------------------------------------------------------------------

//--Deserialize or at least Count downloaded files---------------------------------------------------
timeLog.Add("DequeueLoop:start");
int filesSeen = 0;
long totalSize = 0; // sum of content lengths in characters; long so large runs cannot overflow
double time99;
var _99thPercentile = blobCount * 99 / 100;
while (filesSeen < blobCount && !timeoutCancellation.IsCancellationRequested)
{
    try
    {
        if (files.TryDequeue(out var content))
        {
            //var obj = JsonSerializer.Deserialize<ExpandoObject>(file)??new ExpandoObject();
            //var isJsonDeserializable = obj.Count()>0;
            if (content is null)
            {
                Console.WriteLine("Assertion Failed: Dequeued null but shouldn't have been able to");
            }
            else
            {
                Console.Write(
                    $"{++filesSeen}.Length={1.0 / 1024 * content.Length:F1}KB," +
                    // $"{(isJsonDeserializable?(obj.Count() + " json properties"):"")}," +
                    $"{timeLog.Elapsed.TotalSeconds:F1}secs, ");
                totalSize += content.Length;

                // Only report the percentile when a file was actually counted in this iteration.
                if (filesSeen == _99thPercentile)
                {
                    time99 = timeLog.Elapsed.TotalSeconds;
                    // Floating-point division throughout: integer division truncated sub-megabyte
                    // totals to 0 before the rate was computed.
                    Console.WriteLine($"{filesSeen} in {time99:F2}secs. {filesSeen / time99:F0} Events per second," +
                                      $" {totalSize / 1024.0 / 1024.0 / time99:F2} MBytes per second, " +
                                      $"average filesize {totalSize / 1024.0 / filesSeen:F0}KB");
                }
            }
        }
    }
    catch (Exception e)
    {
        Console.WriteLine($"{e.Message} at file {filesSeen} {e.StackTrace?.Replace("\n", " | ")}");
    }
    Thread.Yield();
}
timeLog.Add("DequeueLoop:end");
//---------------------------------------------------------------------------------------------------

//--Report-------------------------------------------------------------------------------------------
var totalSeconds = timeLog.Elapsed.TotalSeconds;
Console.WriteLine("\n");
if (timeoutCancellation.IsCancellationRequested)
{
    Console.WriteLine($"Timed out at {totalSeconds:F2} seconds");
}
if (filesSeen == 0)
{
    Console.WriteLine("Got nothing.");
}
else
{
    Console.WriteLine($"{filesSeen} in {totalSeconds:F2}secs. {filesSeen / totalSeconds:F0} files per second." +
                      $" {totalSize / 1024.0 / 1024.0 / totalSeconds:F2} MBytes per second, " +
                      $"average filesize {totalSize / 1024.0 / filesSeen:F0}KB");
}
if (0 < filesSeen && filesSeen < blobCount)
{
    var abandoned = Volatile.Read(ref timedout);
    Console.WriteLine(
        $"Got {100 * filesSeen / blobCount}% of {blobCount} before timeboxing at {timeoutSecounds} seconds." +
        (abandoned > 0 ? $" {abandoned} downloads abandoned." : ""));
}
timeLog.OutdentThenAdd("End");
timeLog.Stop();
Console.WriteLine(timeLog.ToString());
Environment.ExitCode = 0;
//---------------------------------------------------------------------------------------------------
//--------------------------------------------------------------------------------------------------- | |
/// <summary>
/// A <see cref="Stopwatch"/> that additionally keeps an ordered log of named events, each stamped
/// with the elapsed time and an indent level, and can optionally echo every event to a
/// <see cref="TextWriter"/> as it is recorded.
/// </summary>
/// <remarks>
/// Every recording method returns <c>this</c> so calls can be chained. The indent level never goes
/// below zero. The echoed line and the recorded line are always the same tuple, so verbose output
/// and <see cref="ToString()"/> agree on both indent and timestamp.
/// </remarks>
public class RecordingStopwatch : Stopwatch
{
    private List<(int Indent, string Event, TimeSpan SinceStart)> _lines = new();
    private int _currentIndent;
    private TextWriter? _out;

    /// <summary>Echo every subsequently recorded event to <paramref name="output"/>.</summary>
    public RecordingStopwatch SetVerbose(TextWriter output)
    {
        _out = output;
        return this;
    }

    /// <summary>Stop echoing recorded events.</summary>
    public RecordingStopwatch UnsetVerbose()
    {
        _out = null;
        return this;
    }

    /// <summary>Discards all recorded events and starts (or resumes) the stopwatch.</summary>
    public RecordingStopwatch ClearAndStart()
    {
        _lines = new();
        Start();
        return this;
    }

    /// <summary>Records <paramref name="event"/> at the current indent.</summary>
    public RecordingStopwatch Add(string @event) => Record(_currentIndent, @event, Elapsed);

    /// <summary>Records two events at the current indent, sharing one timestamp.</summary>
    public RecordingStopwatch Add(string event1, string event2)
    {
        var now = Elapsed;
        Record(_currentIndent, event1, now);
        return Record(_currentIndent, event2, now);
    }

    /// <summary>Increases the indent, then records <paramref name="event"/> at the new indent.</summary>
    public RecordingStopwatch IndentAdd(string @event) => Record(++_currentIndent, @event, Elapsed);

    /// <summary>Records <paramref name="event"/> at the current indent, then increases the indent.</summary>
    public RecordingStopwatch AddThenIndent(string @event)
    {
        Record(_currentIndent, @event, Elapsed);
        _currentIndent++;
        return this;
    }

    /// <summary>Decreases the indent (not below zero), then records <paramref name="event"/>.</summary>
    public RecordingStopwatch OutdentThenAdd(string @event)
    {
        Outdent();
        return Record(_currentIndent, @event, Elapsed);
    }

    /// <summary>
    /// Records <paramref name="event"/> at the current indent and <paramref name="indentedEvent"/>
    /// one level deeper, sharing one timestamp.
    /// </summary>
    public RecordingStopwatch AddIndentAdd(string @event, string indentedEvent)
    {
        var now = Elapsed;
        // The first event is recorded BEFORE the indent changes so its echoed indent is correct.
        Record(_currentIndent, @event, now);
        return Record(++_currentIndent, indentedEvent, now);
    }

    /// <summary>
    /// Records <paramref name="event"/> at the current indent and <paramref name="outdentedEvent"/>
    /// one level shallower (not below zero), sharing one timestamp.
    /// </summary>
    public RecordingStopwatch AddOutdentAdd(string @event, string outdentedEvent)
    {
        var now = Elapsed;
        // The first event is recorded BEFORE the indent changes so its echoed indent is correct.
        Record(_currentIndent, @event, now);
        Outdent();
        return Record(_currentIndent, outdentedEvent, now);
    }

    /// <summary>
    /// Returns the base stopwatch text followed by one line per recorded event, using the default
    /// <see cref="TimeSpan"/> format.
    /// </summary>
    public override string ToString() => Render(row => OneRowToString(row));

    /// <summary>As <see cref="ToString()"/>, formatting each timestamp with <paramref name="timespanFormat"/>.</summary>
    public string ToString(string timespanFormat) => Render(row => OneRowToString(row, timespanFormat));

    /// <summary>
    /// As <see cref="ToString()"/>, but each timestamp is shown as total milliseconds, optionally
    /// formatted with <paramref name="doubleFormat"/>.
    /// </summary>
    public string ToStringMillis(string? doubleFormat = null) => Render(row => OneRowToStringMillis(row, doubleFormat));

    // Appends one event to the log and echoes that exact same tuple when verbose.
    private RecordingStopwatch Record(int indent, string @event, TimeSpan sinceStart)
    {
        var line = (indent, @event, sinceStart);
        _lines.Add(line);
        _out?.WriteLine(line);
        return this;
    }

    // Decreases the indent by one level, clamped at zero.
    private void Outdent() => _currentIndent = Math.Max(0, _currentIndent - 1);

    // Header line plus every recorded row, consistently separated by the platform newline.
    private string Render(Func<(int Indent, string Event, TimeSpan SinceStart), string> formatRow) =>
        base.ToString() +
        Environment.NewLine +
        string.Join(Environment.NewLine, _lines.Select(formatRow));

    private static string OneRowToString((int Indent, string Event, TimeSpan SinceStart) r, string? timespanFormat = null)
    {
        return (r.Indent > 0 ? new string(' ', r.Indent) : "") +
               r.Event + " " + r.SinceStart.ToString(timespanFormat);
    }

    private static string OneRowToStringMillis((int Indent, string Event, TimeSpan SinceStart) r, string? doubleFormat = null)
    {
        return (r.Indent > 0 ? new string(' ', r.Indent) : "") +
               r.Event + " " + r.SinceStart.TotalMilliseconds.ToString(doubleFormat);
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment