Skip to content

Instantly share code, notes, and snippets.

@loic-sharma
Created June 14, 2021 03:27
Show Gist options
  • Save loic-sharma/38964359a9cabd98f13a5e3e77036a18 to your computer and use it in GitHub Desktop.
Save loic-sharma/38964359a9cabd98f13a5e3e77036a18 to your computer and use it in GitHub Desktop.
<Project Sdk="Microsoft.NET.Sdk">
<!-- ParallelExtract: console benchmark that copies the entries of .nupkg blobs from one Azure Blob container to another. -->
<PropertyGroup>
<OutputType>Exe</OutputType>
<!-- net5.0 (C# 9 by default): the program relies on SHA256.HashData and target-typed new(). -->
<TargetFramework>net5.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<!-- BlobContainerClient / BlobClient: list, download and upload blobs. -->
<PackageReference Include="Azure.Storage.Blobs" Version="12.9.0" />
<!-- PackageArchiveReader: enumerate and open the entries inside each .nupkg. -->
<PackageReference Include="NuGet.Packaging" Version="5.9.1" />
</ItemGroup>
</Project>
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Security.Cryptography;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using NuGet.Packaging;
namespace ParallelExtract
{
    /// <summary>
    /// Benchmark harness comparing strategies for copying the contents of NuGet packages (.nupkg blobs)
    /// from a source Azure Blob container into a destination container, one destination blob per entry.
    /// </summary>
    /// <remarks>
    /// Each <c>ProcessNAsync</c> variant is a different concurrency strategy; the comment above each one
    /// records the wall-clock times measured for it. Entries with a ".dll" extension (any casing) are skipped.
    /// </remarks>
    internal class Program
    {
        /// <summary>
        /// Lists the source container, runs the chosen processing variant and prints the elapsed time.
        /// </summary>
        /// <param name="args">Unused.</param>
        public static async Task Main(string[] args)
        {
            // Fill these in before running.
            string connectionString = "...";
            string srcContainerName = "...";
            string destContainerName = "...";

            // Raise the thread pool's minimum thread count (presumably to avoid ramp-up delays
            // when many workers run concurrently).
            ThreadPool.SetMinThreads(workerThreads: 32, completionPortThreads: 4);

            // NOTE(review): on .NET Core / .NET 5 the Azure SDK's HttpClient uses SocketsHttpHandler, which
            // does not appear to read these ServicePointManager settings — confirm they still have an effect.
            ServicePointManager.DefaultConnectionLimit = 32;
            ServicePointManager.MaxServicePointIdleTime = 10000;

            CancellationToken cancellationToken = CancellationToken.None;

            BlobContainerClient srcClient = new BlobContainerClient(connectionString, srcContainerName);
            BlobContainerClient destClient = new BlobContainerClient(connectionString, destContainerName);

            // Uncomment to seed the source container with local packages first.
            //await PrepareAsync(srcClient);

            IReadOnlyList<string> blobs = await DiscoverBlobsAsync(srcClient, cancellationToken);

            Console.WriteLine("Processing files...");
            Stopwatch stopwatch = Stopwatch.StartNew();

            // Per-run timestamp prefix so each run writes under its own virtual directory.
            string destPrefix = DateTimeOffset.UtcNow.ToUnixTimeSeconds().ToString();

            // Uncomment one of these lines:
            //await Process1Async(srcClient, destClient, destPrefix, blobs, cancellationToken);
            //await Process2Async(srcClient, destClient, destPrefix, blobs, cancellationToken);
            //await Process3Async(srcClient, destClient, destPrefix, blobs, cancellationToken);
            //await Process4Async(srcClient, destClient, destPrefix, blobs, cancellationToken);
            //await Process5Async(srcClient, destClient, destPrefix, blobs, cancellationToken);
            //await Process6Async(srcClient, destClient, destPrefix, blobs, cancellationToken);

            Console.WriteLine($"Processed files in {stopwatch.Elapsed.TotalSeconds} seconds");
        }

        /// <summary>
        /// Seeds the source container by uploading local .nupkg files, each under its file name.
        /// </summary>
        /// <param name="srcClient">Container to upload into.</param>
        /// <param name="packagesDirectory">Directory searched recursively for <c>*.nupkg</c> files.</param>
        /// <param name="count">Maximum number of packages to upload.</param>
        public static async Task PrepareAsync(
            BlobContainerClient srcClient,
            string packagesDirectory = @"C:\Users\sharm\.nuget\packages",
            int count = 100)
        {
            // EnumerateFiles is lazy: the search stops once `count` packages have been taken,
            // instead of walking the whole package cache up front.
            IEnumerable<string> paths = Directory.EnumerateFiles(packagesDirectory, "*.nupkg", SearchOption.AllDirectories);
            foreach (string path in paths.Take(count))
            {
                string fileName = Path.GetFileName(path);
                await srcClient
                    .GetBlobClient(fileName)
                    .UploadAsync(path);
                Console.WriteLine($"Uploaded {fileName}");
            }
        }

        /// <summary>
        /// Returns the names of every blob in <paramref name="srcClient"/>.
        /// </summary>
        public static async Task<IReadOnlyList<string>> DiscoverBlobsAsync(
            BlobContainerClient srcClient,
            CancellationToken cancellationToken)
        {
            Console.WriteLine("Discovering blobs...");
            List<string> result = new List<string>();
            await foreach (BlobItem blob in srcClient.GetBlobsAsync(cancellationToken: cancellationToken))
            {
                result.Add(blob.Name);
            }

            Console.WriteLine($"Discovered {result.Count} blobs");
            return result;
        }

        // Variant 1 — sequential baseline: one package at a time, each entry streamed
        // straight from the archive and uploaded one by one.
        // Measured: 187.7183516 seconds
        public static async Task Process1Async(
            BlobContainerClient srcClient,
            BlobContainerClient destClient,
            string destPrefix,
            IReadOnlyList<string> packageIds,
            CancellationToken cancellationToken)
        {
            foreach (string packageId in packageIds)
            {
                Console.WriteLine($"Processing {packageId}...");
                using FileStream stream = await DownloadToTemporaryFileAsync(srcClient, packageId, cancellationToken);
                using PackageArchiveReader reader = new PackageArchiveReader(stream);
                foreach (string entry in reader.GetFiles())
                {
                    if (IsExcluded(entry))
                    {
                        continue;
                    }

                    string destName = GetDestinationName(destPrefix, packageId, entry);
                    using Stream fileStream = await reader.GetStreamAsync(entry, cancellationToken);
                    await destClient.GetBlobClient(destName).UploadAsync(fileStream, cancellationToken);
                }

                Console.WriteLine($"Processed {packageId}");
            }
        }

        // Variant 2 — `workerCount` workers (default 32) pull packages from a shared bag; each worker
        // handles a whole package, streaming and uploading its entries one at a time.
        // Measured: 20.7424362 seconds, 32.7400848 seconds
        public static async Task Process2Async(
            BlobContainerClient srcClient,
            BlobContainerClient destClient,
            string destPrefix,
            IReadOnlyList<string> packageIds,
            CancellationToken cancellationToken,
            int workerCount = 32)
        {
            RequirePositive(workerCount, nameof(workerCount));

            ConcurrentBag<string> work = new ConcurrentBag<string>(packageIds);
            IEnumerable<Task> workers = Enumerable
                .Repeat(0, workerCount)
                .Select(async _ =>
                {
                    while (work.TryTake(out string packageId))
                    {
                        Console.WriteLine($"Processing {packageId}...");
                        using FileStream stream = await DownloadToTemporaryFileAsync(srcClient, packageId, cancellationToken);
                        using PackageArchiveReader reader = new PackageArchiveReader(stream);
                        foreach (string entry in reader.GetFiles())
                        {
                            if (IsExcluded(entry))
                            {
                                continue;
                            }

                            string destName = GetDestinationName(destPrefix, packageId, entry);
                            using Stream fileStream = await reader.GetStreamAsync(entry, cancellationToken);
                            await destClient.GetBlobClient(destName).UploadAsync(fileStream, cancellationToken);
                        }

                        Console.WriteLine($"Processed {packageId}");
                    }
                });

            await Task.WhenAll(workers);
        }

        // Variant 3 — `packageConcurrency` package workers (default 20); each buffers all of a package's
        // entries into MemoryStreams, then uploads them all concurrently.
        // Measured: 15.8836806 seconds, 16.2033767 seconds, 22.5380455 seconds
        public static async Task Process3Async(
            BlobContainerClient srcClient,
            BlobContainerClient destClient,
            string destPrefix,
            IReadOnlyList<string> blobs,
            CancellationToken cancellationToken,
            int packageConcurrency = 20)
        {
            RequirePositive(packageConcurrency, nameof(packageConcurrency));

            ConcurrentBag<string> work = new ConcurrentBag<string>(blobs);
            IEnumerable<Task> workers = Enumerable
                .Repeat(0, packageConcurrency)
                .Select(async _ =>
                {
                    while (work.TryTake(out string blob))
                    {
                        Console.WriteLine($"Processing {blob}...");
                        using FileStream stream = await DownloadToTemporaryFileAsync(srcClient, blob, cancellationToken);
                        using PackageArchiveReader reader = new PackageArchiveReader(stream);
                        Dictionary<string, Stream> files = await BufferEntriesAsync(reader, () => new MemoryStream(), cancellationToken);
                        try
                        {
                            await Task.WhenAll(files.Select(file =>
                                UploadEntryAsync(destClient, destPrefix, blob, file, cancellationToken)));
                        }
                        finally
                        {
                            // Dispose every buffer (the original loop disposed the package stream instead).
                            DisposeAll(files.Values);
                        }

                        Console.WriteLine($"Processed {blob}");
                    }
                });

            await Task.WhenAll(workers);
        }

        // Variant 4 — like variant 3, but entries are buffered into temporary files instead of memory.
        // Measured: 11.0674153 seconds, 11.0072607 seconds, 9.0030523 seconds, 9.4962107 seconds
        public static async Task Process4Async(
            BlobContainerClient srcClient,
            BlobContainerClient destClient,
            string destPrefix,
            IReadOnlyList<string> blobs,
            CancellationToken cancellationToken,
            int packageConcurrency = 20)
        {
            RequirePositive(packageConcurrency, nameof(packageConcurrency));

            ConcurrentBag<string> work = new ConcurrentBag<string>(blobs);
            IEnumerable<Task> workers = Enumerable
                .Repeat(0, packageConcurrency)
                .Select(async _ =>
                {
                    while (work.TryTake(out string blob))
                    {
                        Console.WriteLine($"Processing {blob}...");
                        using FileStream stream = await DownloadToTemporaryFileAsync(srcClient, blob, cancellationToken);
                        using PackageArchiveReader reader = new PackageArchiveReader(stream);
                        Dictionary<string, Stream> files = await BufferEntriesAsync(reader, GetTemporaryFile, cancellationToken);
                        try
                        {
                            await Task.WhenAll(files.Select(file =>
                                UploadEntryAsync(destClient, destPrefix, blob, file, cancellationToken)));
                        }
                        finally
                        {
                            // Disposing the temp files also deletes them (DeleteOnClose).
                            DisposeAll(files.Values);
                        }

                        Console.WriteLine($"Processed {blob}");
                    }
                });

            await Task.WhenAll(workers);
        }

        // Variant 5 — like variant 4, but each package's entries are uploaded by `uploadConcurrency`
        // upload workers (default 32) pulling from a per-package bag.
        // Measured: 12.0935026 seconds, 14.6035567 seconds, 12.0195229 seconds
        public static async Task Process5Async(
            BlobContainerClient srcClient,
            BlobContainerClient destClient,
            string destPrefix,
            IReadOnlyList<string> blobs,
            CancellationToken cancellationToken,
            int packageConcurrency = 20,
            int uploadConcurrency = 32)
        {
            RequirePositive(packageConcurrency, nameof(packageConcurrency));
            RequirePositive(uploadConcurrency, nameof(uploadConcurrency));

            ConcurrentBag<string> work = new ConcurrentBag<string>(blobs);
            IEnumerable<Task> workers = Enumerable
                .Repeat(0, packageConcurrency)
                .Select(async _ =>
                {
                    while (work.TryTake(out string blob))
                    {
                        Console.WriteLine($"Processing {blob}...");
                        using FileStream stream = await DownloadToTemporaryFileAsync(srcClient, blob, cancellationToken);
                        using PackageArchiveReader reader = new PackageArchiveReader(stream);
                        Dictionary<string, Stream> files = await BufferEntriesAsync(reader, GetTemporaryFile, cancellationToken);
                        try
                        {
                            ConcurrentBag<KeyValuePair<string, Stream>> uploadWork = new(files);
                            IEnumerable<Task> uploaders = Enumerable
                                .Repeat(0, uploadConcurrency)
                                .Select(async __ =>
                                {
                                    while (uploadWork.TryTake(out KeyValuePair<string, Stream> file))
                                    {
                                        await UploadEntryAsync(destClient, destPrefix, blob, file, cancellationToken);
                                    }
                                });

                            await Task.WhenAll(uploaders);
                        }
                        finally
                        {
                            DisposeAll(files.Values);
                        }

                        Console.WriteLine($"Processed {blob}");
                    }
                });

            await Task.WhenAll(workers);
        }

        // Variant 6 — like variant 4, but one SemaphoreSlim shared by all package workers caps the
        // number of in-flight uploads at `uploadConcurrency` (default 32).
        // Measured: 15.5228722 seconds, 14.7717734 seconds, 10.2563626 seconds, 12.1122038 seconds
        public static async Task Process6Async(
            BlobContainerClient srcClient,
            BlobContainerClient destClient,
            string destPrefix,
            IReadOnlyList<string> blobs,
            CancellationToken cancellationToken,
            int packageConcurrency = 20,
            int uploadConcurrency = 32)
        {
            RequirePositive(packageConcurrency, nameof(packageConcurrency));
            RequirePositive(uploadConcurrency, nameof(uploadConcurrency));

            using SemaphoreSlim semaphore = new SemaphoreSlim(uploadConcurrency);
            ConcurrentBag<string> work = new ConcurrentBag<string>(blobs);
            IEnumerable<Task> workers = Enumerable
                .Repeat(0, packageConcurrency)
                .Select(async _ =>
                {
                    while (work.TryTake(out string blob))
                    {
                        Console.WriteLine($"Processing {blob}...");
                        using FileStream stream = await DownloadToTemporaryFileAsync(srcClient, blob, cancellationToken);
                        using PackageArchiveReader reader = new PackageArchiveReader(stream);
                        Dictionary<string, Stream> files = await BufferEntriesAsync(reader, GetTemporaryFile, cancellationToken);
                        try
                        {
                            await Task.WhenAll(files.Select(async file =>
                            {
                                await semaphore.WaitAsync(cancellationToken);
                                try
                                {
                                    await UploadEntryAsync(destClient, destPrefix, blob, file, cancellationToken);
                                }
                                finally
                                {
                                    semaphore.Release();
                                }
                            }));
                        }
                        finally
                        {
                            DisposeAll(files.Values);
                        }

                        Console.WriteLine($"Processed {blob}");
                    }
                });

            await Task.WhenAll(workers);
        }

        /// <summary>
        /// Builds the destination blob name <c>{destPrefix}/{hash}</c>. The hash is the unpadded base64url
        /// encoding of the SHA-256 of the UTF-8 string <c>{packageId}/{file}</c>.
        /// </summary>
        public static string GetDestinationName(string destPrefix, string packageId, string file)
        {
            string fileName = Base64UrlEncode(SHA256.HashData(Encoding.UTF8.GetBytes(packageId + "/" + file)));
            return $"{destPrefix}/{fileName}";
        }

        /// <summary>Encodes bytes as base64url without padding.</summary>
        private static string Base64UrlEncode(byte[] bytes)
        {
            // See: https://stackoverflow.com/a/59660802
            return Convert.ToBase64String(bytes)
                .Replace('+', '-') // replace URL unsafe characters with safe ones
                .Replace('/', '_') // replace URL unsafe characters with safe ones
                .Replace("=", ""); // no padding
        }

        /// <summary>
        /// The buffer size to use for file operations.
        /// </summary>
        /// <remarks>
        /// The value is chosen to align with the default Stream buffer size:
        /// https://github.com/dotnet/corefx/blob/master/src/Common/src/CoreLib/System/IO/Stream.cs#L32-L35
        /// </remarks>
        private const int BufferSize = 80 * 1024;

        /// <summary>
        /// Opens a new read/write temporary file that is deleted when the returned stream is disposed.
        /// </summary>
        public static FileStream GetTemporaryFile()
        {
            // GetTempFileName creates an empty placeholder file on disk.
            string path = Path.GetTempFileName();
            try
            {
                return new FileStream(
                    path,
                    FileMode.Create,
                    FileAccess.ReadWrite,
                    FileShare.None,
                    BufferSize,
                    FileOptions.DeleteOnClose | FileOptions.Asynchronous);
            }
            catch
            {
                // Opening failed, so DeleteOnClose never applies. Remove the placeholder (best effort)
                // without masking the original exception.
                try
                {
                    File.Delete(path);
                }
                catch (IOException)
                {
                }
                catch (UnauthorizedAccessException)
                {
                }

                throw;
            }
        }

        /// <summary>True for package entries that should not be copied (".dll" files, any casing).</summary>
        private static bool IsExcluded(string entry) =>
            string.Equals(Path.GetExtension(entry), ".dll", StringComparison.OrdinalIgnoreCase);

        /// <summary>Downloads <paramref name="blob"/> into a new temporary file and returns the open file.</summary>
        private static async Task<FileStream> DownloadToTemporaryFileAsync(
            BlobContainerClient srcClient,
            string blob,
            CancellationToken cancellationToken)
        {
            FileStream stream = GetTemporaryFile();
            try
            {
                await srcClient.GetBlobClient(blob).DownloadToAsync(stream, cancellationToken);
                return stream;
            }
            catch
            {
                stream.Dispose();
                throw;
            }
        }

        /// <summary>
        /// Copies every non-excluded entry of <paramref name="reader"/> into a buffer from
        /// <paramref name="createBuffer"/>, rewound to position 0. The caller owns and must dispose the
        /// returned buffers. If this method throws, it disposes any buffers it already created.
        /// </summary>
        private static async Task<Dictionary<string, Stream>> BufferEntriesAsync(
            PackageArchiveReader reader,
            Func<Stream> createBuffer,
            CancellationToken cancellationToken)
        {
            Dictionary<string, Stream> files = new();
            try
            {
                foreach (string file in await reader.GetFilesAsync(cancellationToken))
                {
                    if (IsExcluded(file))
                    {
                        continue;
                    }

                    Stream buffer = createBuffer();
                    try
                    {
                        using (Stream entryStream = await reader.GetStreamAsync(file, cancellationToken))
                        {
                            await entryStream.CopyToAsync(buffer, cancellationToken);
                        }

                        buffer.Position = 0;
                        files.Add(file, buffer);
                    }
                    catch
                    {
                        buffer.Dispose();
                        throw;
                    }
                }

                return files;
            }
            catch
            {
                DisposeAll(files.Values);
                throw;
            }
        }

        /// <summary>Uploads one buffered entry to its destination blob, overwriting any existing blob.</summary>
        private static async Task UploadEntryAsync(
            BlobContainerClient destClient,
            string destPrefix,
            string blob,
            KeyValuePair<string, Stream> file,
            CancellationToken cancellationToken)
        {
            string destName = GetDestinationName(destPrefix, blob, file.Key);
            await destClient.GetBlobClient(destName).UploadAsync(
                file.Value,
                overwrite: true,
                cancellationToken);
        }

        /// <summary>Disposes each stream in <paramref name="streams"/>.</summary>
        private static void DisposeAll(IEnumerable<Stream> streams)
        {
            foreach (Stream stream in streams)
            {
                stream.Dispose();
            }
        }

        /// <summary>Throws <see cref="ArgumentOutOfRangeException"/> when a concurrency setting is below 1.</summary>
        private static void RequirePositive(int value, string paramName)
        {
            if (value < 1)
            {
                throw new ArgumentOutOfRangeException(paramName, value, "Concurrency must be at least 1.");
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment