Created
September 17, 2018 10:57
-
-
Save jrgcubano/6801be1875413cf84bc6e215b99741ce to your computer and use it in GitHub Desktop.
Azure Table Storage Batch Manager (Insert, Delete, Merge, etc) in batches
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class TableStorageBatchManager | |
{ | |
const int BatchSize = 100; | |
readonly ConcurrentQueue<Tuple<ITableEntity, TableOperation>> operations; | |
readonly CloudStorageAccount storageAccount; | |
readonly string tableName; | |
public TableStorageBatchManager(string tableName) | |
{ | |
this.tableName = tableName; | |
var cs = CloudConfigurationManager.GetSetting("StorageConnectionString"); | |
storageAccount = CloudStorageAccount.Parse(cs); | |
var tableReference = MakeTableReference(); | |
tableReference.CreateIfNotExists(); | |
operations = new ConcurrentQueue<Tuple<ITableEntity, TableOperation>>(); | |
} | |
CloudTable MakeTableReference() | |
{ | |
var tableClient = storageAccount.CreateCloudTableClient(); | |
var tableReference = tableClient.GetTableReference(tableName); | |
return tableReference; | |
} | |
public decimal OutstandingOperations => operations.Count; | |
public void Insert<TEntity>(TEntity entity) | |
where TEntity : ITableEntity | |
{ | |
var e = new Tuple<ITableEntity, TableOperation>(entity, TableOperation.Insert(entity)); | |
operations.Enqueue(e); | |
} | |
public void Delete<TEntity>(TEntity entity) | |
where TEntity : ITableEntity | |
{ | |
var e = new Tuple<ITableEntity, TableOperation>(entity, TableOperation.Delete(entity)); | |
operations.Enqueue(e); | |
} | |
public void InsertOrMerge<TEntity>(TEntity entity) | |
where TEntity : ITableEntity | |
{ | |
var e = new Tuple<ITableEntity, TableOperation>(entity, TableOperation.InsertOrMerge(entity)); | |
operations.Enqueue(e); | |
} | |
public void InsertOrReplace<TEntity>(TEntity entity) | |
where TEntity : ITableEntity | |
{ | |
var e = new Tuple<ITableEntity, TableOperation>(entity, TableOperation.InsertOrReplace(entity)); | |
operations.Enqueue(e); | |
} | |
public void Merge<TEntity>(TEntity entity) | |
where TEntity : ITableEntity | |
{ | |
var e = new Tuple<ITableEntity, TableOperation>(entity, TableOperation.Merge(entity)); | |
operations.Enqueue(e); | |
} | |
public void Replace<TEntity>(TEntity entity) | |
where TEntity : ITableEntity | |
{ | |
var e = new Tuple<ITableEntity, TableOperation>(entity,TableOperation.Replace(entity)); | |
operations.Enqueue(e); | |
} | |
static OperationContext createOperationContext(string id) => | |
new OperationContext { ClientRequestID = id }; | |
static TableRequestOptions MakeTableRequestOptions() => | |
new TableRequestOptions | |
{ | |
RetryPolicy = new ExponentialRetry(TimeSpan.FromMilliseconds(2), 100) | |
}; | |
public async Task ExecuteAsync(string operationId, CancellationToken cancellationToken = default(CancellationToken)) | |
{ | |
var operationContext = createOperationContext(operationId); | |
var count = operations.Count; | |
var toExecute = new List<Tuple<ITableEntity, TableOperation>>(); | |
for (var index = 0; index < count; index++) | |
{ | |
Tuple<ITableEntity, TableOperation> operation; | |
operations.TryDequeue(out operation); | |
if (operation != null) | |
toExecute.Add(operation); | |
} | |
toExecute | |
.GroupBy(tuple => tuple.Item1.PartitionKey) | |
.ToList() | |
.ForEach(async g => | |
{ | |
var operations = g.ToList(); | |
var batch = 0; | |
var operationBatch = GetOperations(operations, batch); | |
while (operationBatch.Count > 0) | |
{ | |
var tableBatchOperation = MakeBatchOperation(operationBatch); | |
await ExecuteBatchWithRetries(tableBatchOperation, operationContext, cancellationToken); | |
batch++; | |
operationBatch = GetOperations(operations, batch); | |
} | |
}); | |
} | |
Task ExecuteBatchWithRetries( | |
TableBatchOperation tableBatchOperation, OperationContext operationContenxt, | |
CancellationToken cancellationToken = default(CancellationToken)) | |
{ | |
var tableRequestOptions = MakeTableRequestOptions(); | |
var tableReference = MakeTableReference(); | |
return tableReference.ExecuteBatchAsync(tableBatchOperation, tableRequestOptions, operationContenxt, cancellationToken); | |
} | |
static TableBatchOperation MakeBatchOperation( | |
List<Tuple<ITableEntity, TableOperation>> operationsToExecute) | |
{ | |
var tableBatchOperation = new TableBatchOperation(); | |
operationsToExecute.ForEach(tuple => tableBatchOperation.Add(tuple.Item2)); | |
return tableBatchOperation; | |
} | |
static List<Tuple<ITableEntity, TableOperation>> GetOperations( | |
IEnumerable<Tuple<ITableEntity, TableOperation>> operations, | |
int batch) | |
{ | |
return operations | |
.Skip(batch * BatchSize) | |
.Take(BatchSize) | |
.ToList(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment