Skip to content

Instantly share code, notes, and snippets.

@jrgcubano
Created September 17, 2018 10:57
Show Gist options
  • Save jrgcubano/6801be1875413cf84bc6e215b99741ce to your computer and use it in GitHub Desktop.
Save jrgcubano/6801be1875413cf84bc6e215b99741ce to your computer and use it in GitHub Desktop.
Azure Table Storage batch manager: queues Insert, Delete, Merge, etc. operations and executes them in batches
/// <summary>
/// Collects Azure Table Storage operations in a thread-safe queue and, on
/// <see cref="ExecuteAsync"/>, sends them grouped by partition key in batches of
/// at most <see cref="BatchSize"/> operations.
/// </summary>
public class TableStorageBatchManager
{
    // Upper bound on the number of operations placed in one TableBatchOperation.
    const int BatchSize = 100;

    // Pending (entity, operation) pairs; the entity is kept so that the
    // partition key can be read when the operations are grouped.
    readonly ConcurrentQueue<Tuple<ITableEntity, TableOperation>> operations;
    readonly CloudStorageAccount storageAccount;
    readonly string tableName;

    /// <summary>
    /// Creates a manager for <paramref name="tableName"/>, using the connection
    /// string stored under the "StorageConnectionString" setting, and creates
    /// the table if it does not exist yet.
    /// </summary>
    /// <param name="tableName">Name of the table all queued operations target.</param>
    public TableStorageBatchManager(string tableName)
    {
        this.tableName = tableName;
        var cs = CloudConfigurationManager.GetSetting("StorageConnectionString");
        storageAccount = CloudStorageAccount.Parse(cs);
        operations = new ConcurrentQueue<Tuple<ITableEntity, TableOperation>>();
        MakeTableReference().CreateIfNotExists();
    }

    /// <summary>Builds a fresh reference to the configured table.</summary>
    CloudTable MakeTableReference()
    {
        var tableClient = storageAccount.CreateCloudTableClient();
        return tableClient.GetTableReference(tableName);
    }

    /// <summary>Number of operations queued and not yet picked up by <see cref="ExecuteAsync"/>.</summary>
    public decimal OutstandingOperations => operations.Count;

    /// <summary>Queues an Insert operation for <paramref name="entity"/>.</summary>
    public void Insert<TEntity>(TEntity entity)
        where TEntity : ITableEntity
    {
        Enqueue(entity, TableOperation.Insert(entity));
    }

    /// <summary>Queues a Delete operation for <paramref name="entity"/>.</summary>
    public void Delete<TEntity>(TEntity entity)
        where TEntity : ITableEntity
    {
        Enqueue(entity, TableOperation.Delete(entity));
    }

    /// <summary>Queues an InsertOrMerge operation for <paramref name="entity"/>.</summary>
    public void InsertOrMerge<TEntity>(TEntity entity)
        where TEntity : ITableEntity
    {
        Enqueue(entity, TableOperation.InsertOrMerge(entity));
    }

    /// <summary>Queues an InsertOrReplace operation for <paramref name="entity"/>.</summary>
    public void InsertOrReplace<TEntity>(TEntity entity)
        where TEntity : ITableEntity
    {
        Enqueue(entity, TableOperation.InsertOrReplace(entity));
    }

    /// <summary>Queues a Merge operation for <paramref name="entity"/>.</summary>
    public void Merge<TEntity>(TEntity entity)
        where TEntity : ITableEntity
    {
        Enqueue(entity, TableOperation.Merge(entity));
    }

    /// <summary>Queues a Replace operation for <paramref name="entity"/>.</summary>
    public void Replace<TEntity>(TEntity entity)
        where TEntity : ITableEntity
    {
        Enqueue(entity, TableOperation.Replace(entity));
    }

    /// <summary>Adds one (entity, operation) pair to the pending queue.</summary>
    void Enqueue(ITableEntity entity, TableOperation operation)
    {
        operations.Enqueue(new Tuple<ITableEntity, TableOperation>(entity, operation));
    }

    /// <summary>Creates an operation context tagged with the given client request id.</summary>
    static OperationContext MakeOperationContext(string id) =>
        new OperationContext { ClientRequestID = id };

    /// <summary>
    /// Request options used for every batch: exponential retry with a 2 ms
    /// back-off delta and up to 100 attempts.
    /// </summary>
    static TableRequestOptions MakeTableRequestOptions() =>
        new TableRequestOptions
        {
            RetryPolicy = new ExponentialRetry(TimeSpan.FromMilliseconds(2), 100)
        };

    /// <summary>
    /// Removes the operations currently in the queue and executes them, grouped
    /// by partition key and split into batches of at most <see cref="BatchSize"/>.
    /// The returned task completes only after every batch has finished, and it
    /// faults if any batch fails.
    /// </summary>
    /// <param name="operationId">Client request id attached to the storage requests.</param>
    /// <param name="cancellationToken">Token passed through to every batch request.</param>
    /// <remarks>
    /// Operations are dequeued before they are sent; operations belonging to a
    /// batch that fails are not put back on the queue.
    /// </remarks>
    public async Task ExecuteAsync(string operationId, CancellationToken cancellationToken = default(CancellationToken))
    {
        var operationContext = MakeOperationContext(operationId);
        var pending = DrainQueue();

        // One task per partition key. List<T>.ForEach with an async lambda would
        // produce fire-and-forget async void delegates, so the tasks are collected
        // and awaited instead; partitions still run concurrently with each other.
        // NOTE(review): all partitions share one OperationContext, as before — confirm
        // that concurrent use of a single context is acceptable for this SDK version.
        var partitionTasks = pending
            .GroupBy(tuple => tuple.Item1.PartitionKey)
            .Select(group => ExecutePartitionAsync(group.ToList(), operationContext, cancellationToken))
            .ToList();

        await Task.WhenAll(partitionTasks).ConfigureAwait(false);
    }

    /// <summary>
    /// Dequeues the operations that were in the queue when the call started.
    /// Items enqueued concurrently beyond that snapshot stay for the next run.
    /// </summary>
    List<Tuple<ITableEntity, TableOperation>> DrainQueue()
    {
        var snapshotCount = operations.Count;
        var drained = new List<Tuple<ITableEntity, TableOperation>>(snapshotCount);
        for (var index = 0; index < snapshotCount; index++)
        {
            Tuple<ITableEntity, TableOperation> operation;
            if (operations.TryDequeue(out operation) && operation != null)
            {
                drained.Add(operation);
            }
        }
        return drained;
    }

    /// <summary>
    /// Sends the operations of a single partition, one batch after another, in
    /// the order in which they were queued.
    /// </summary>
    async Task ExecutePartitionAsync(
        List<Tuple<ITableEntity, TableOperation>> partitionOperations,
        OperationContext operationContext,
        CancellationToken cancellationToken)
    {
        var batch = 0;
        var operationBatch = GetOperations(partitionOperations, batch);
        while (operationBatch.Count > 0)
        {
            var tableBatchOperation = MakeBatchOperation(operationBatch);
            await ExecuteBatchWithRetries(tableBatchOperation, operationContext, cancellationToken)
                .ConfigureAwait(false);
            batch++;
            operationBatch = GetOperations(partitionOperations, batch);
        }
    }

    /// <summary>
    /// Executes one batch against the table using the retry policy from
    /// <see cref="MakeTableRequestOptions"/>.
    /// </summary>
    Task ExecuteBatchWithRetries(
        TableBatchOperation tableBatchOperation, OperationContext operationContext,
        CancellationToken cancellationToken = default(CancellationToken))
    {
        var tableRequestOptions = MakeTableRequestOptions();
        var tableReference = MakeTableReference();
        return tableReference.ExecuteBatchAsync(tableBatchOperation, tableRequestOptions, operationContext, cancellationToken);
    }

    /// <summary>Wraps the given operations in a single <c>TableBatchOperation</c>.</summary>
    static TableBatchOperation MakeBatchOperation(
        List<Tuple<ITableEntity, TableOperation>> operationsToExecute)
    {
        var tableBatchOperation = new TableBatchOperation();
        foreach (var tuple in operationsToExecute)
        {
            tableBatchOperation.Add(tuple.Item2);
        }
        return tableBatchOperation;
    }

    /// <summary>
    /// Returns the zero-based <paramref name="batch"/>-th slice of
    /// <paramref name="operations"/>, at most <see cref="BatchSize"/> items long;
    /// the result is empty once the sequence is exhausted.
    /// </summary>
    static List<Tuple<ITableEntity, TableOperation>> GetOperations(
        IEnumerable<Tuple<ITableEntity, TableOperation>> operations,
        int batch)
    {
        return operations
            .Skip(batch * BatchSize)
            .Take(BatchSize)
            .ToList();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment