Created
June 14, 2021 03:27
-
-
Save loic-sharma/38964359a9cabd98f13a5e3e77036a18 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<!-- Project file for the ParallelExtract console program. -->
<Project Sdk="Microsoft.NET.Sdk">

  <!-- Build a console executable targeting .NET 5. -->
  <PropertyGroup>
    <OutputType>Exe</OutputType>
    <TargetFramework>net5.0</TargetFramework>
  </PropertyGroup>

  <!-- Blob storage client and NuGet package reader used by Program. -->
  <ItemGroup>
    <PackageReference Include="Azure.Storage.Blobs" Version="12.9.0" />
    <PackageReference Include="NuGet.Packaging" Version="5.9.1" />
  </ItemGroup>

</Project>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Concurrent; | |
using System.Collections.Generic; | |
using System.Diagnostics; | |
using System.IO; | |
using System.Linq; | |
using System.Net; | |
using System.Security.Cryptography; | |
using System.Text; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using Azure.Storage.Blobs; | |
using Azure.Storage.Blobs.Models; | |
using NuGet.Packaging; | |
namespace ParallelExtract | |
{ | |
internal class Program | |
{ | |
/// <summary>
/// Entry point. Lists the blobs in the source container and reports how long the
/// selected <c>ProcessNAsync</c> strategy takes (uncomment exactly one call below).
/// </summary>
/// <param name="args">Command-line arguments; not used.</param>
public static async Task Main(string[] args)
{
    // Placeholders: fill these in before running.
    var connectionString = "...";
    var srcContainerName = "...";
    var destContainerName = "...";

    // Tune the thread pool and connection limits before any client is created.
    // NOTE(review): on .NET 5 the ServicePointManager settings may not influence
    // the HTTP stack used by the storage SDK -- confirm they have any effect.
    ThreadPool.SetMinThreads(workerThreads: 32, completionPortThreads: 4);
    ServicePointManager.DefaultConnectionLimit = 32;
    ServicePointManager.MaxServicePointIdleTime = 10000;

    var cancellationToken = CancellationToken.None;

    var srcClient = new BlobContainerClient(connectionString, srcContainerName);
    var destClient = new BlobContainerClient(connectionString, destContainerName);

    //await PrepareAsync(srcClient);

    IReadOnlyList<string> blobs = await DiscoverBlobsAsync(srcClient, cancellationToken);

    Console.WriteLine("Processing files...");
    var timer = Stopwatch.StartNew();

    // Each run writes below its own prefix (the current Unix time in seconds).
    string destPrefix = DateTimeOffset.UtcNow.ToUnixTimeSeconds().ToString();

    // Uncomment one of these lines:
    //await Process1Async(srcClient, destClient, destPrefix, blobs, cancellationToken);
    //await Process2Async(srcClient, destClient, destPrefix, blobs, cancellationToken);
    //await Process3Async(srcClient, destClient, destPrefix, blobs, cancellationToken);
    //await Process4Async(srcClient, destClient, destPrefix, blobs, cancellationToken);
    //await Process5Async(srcClient, destClient, destPrefix, blobs, cancellationToken);
    //await Process6Async(srcClient, destClient, destPrefix, blobs, cancellationToken);

    Console.WriteLine($"Processed files in {timer.Elapsed.TotalSeconds} seconds");
}
/// <summary>
/// Seeds the source container by uploading local <c>.nupkg</c> files, one blob per
/// package, named after the package file.
/// </summary>
/// <param name="srcClient">Container that receives the packages.</param>
/// <param name="packagesDirectory">
/// Directory searched recursively for <c>*.nupkg</c> files. When <c>null</c>, the
/// <c>.nuget/packages</c> folder under the current user's profile is used instead of a
/// path hard-coded to one machine.
/// </param>
/// <param name="maxPackages">Maximum number of packages to upload (defaults to 100).</param>
public static async Task PrepareAsync(
    BlobContainerClient srcClient,
    string packagesDirectory = null,
    int maxPackages = 100)
{
    packagesDirectory ??= Path.Combine(
        Environment.GetFolderPath(Environment.SpecialFolder.UserProfile),
        ".nuget",
        "packages");

    // Enumerate lazily so the directory walk stops once enough packages were found,
    // instead of listing the whole tree up front.
    IEnumerable<string> paths = Directory
        .EnumerateFiles(packagesDirectory, "*.nupkg", SearchOption.AllDirectories)
        .Take(maxPackages);

    foreach (string path in paths)
    {
        string blobName = Path.GetFileName(path);

        await srcClient
            .GetBlobClient(blobName)
            .UploadAsync(path);

        Console.WriteLine($"Uploaded {blobName}");
    }
}
/// <summary>
/// Collects the names of all blobs in the given container.
/// </summary>
/// <param name="srcClient">Container to enumerate.</param>
/// <param name="cancellationToken">Token passed to the blob listing call.</param>
/// <returns>The blob names, in the order the listing returned them.</returns>
public static async Task<IReadOnlyList<string>> DiscoverBlobsAsync(
    BlobContainerClient srcClient,
    CancellationToken cancellationToken)
{
    Console.WriteLine("Discovering blobs...");

    var names = new List<string>();
    var listing = srcClient.GetBlobsAsync(cancellationToken: cancellationToken);

    await foreach (BlobItem item in listing)
    {
        names.Add(item.Name);
    }

    Console.WriteLine($"Discovered {names.Count} blobs");
    return names;
}
/// <summary>
/// Strategy 1: fully sequential. Each package is downloaded to a temporary file and
/// every entry other than <c>.dll</c> files is uploaded one at a time, streamed
/// straight from the archive.
/// </summary>
/// <remarks>
/// Recorded timing: 187.7183516 (presumably seconds, like the other strategies).
/// </remarks>
/// <param name="srcClient">Container holding the packages.</param>
/// <param name="destClient">Container receiving the extracted entries.</param>
/// <param name="destPrefix">Prefix placed in front of every destination blob name.</param>
/// <param name="packageIds">Names of the package blobs to process.</param>
/// <param name="cancellationToken">Token passed to download, read and upload calls.</param>
public static async Task Process1Async(
    BlobContainerClient srcClient,
    BlobContainerClient destClient,
    string destPrefix,
    IReadOnlyList<string> packageIds,
    CancellationToken cancellationToken)
{
    foreach (string packageId in packageIds)
    {
        Console.WriteLine($"Processing {packageId}...");

        using FileStream packageStream = GetTemporaryFile();
        var source = srcClient.GetBlobClient(packageId);
        await source.DownloadToAsync(packageStream, cancellationToken);

        using PackageArchiveReader package = new PackageArchiveReader(packageStream);

        // Skip assemblies; everything else is uploaded.
        IEnumerable<string> entries = package
            .GetFiles()
            .Where(name => Path.GetExtension(name) != ".dll");

        foreach (string entry in entries)
        {
            string destName = GetDestinationName(destPrefix, packageId, entry);
            var target = destClient.GetBlobClient(destName);

            using Stream content = await package.GetStreamAsync(entry, cancellationToken);
            await target.UploadAsync(content, cancellationToken);
        }

        Console.WriteLine($"Processed {packageId}");
    }
}
/// <summary>
/// Strategy 2: 32 concurrent workers pull package names from a shared bag. Within a
/// package, entries other than <c>.dll</c> files are still uploaded one at a time,
/// streamed straight from the archive.
/// </summary>
/// <remarks>
/// Recorded timings: 20.7424362 seconds, 32.7400848 seconds.
/// </remarks>
/// <param name="srcClient">Container holding the packages.</param>
/// <param name="destClient">Container receiving the extracted entries.</param>
/// <param name="destPrefix">Prefix placed in front of every destination blob name.</param>
/// <param name="packageIds">Names of the package blobs to process.</param>
/// <param name="cancellationToken">Token passed to download, read and upload calls.</param>
public static async Task Process2Async(
    BlobContainerClient srcClient,
    BlobContainerClient destClient,
    string destPrefix,
    IReadOnlyList<string> packageIds,
    CancellationToken cancellationToken)
{
    ConcurrentBag<string> pending = new ConcurrentBag<string>(packageIds);

    // One worker: keeps taking packages until the shared bag is empty.
    async Task DrainAsync()
    {
        while (pending.TryTake(out string packageId))
        {
            Console.WriteLine($"Processing {packageId}...");

            using FileStream packageStream = GetTemporaryFile();
            var source = srcClient.GetBlobClient(packageId);
            await source.DownloadToAsync(packageStream, cancellationToken);

            using PackageArchiveReader package = new PackageArchiveReader(packageStream);

            foreach (string entry in package.GetFiles())
            {
                bool isAssembly = Path.GetExtension(entry) == ".dll";
                if (!isAssembly)
                {
                    string destName = GetDestinationName(destPrefix, packageId, entry);

                    using Stream content = await package.GetStreamAsync(entry, cancellationToken);
                    await destClient.GetBlobClient(destName).UploadAsync(content, cancellationToken);
                }
            }

            Console.WriteLine($"Processed {packageId}");
        }
    }

    Task[] workers = Enumerable
        .Range(0, 32)
        .Select(_ => DrainAsync())
        .ToArray();

    await Task.WhenAll(workers);
}
/// <summary>
/// Strategy 3: 20 concurrent workers pull package names from a shared bag. Each
/// package's entries (other than <c>.dll</c> files) are first copied into memory one
/// at a time, then all of the package's entries are uploaded concurrently.
/// </summary>
/// <remarks>
/// Recorded timings: 15.8836806 seconds, 16.2033767 seconds, 22.5380455 seconds
/// (measured before the clean-up fix below).
/// Fixes over the original: the clean-up loop disposed the package stream repeatedly
/// instead of the buffered entry streams; archive entry streams were never disposed;
/// buffers were not released when a copy or upload failed; the copy ignored the
/// cancellation token.
/// </remarks>
/// <param name="srcClient">Container holding the packages.</param>
/// <param name="destClient">Container receiving the extracted entries.</param>
/// <param name="destPrefix">Prefix placed in front of every destination blob name.</param>
/// <param name="blobs">Names of the package blobs to process.</param>
/// <param name="cancellationToken">Token passed to download, read, copy and upload calls.</param>
public static async Task Process3Async(
    BlobContainerClient srcClient,
    BlobContainerClient destClient,
    string destPrefix,
    IReadOnlyList<string> blobs,
    CancellationToken cancellationToken)
{
    ConcurrentBag<string> work = new ConcurrentBag<string>(blobs);

    IEnumerable<Task> workers = Enumerable
        .Repeat(0, 20)
        .Select(async _ =>
        {
            while (work.TryTake(out string blob))
            {
                Console.WriteLine($"Processing {blob}...");

                using FileStream stream = GetTemporaryFile();
                await srcClient.GetBlobClient(blob).DownloadToAsync(stream, cancellationToken);

                using PackageArchiveReader reader = new PackageArchiveReader(stream);
                Dictionary<string, Stream> files = new();

                try
                {
                    // Entries are read sequentially; only the uploads run concurrently.
                    foreach (string file in await reader.GetFilesAsync(cancellationToken))
                    {
                        if (Path.GetExtension(file) == ".dll")
                        {
                            continue;
                        }

                        // Track the buffer before filling it so the finally block
                        // releases it even when the copy fails.
                        Stream memoryStream = new MemoryStream();
                        files.Add(file, memoryStream);

                        using Stream entryStream = await reader.GetStreamAsync(file, cancellationToken);
                        await entryStream.CopyToAsync(memoryStream, cancellationToken);
                        memoryStream.Position = 0;
                    }

                    IEnumerable<Task> uploads = files
                        .Select(async entry =>
                        {
                            string destName = GetDestinationName(destPrefix, blob, entry.Key);
                            await destClient.GetBlobClient(destName).UploadAsync(
                                entry.Value,
                                overwrite: true,
                                cancellationToken);
                        });

                    // WhenAll completes only after every upload has finished, so the
                    // buffers are no longer in use when the finally block runs.
                    await Task.WhenAll(uploads);
                }
                finally
                {
                    foreach (Stream buffered in files.Values)
                    {
                        buffered.Dispose();
                    }
                }

                Console.WriteLine($"Processed {blob}");
            }
        });

    await Task.WhenAll(workers);
}
/// <summary>
/// Strategy 4: like strategy 3, but each entry is buffered in its own temporary file
/// instead of memory. 20 concurrent workers pull package names from a shared bag;
/// a package's entries (other than <c>.dll</c> files) are extracted one at a time and
/// then uploaded concurrently.
/// </summary>
/// <remarks>
/// Recorded timings: 11.0674153 seconds, 11.0072607 seconds, 9.0030523 seconds,
/// 9.4962107 seconds (measured before the clean-up fix below).
/// Fixes over the original: the clean-up loop disposed the package stream repeatedly
/// instead of the per-entry temporary files, so those files and their handles were
/// never released by this method; archive entry streams were never disposed; nothing
/// was cleaned up when a copy or upload failed; the copy ignored the cancellation token.
/// </remarks>
/// <param name="srcClient">Container holding the packages.</param>
/// <param name="destClient">Container receiving the extracted entries.</param>
/// <param name="destPrefix">Prefix placed in front of every destination blob name.</param>
/// <param name="blobs">Names of the package blobs to process.</param>
/// <param name="cancellationToken">Token passed to download, read, copy and upload calls.</param>
public static async Task Process4Async(
    BlobContainerClient srcClient,
    BlobContainerClient destClient,
    string destPrefix,
    IReadOnlyList<string> blobs,
    CancellationToken cancellationToken)
{
    ConcurrentBag<string> work = new ConcurrentBag<string>(blobs);

    IEnumerable<Task> workers = Enumerable
        .Repeat(0, 20)
        .Select(async _ =>
        {
            while (work.TryTake(out string blob))
            {
                Console.WriteLine($"Processing {blob}...");

                using FileStream stream = GetTemporaryFile();
                await srcClient.GetBlobClient(blob).DownloadToAsync(stream, cancellationToken);

                using PackageArchiveReader reader = new PackageArchiveReader(stream);
                Dictionary<string, Stream> files = new();

                try
                {
                    // Entries are read sequentially; only the uploads run concurrently.
                    foreach (string file in await reader.GetFilesAsync(cancellationToken))
                    {
                        if (Path.GetExtension(file) == ".dll")
                        {
                            continue;
                        }

                        // Track the temp file before filling it so the finally block
                        // closes it even when the copy fails.
                        Stream tempStream = GetTemporaryFile();
                        files.Add(file, tempStream);

                        using Stream entryStream = await reader.GetStreamAsync(file, cancellationToken);
                        await entryStream.CopyToAsync(tempStream, cancellationToken);
                        tempStream.Position = 0;
                    }

                    IEnumerable<Task> uploads = files
                        .Select(async entry =>
                        {
                            string destName = GetDestinationName(destPrefix, blob, entry.Key);
                            await destClient.GetBlobClient(destName).UploadAsync(
                                entry.Value,
                                overwrite: true,
                                cancellationToken);
                        });

                    // WhenAll completes only after every upload has finished, so the
                    // temp files are no longer in use when the finally block runs.
                    await Task.WhenAll(uploads);
                }
                finally
                {
                    foreach (Stream buffered in files.Values)
                    {
                        buffered.Dispose();
                    }
                }

                Console.WriteLine($"Processed {blob}");
            }
        });

    await Task.WhenAll(workers);
}
/// <summary>
/// Strategy 5: like strategy 4, but the uploads of one package are drained by 32
/// uploader loops sharing a bag, rather than starting one task per entry.
/// 20 concurrent workers pull package names from a shared bag; each entry other than
/// <c>.dll</c> files is buffered in its own temporary file before being uploaded.
/// </summary>
/// <remarks>
/// Recorded timings: 12.0935026 seconds, 14.6035567 seconds, 12.0195229 seconds
/// (measured before the clean-up fix below).
/// Fixes over the original: the clean-up loop disposed the package stream repeatedly
/// instead of the per-entry temporary files, so those files and their handles were
/// never released by this method; archive entry streams were never disposed; nothing
/// was cleaned up when a copy or upload failed; the copy ignored the cancellation token.
/// </remarks>
/// <param name="srcClient">Container holding the packages.</param>
/// <param name="destClient">Container receiving the extracted entries.</param>
/// <param name="destPrefix">Prefix placed in front of every destination blob name.</param>
/// <param name="blobs">Names of the package blobs to process.</param>
/// <param name="cancellationToken">Token passed to download, read, copy and upload calls.</param>
public static async Task Process5Async(
    BlobContainerClient srcClient,
    BlobContainerClient destClient,
    string destPrefix,
    IReadOnlyList<string> blobs,
    CancellationToken cancellationToken)
{
    ConcurrentBag<string> work = new ConcurrentBag<string>(blobs);

    IEnumerable<Task> workers = Enumerable
        .Repeat(0, 20)
        .Select(async _ =>
        {
            while (work.TryTake(out string blob))
            {
                Console.WriteLine($"Processing {blob}...");

                using FileStream stream = GetTemporaryFile();
                await srcClient.GetBlobClient(blob).DownloadToAsync(stream, cancellationToken);

                using PackageArchiveReader reader = new PackageArchiveReader(stream);
                Dictionary<string, Stream> files = new();

                try
                {
                    // Entries are read sequentially; only the uploads run concurrently.
                    foreach (string file in await reader.GetFilesAsync(cancellationToken))
                    {
                        if (Path.GetExtension(file) == ".dll")
                        {
                            continue;
                        }

                        // Track the temp file before filling it so the finally block
                        // closes it even when the copy fails.
                        Stream tempStream = GetTemporaryFile();
                        files.Add(file, tempStream);

                        using Stream entryStream = await reader.GetStreamAsync(file, cancellationToken);
                        await entryStream.CopyToAsync(tempStream, cancellationToken);
                        tempStream.Position = 0;
                    }

                    // The bag holds copies of the dictionary's pairs; the dictionary
                    // itself stays intact so the finally block can dispose every stream.
                    ConcurrentBag<KeyValuePair<string, Stream>> pendingUploads = new(files);

                    IEnumerable<Task> uploaders = Enumerable
                        .Repeat(0, 32)
                        .Select(async _ =>
                        {
                            while (pendingUploads.TryTake(out KeyValuePair<string, Stream> entry))
                            {
                                string destName = GetDestinationName(destPrefix, blob, entry.Key);
                                await destClient.GetBlobClient(destName).UploadAsync(
                                    entry.Value,
                                    overwrite: true,
                                    cancellationToken);
                            }
                        });

                    // WhenAll completes only after every uploader has finished, so the
                    // temp files are no longer in use when the finally block runs.
                    await Task.WhenAll(uploaders);
                }
                finally
                {
                    foreach (Stream buffered in files.Values)
                    {
                        buffered.Dispose();
                    }
                }

                Console.WriteLine($"Processed {blob}");
            }
        });

    await Task.WhenAll(workers);
}
/// <summary>
/// Strategy 6: like strategy 4, but a single semaphore shared by all workers caps the
/// number of uploads in flight at 32 across every package. 20 concurrent workers pull
/// package names from a shared bag; each entry other than <c>.dll</c> files is
/// buffered in its own temporary file before being uploaded.
/// </summary>
/// <remarks>
/// Recorded timings: 15.5228722 seconds, 14.7717734 seconds, 10.2563626 seconds,
/// 12.1122038 seconds (measured before the clean-up fix below).
/// Fixes over the original: the clean-up loop disposed the package stream repeatedly
/// instead of the per-entry temporary files, so those files and their handles were
/// never released by this method; archive entry streams were never disposed; nothing
/// was cleaned up when a copy or upload failed; the copy ignored the cancellation token.
/// </remarks>
/// <param name="srcClient">Container holding the packages.</param>
/// <param name="destClient">Container receiving the extracted entries.</param>
/// <param name="destPrefix">Prefix placed in front of every destination blob name.</param>
/// <param name="blobs">Names of the package blobs to process.</param>
/// <param name="cancellationToken">Token passed to download, read, copy, wait and upload calls.</param>
public static async Task Process6Async(
    BlobContainerClient srcClient,
    BlobContainerClient destClient,
    string destPrefix,
    IReadOnlyList<string> blobs,
    CancellationToken cancellationToken)
{
    using SemaphoreSlim semaphore = new SemaphoreSlim(32);
    ConcurrentBag<string> work = new ConcurrentBag<string>(blobs);

    IEnumerable<Task> workers = Enumerable
        .Repeat(0, 20)
        .Select(async _ =>
        {
            while (work.TryTake(out string blob))
            {
                Console.WriteLine($"Processing {blob}...");

                using FileStream stream = GetTemporaryFile();
                await srcClient.GetBlobClient(blob).DownloadToAsync(stream, cancellationToken);

                using PackageArchiveReader reader = new PackageArchiveReader(stream);
                Dictionary<string, Stream> files = new();

                try
                {
                    // Entries are read sequentially; only the uploads run concurrently.
                    foreach (string file in await reader.GetFilesAsync(cancellationToken))
                    {
                        if (Path.GetExtension(file) == ".dll")
                        {
                            continue;
                        }

                        // Track the temp file before filling it so the finally block
                        // closes it even when the copy fails.
                        Stream tempStream = GetTemporaryFile();
                        files.Add(file, tempStream);

                        using Stream entryStream = await reader.GetStreamAsync(file, cancellationToken);
                        await entryStream.CopyToAsync(tempStream, cancellationToken);
                        tempStream.Position = 0;
                    }

                    IEnumerable<Task> uploads = files
                        .Select(async entry =>
                        {
                            // Wait outside the try so Release only runs after a
                            // successful acquire.
                            await semaphore.WaitAsync(cancellationToken);
                            try
                            {
                                string destName = GetDestinationName(destPrefix, blob, entry.Key);
                                await destClient.GetBlobClient(destName).UploadAsync(
                                    entry.Value,
                                    overwrite: true,
                                    cancellationToken);
                            }
                            finally
                            {
                                semaphore.Release();
                            }
                        });

                    // WhenAll completes only after every upload has finished, so the
                    // temp files are no longer in use when the finally block runs.
                    await Task.WhenAll(uploads);
                }
                finally
                {
                    foreach (Stream buffered in files.Values)
                    {
                        buffered.Dispose();
                    }
                }

                Console.WriteLine($"Processed {blob}");
            }
        });

    await Task.WhenAll(workers);
}
/// <summary>
/// Builds the destination blob name for one package entry:
/// <c>{destPrefix}/{hash}</c>, where the hash is the URL-safe Base64 form of the
/// SHA-256 digest of the UTF-8 bytes of <c>packageId + "/" + file</c>.
/// </summary>
/// <param name="destPrefix">Prefix placed in front of the hashed name.</param>
/// <param name="packageId">Name of the package blob the entry came from.</param>
/// <param name="file">Path of the entry inside the package.</param>
/// <returns>The destination blob name.</returns>
public static string GetDestinationName(string destPrefix, string packageId, string file)
{
    byte[] key = Encoding.UTF8.GetBytes(packageId + "/" + file);
    byte[] digest = SHA256.HashData(key);

    return destPrefix + "/" + Base64UrlEncode(digest);
}
/// <summary>
/// Encodes bytes as Base64 with the URL-unsafe characters swapped out:
/// <c>+</c> becomes <c>-</c>, <c>/</c> becomes <c>_</c>, and <c>=</c> padding is dropped.
/// </summary>
/// <param name="bytes">Bytes to encode.</param>
/// <returns>The URL-safe, unpadded Base64 text.</returns>
private static string Base64UrlEncode(byte[] bytes)
{
    // See: https://stackoverflow.com/a/59660802
    string base64 = Convert.ToBase64String(bytes);
    StringBuilder urlSafe = new StringBuilder(base64.Length);

    foreach (char c in base64)
    {
        switch (c)
        {
            case '+':
                urlSafe.Append('-'); // URL-safe stand-in for '+'
                break;
            case '/':
                urlSafe.Append('_'); // URL-safe stand-in for '/'
                break;
            case '=':
                break; // no padding
            default:
                urlSafe.Append(c);
                break;
        }
    }

    return urlSafe.ToString();
}
/// <summary>
/// Buffer size, in bytes, passed to every temporary <see cref="FileStream"/>.
/// </summary>
/// <remarks>
/// Chosen to align with the default Stream buffer size:
/// https://github.com/dotnet/corefx/blob/master/src/Common/src/CoreLib/System/IO/Stream.cs#L32-L35
/// </remarks>
private const int BufferSize = 80 * 1024;

/// <summary>
/// Opens a new temporary file for exclusive, asynchronous read/write access.
/// The file is opened with <see cref="FileOptions.DeleteOnClose"/>, so it is removed
/// when the returned stream is closed.
/// </summary>
/// <returns>An open stream over the temporary file.</returns>
public static FileStream GetTemporaryFile()
{
    string path = Path.GetTempFileName();
    const FileOptions options = FileOptions.DeleteOnClose | FileOptions.Asynchronous;

    return new FileStream(
        path,
        FileMode.Create,
        FileAccess.ReadWrite,
        FileShare.None,
        bufferSize: BufferSize,
        options: options);
}
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment