Skip to content

Instantly share code, notes, and snippets.

@chrisfcarroll
Last active January 29, 2023 19:23
Show Gist options
  • Save chrisfcarroll/ab3b8b28252a3811f71a3f2c0374711f to your computer and use it in GitHub Desktop.
Save chrisfcarroll/ab3b8b28252a3811f71a3f2c0374711f to your computer and use it in GitHub Desktop.
Azure Blob Storage parallel downloads of 1000s files
//
// Dependency: dotnet add package Azure.Storage.Blobs
// #r "nuget:Azure.Storage.Blobs"
//
// Parallel download blobs from an Azure Blob Storage container, and report speed and bandwidth metrics
// As is, this code searches for blobs by index tag. To download by virtual folder instead,
// replace blobItems = containerClient.FindBlobsByTags(indexTagFilter) with
// blobItemsUntagged = containerClient.GetBlobs();
//
using System.Diagnostics;
using System.Dynamic;
using System.Text.Json;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
if (args.Length < 2 || !Uri.TryCreate(args[0],UriKind.Absolute,out var speedTestUrl))
{
Console.WriteLine("\nParallel download blobs from an Azure Blob Storage container, " +
"\nand report speed and bandwidth metrics." +
"\n\nUsage from project directory: " +
"\ndotnet run -- <absoluteStorageUrlWithSaS> <containerName>" +
" [<timeboxSeconds=5>]" +
" [<indexTagFilter=\\\"state\\\"='new']" +
"\n");
Environment.Exit(args.Length==0?0:1);
return;
}
var containerName = args[1];
var timeoutSecounds = args.Length >= 3 && int.TryParse(args[2], out var arg2) && arg2 > 0 ? arg2 : 5;
var indexTagFilter = args.Length >= 4 ? args[3] : "\"state\"='new'";
Console.WriteLine($"Container={containerName}, Timeboxed to {timeoutSecounds} seconds, Filtered on Index Tags {indexTagFilter}");
#pragma warning disable CS0162 //unreachable code
if (true)
{
var parallelism = Environment.ProcessorCount * 8;
Console.WriteLine($"{Environment.ProcessorCount} processors found, use parallelism={parallelism}");
System.Net.ServicePointManager.DefaultConnectionLimit += parallelism;
ThreadPool.SetMinThreads(parallelism, Environment.ProcessorCount);
}
else
{
ThreadPool.GetMinThreads(out var workerThreads, out _);
Console.WriteLine("Using DotNetCore defaults unchanged: " +
$"System.Net.ServicePointManager.DefaultConnectionLimit={System.Net.ServicePointManager.DefaultConnectionLimit}\n" +
$"ThreadPool.MinWorkerThreads={workerThreads}");
}
var throttleParallelDownloadsAt = 1000;
var semaphore = new SemaphoreSlim(throttleParallelDownloadsAt);
var blobServiceClient = new BlobServiceClient(speedTestUrl);
var containerClient = blobServiceClient.GetBlobContainerClient(containerName);
var blobItems = containerClient.FindBlobsByTags(indexTagFilter);
//var blobItems = containerClient.GetBlobs();
var blobCount = blobItems.Count();
Console.WriteLine($"{blobCount} blobs found");
var files = new Queue<string>();
var tasks = new List<Task>();
var timedout = 0;
var stopwatch = Stopwatch.StartNew();
var timeoutCancellation = new CancellationTokenSource(TimeSpan.FromSeconds(timeoutSecounds)).Token;
foreach (var blob in blobItems)
{
await semaphore.WaitAsync();
tasks.Add(Task.Run(async () =>
{
try
{
var blobClient = containerClient.GetBlobClient(blob.BlobName /*blob.Name*/);
// Can do the stream reading manually, but it ain't any better
// using var stream = new MemoryStream();
// await blobClient.DownloadToAsync(stream, timeoutCancellation);
// stream.Position = 0;
// using var streamReader = new StreamReader(stream);
// var result = await streamReader.ReadToEndAsync();
var result = (await blobClient.DownloadContentAsync(timeoutCancellation))
.Value.Content;
if (result is not null)
{
files.Enqueue(result.ToString());
}
}
catch (TaskCanceledException) { timedout++; }
finally
{
semaphore.Release();
}
},timeoutCancellation));
}
int filesSeen = 0;
int totalSize = 0;
double time99;
var _99thPercentile = blobCount * 99 / 100;
while (filesSeen < blobCount && ! timeoutCancellation.IsCancellationRequested)
{
try
{
if (files.TryDequeue(out var file))
{
//var obj = JsonSerializer.Deserialize<ExpandoObject>(file)??new ExpandoObject();
//var isJsonDeserializable = obj.Count()>0;
Console.Write(
$"{++filesSeen}.Length={1.0/1024 * file.Length:F1}KB," +
// $"{(isJsonDeserializable?(obj.Count() + " json properties"):"")}," +
$"{stopwatch.Elapsed.TotalSeconds:F1}secs, ");
totalSize += file.Length;
if (filesSeen == _99thPercentile)
{
time99 = stopwatch.Elapsed.TotalSeconds;
Console.WriteLine($"{filesSeen} in {time99 :F2}secs. {filesSeen/time99 :F0} Events per second," +
$" {totalSize/1024/1024/time99 :F2} MBytes per second, " +
$"average filesize {(totalSize/filesSeen/1024) :F0}KB");
}
}
}
catch (Exception e)
{
Console.WriteLine($"{e.Message} at file {filesSeen} {e.StackTrace?.Replace("\n"," | ")}");
}
Thread.Yield();
}
var time = stopwatch.Elapsed.TotalSeconds;
Console.WriteLine("\n");
if(timeoutCancellation.IsCancellationRequested){Console.WriteLine($"Timed out at {time:F2} seconds");}
if (filesSeen == 0)
{
Console.WriteLine("Got nothing.");
}
else
{
Console.WriteLine($"{filesSeen} in {time :F2}secs. {filesSeen/time :F0} files per second." +
$" {totalSize/1024/1024/time :F2} MBytes per second, " +
$"average filesize {(totalSize/filesSeen/1024) :F0}KB");
}
if (0 < filesSeen && filesSeen < blobCount)
{
Console.WriteLine(
$"Got {100 * filesSeen/blobCount}% of {blobCount} before timeboxing at {timeoutSecounds} seconds." +
(timedout>0 ? $" {timedout} downloads abandoned." :""));
}
await Task.WhenAll(tasks);
//
// Parallel download blobs from an Azure Blob Storage container, and report speed and bandwidth metrics
// As is, this code searches for blobs by index tag. To download by virtual folder instead,
// replace blobItems = containerClient.FindBlobsByTags(indexTagFilter) with
// blobItemsUntagged = containerClient.GetBlobs();
//
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Dynamic;
using System.Globalization;
using System.Text.Json;
using System.Threading.Tasks.Sources;
using Azure.Storage.Blobs;
var timeLog = new RecordingStopwatch().SetVerbose(Console.Out).ClearAndStart().AddThenIndent("Start");
//--Parse Args---------------------------------------------------------------------------------------
if (args.Length < 2 || !Uri.TryCreate(args[0],UriKind.Absolute,out var speedTestUrl))
{
Console.WriteLine("\nParallel download blobs from an Azure Blob Storage container, " +
"\nand report speed and bandwidth metrics." +
"\n\nUsage from project directory: " +
"\ndotnet run -- <absoluteStorageUrlWithSaS> <containerName>" +
" [<timeboxSeconds=5>]" +
" [<indexTagFilter=\\\"state\\\"='new']" +
"\n");
Environment.Exit(args.Length==0?0:1);
return;
}
var containerName = args[1];
var timeoutSecounds = args.Length >= 3 && int.TryParse(args[2], out var arg2) && arg2 > 0 ? arg2 : 5;
var indexTagFilter = args.Length >= 4 ? args[3] : "\"state\"='new'";
Console.WriteLine($"Container={containerName}, Timeboxed to {timeoutSecounds} seconds, Filtered on Index Tags {indexTagFilter}");
timeLog.Add("ParsedArgs");
//---------------------------------------------------------------------------------------------------
//--Maybe set Connection and Thread limits ----------------------------------------------------------------
var parallelism = Environment.ProcessorCount * 8;
#pragma warning disable CS0162 //unreachable code
if (true)
{
Console.WriteLine($"{Environment.ProcessorCount} processors found, use parallelism={parallelism}");
System.Net.ServicePointManager.DefaultConnectionLimit += parallelism;
ThreadPool.SetMinThreads(parallelism, Environment.ProcessorCount);
}
else
{
ThreadPool.GetMinThreads(out var workerThreads, out _);
Console.WriteLine("Using DotNetCore defaults unchanged: " +
$"System.Net.ServicePointManager.DefaultConnectionLimit={System.Net.ServicePointManager.DefaultConnectionLimit}\n" +
$"ThreadPool.MinWorkerThreads={workerThreads}");
}
//---------------------------------------------------------------------------------------------------
//--Init workspace-----------------------------------------------------------------------------------
var files = new ConcurrentQueue<string>();
var downloadTaskSemaphore = new SemaphoreSlim(parallelism); //I think we get more Azure errors if we don't use the throttle
var downloadTasks = new List<Task>();
var blobCount = 0;
var timedout = 0;
//---------------------------------------------------------------------------------------------------
//--FindBlobs----------------------------------------------------------------------------------------
var blobServiceClient = new BlobServiceClient(speedTestUrl);
var containerClient = blobServiceClient.GetBlobContainerClient(containerName);
timeLog.Add("FindBlobsByTag:start");
var blobItems = containerClient.FindBlobsByTags(indexTagFilter);
//var blobItems = containerClient.GetBlobs();
timeLog.Add("FindBlobsByTag:end");
//---------------------------------------------------------------------------------------------------
//--Download Files in a loop-------------------------------------------------------------------------
var timeoutCancellation = new CancellationTokenSource(TimeSpan.FromSeconds(timeoutSecounds)).Token;
timeLog.Add("DownloadLoop:start");
foreach (var blob in blobItems)
{
await downloadTaskSemaphore.WaitAsync();
blobCount++;
downloadTasks.Add(
Task.Run(async () =>
{
try
{
var blobClient = containerClient.GetBlobClient(blob.BlobName /*blob.Name*/);
// Can do the stream reading manually, but it ain't any better
// using var stream = new MemoryStream();
// await blobClient.DownloadToAsync(stream, timeoutCancellation);
// stream.Position = 0;
// using var streamReader = new StreamReader(stream);
// var result = await streamReader.ReadToEndAsync();
var result = (await blobClient.DownloadContentAsync(timeoutCancellation))
.Value.Content;
if (result is not null)
{
files.Enqueue(result.ToString());
}
}
catch (TaskCanceledException) { timedout++; }
finally
{
downloadTaskSemaphore.Release();
}
},timeoutCancellation));
}
timeLog.Add("DownloadLoop:startedall");
Console.WriteLine("Total Blobs Found="+blobCount);
//---------------------------------------------------------------------------------------------------
//--Deserialize or at least Count downloaded files---------------------------------------------------
timeLog.Add("DequeueLoop:start");
int filesSeen = 0;
int totalSize = 0;
double time99;
var _99thPercentile = blobCount * 99 / 100;
while (filesSeen < blobCount && !timeoutCancellation.IsCancellationRequested)
{
try
{
if (files.TryDequeue(out var content))
{
//var obj = JsonSerializer.Deserialize<ExpandoObject>(file)??new ExpandoObject();
//var isJsonDeserializable = obj.Count()>0;
if (content is null)
{
Console.WriteLine("Assertion Failed: Dequeued null but shouldn't have been able to");
}
else
{
Console.Write(
$"{++filesSeen}.Length={1.0 / 1024 * content.Length:F1}KB," +
// $"{(isJsonDeserializable?(obj.Count() + " json properties"):"")}," +
$"{timeLog.Elapsed.TotalSeconds:F1}secs, ");
totalSize += content.Length;
}
if (filesSeen == _99thPercentile)
{
time99 = timeLog.Elapsed.TotalSeconds;
Console.WriteLine($"{filesSeen} in {time99:F2}secs. {filesSeen / time99:F0} Events per second," +
$" {totalSize / 1024 / 1024 / time99:F2} MBytes per second, " +
$"average filesize {(totalSize / filesSeen / 1024):F0}KB");
}
}
}
catch (Exception e)
{
Console.WriteLine($"{e.Message} at file {filesSeen} {e.StackTrace?.Replace("\n", " | ")}");
}
Thread.Yield();
}
timeLog.Add("DequeueLoop:end");
//---------------------------------------------------------------------------------------------------
//--Report-------------------------------------------------------------------------------------------
var totalSeconds = timeLog.Elapsed.TotalSeconds;
Console.WriteLine("\n");
if(timeoutCancellation.IsCancellationRequested){Console.WriteLine($"Timed out at {totalSeconds:F2} seconds");}
if (filesSeen == 0)
{
Console.WriteLine("Got nothing.");
}
else
{
Console.WriteLine($"{filesSeen} in {totalSeconds :F2}secs. {filesSeen/totalSeconds :F0} files per second." +
$" {totalSize/1024/1024/totalSeconds :F2} MBytes per second, " +
$"average filesize {(totalSize/filesSeen/1024) :F0}KB");
}
if (0 < filesSeen && filesSeen < blobCount)
{
Console.WriteLine(
$"Got {100 * filesSeen/blobCount}% of {blobCount} before timeboxing at {timeoutSecounds} seconds." +
(timedout>0 ? $" {timedout} downloads abandoned." :""));
}
timeLog.OutdentThenAdd("End");
timeLog.Stop();
Console.WriteLine(timeLog.ToString());
Environment.ExitCode = 0;
//---------------------------------------------------------------------------------------------------
public class RecordingStopwatch : Stopwatch
{
public RecordingStopwatch SetVerbose(TextWriter output) { _out = output; return this; }
public RecordingStopwatch UnsetVerbose() { _out = null; return this; }
List<(int Indent, string Event, TimeSpan SinceStart)> Lines = new();
int _currentIndent = 0;
TextWriter? _out;
public RecordingStopwatch ClearAndStart()
{
Lines = new();
Start();
return this;
}
public RecordingStopwatch Add(string @event)
{
Lines.Add( (_currentIndent,@event,Elapsed));
_out?.WriteLine((_currentIndent,@event,Elapsed));
return this;
}
public RecordingStopwatch Add(string event1, string event2)
{
var now = Elapsed;
Lines.Add( (_currentIndent,event1,now));
Lines.Add( (_currentIndent,event2,now));
_out?.WriteLine((_currentIndent,event1,now));
_out?.WriteLine((_currentIndent,event2,now));
return this;
}
public RecordingStopwatch IndentAdd(string @event)
{
Lines.Add( ( ++_currentIndent, @event, Elapsed));
_out?.WriteLine((_currentIndent,@event,Elapsed));
return this;
}
public RecordingStopwatch AddThenIndent(string @event)
{
Lines.Add( ( _currentIndent, @event, Elapsed));
_out?.WriteLine((_currentIndent,@event,Elapsed));
_currentIndent++;
return this;
}
public RecordingStopwatch OutdentThenAdd(string @event)
{
--_currentIndent;
if (_currentIndent < 0) _currentIndent = 0;
Lines.Add( ( _currentIndent, @event, Elapsed));
_out?.WriteLine((_currentIndent,@event,Elapsed));
return this;
}
public RecordingStopwatch AddIndentAdd(string @event, string indentedEvent)
{
var now = Elapsed;
Lines.Add( ( _currentIndent, @event, now));
Lines.Add( ( ++_currentIndent, @indentedEvent, now));
_out?.WriteLine((_currentIndent,@event,now));
_out?.WriteLine((_currentIndent,indentedEvent,now));
return this;
}
public RecordingStopwatch AddOutdentAdd(string @event, string outdentedEvent)
{
var now = Elapsed;
Lines.Add( ( _currentIndent, @event, now));
--_currentIndent;
if (_currentIndent < 0) _currentIndent = 0;
Lines.Add( ( _currentIndent, outdentedEvent, now));
_out?.WriteLine((_currentIndent,@event,now));
_out?.WriteLine((_currentIndent,outdentedEvent,now));
return this;
}
public override string ToString()
{
return
base.ToString() +
Environment.NewLine +
string.Join('\n', Lines.Select(r => OneRowToString(r)));
}
public string ToString(string timespanFormat)
{
return
base.ToString() +
Environment.NewLine +
string.Join('\n', Lines.Select(r=>OneRowToString(r,timespanFormat)));
}
public string ToStringMillis(string? doubleFormat=null)
{
return
base.ToString() +
Environment.NewLine +
string.Join('\n', Lines.Select(r=>OneRowToStringMillis(r,doubleFormat)));
}
static string OneRowToString((int Indent, string Event, TimeSpan SinceStart) r, string? timespanFormat=null)
{
return (r.Indent > 0 ? new string(' ', r.Indent) : "") +
r.Event + " " + r.SinceStart.ToString(timespanFormat);
}
static string OneRowToStringMillis((int Indent, string Event, TimeSpan SinceStart) r, string? doubleFormat=null)
{
return (r.Indent > 0 ? new string(' ', r.Indent) : "") +
r.Event + " " + r.SinceStart.TotalMilliseconds.ToString(doubleFormat);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment