Skip to content

Instantly share code, notes, and snippets.

@prabirshrestha
Created July 16, 2012 15:39
Show Gist options
  • Save prabirshrestha/3123395 to your computer and use it in GitHub Desktop.
Save prabirshrestha/3123395 to your computer and use it in GitHub Desktop.
azure blob leaser
#define ASYNC_TARGETTING_PACK
// http://blog.smarx.com/posts/managing-concurrency-in-windows-azure-with-leases
namespace BlobLeaser
{
using System;
using System.IO;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.WindowsAzure.StorageClient;
using Microsoft.WindowsAzure.StorageClient.Protocol;
public class CloudBlobLeaser : IDisposable
{
private const int DefaultTimeoutInSeconds = 90;
public bool HasLease { get { return _leaseId != null; } }
private readonly CloudBlob _blob;
private readonly int _timeoutInSeconds;
private readonly string _leaseId;
private Thread _renewalThread;
private bool _disposed;
public CloudBlobLeaser(CloudBlob blob, int timeoutInSeconds = DefaultTimeoutInSeconds, int renewEverySeconds = 40)
{
_blob = blob;
_timeoutInSeconds = timeoutInSeconds;
//blob.Container.CreateIfNotExist();
try
{
blob.UploadByteArray(new byte[0], new BlobRequestOptions { AccessCondition = AccessCondition.IfNoneMatch("*") });
}
catch (StorageClientException e)
{
if (e.ErrorCode != StorageErrorCode.BlobAlreadyExists
&& e.StatusCode != HttpStatusCode.PreconditionFailed) // 412 from trying to modify a blob that's leased
{
throw;
}
}
_leaseId = TryAcquireLease(blob, timeoutInSeconds);
if (HasLease)
{
_renewalThread = new Thread(async () =>
{
while (true)
{
#if ASYNC_TARGETTING_PACK
await TaskEx.Delay(TimeSpan.FromSeconds(renewEverySeconds));
#else
await Task.Delay(TimeSpan.FromSeconds(renewEverySeconds));
#endif
RenewLease(blob, _leaseId, timeoutInSeconds);
}
});
_renewalThread.Start();
}
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (!_disposed)
{
if (disposing)
{
if (_renewalThread != null)
{
_renewalThread.Abort();
ReleaseLease(_blob, _leaseId, _timeoutInSeconds);
_renewalThread = null;
}
}
_disposed = true;
}
}
~CloudBlobLeaser()
{
Dispose(false);
}
public static string TryAcquireLease(CloudBlob blob, int timeoutInSeconds = DefaultTimeoutInSeconds)
{
try { return AcquireLease(blob, timeoutInSeconds); }
catch (WebException e)
{
if (((HttpWebResponse)e.Response).StatusCode != HttpStatusCode.Conflict) // 409, already leased
{
throw;
}
e.Response.Close();
return null;
}
}
public static string AcquireLease(CloudBlob blob, int timeoutInSeconds)
{
var creds = blob.ServiceClient.Credentials;
var transformedUri = new Uri(creds.TransformUri(blob.Uri.AbsoluteUri));
var req = BlobRequest.Lease(transformedUri,
timeoutInSeconds, // timeout (in seconds)
LeaseAction.Acquire, // as opposed to "break" "release" or "renew"
null); // name of the existing lease, if any
blob.ServiceClient.Credentials.SignRequest(req);
using (var response = req.GetResponse())
{
return response.Headers["x-ms-lease-id"];
}
}
private static void DoLeaseOperation(CloudBlob blob, string leaseId, LeaseAction action, int timeoutInSeconds)
{
var creds = blob.ServiceClient.Credentials;
var transformedUri = new Uri(creds.TransformUri(blob.Uri.AbsoluteUri));
var req = BlobRequest.Lease(transformedUri, timeoutInSeconds, action, leaseId);
creds.SignRequest(req);
req.GetResponse().Close();
}
public static void ReleaseLease(CloudBlob blob, string leaseId, int timeoutInSeconds = DefaultTimeoutInSeconds)
{
DoLeaseOperation(blob, leaseId, LeaseAction.Release, timeoutInSeconds);
}
public static bool TryRenewLease(CloudBlob blob, string leaseId, int timeoutInSeconds = DefaultTimeoutInSeconds)
{
try { RenewLease(blob, leaseId, timeoutInSeconds); return true; }
catch { return false; }
}
public static void RenewLease(CloudBlob blob, string leaseId, int timeoutInSeconds = DefaultTimeoutInSeconds)
{
DoLeaseOperation(blob, leaseId, LeaseAction.Renew, timeoutInSeconds);
}
public static void BreakLease(CloudBlob blob, int timeoutInSeconds = DefaultTimeoutInSeconds)
{
DoLeaseOperation(blob, null, LeaseAction.Break, timeoutInSeconds);
}
// NOTE: This method doesn't do everything that the regular UploadText does.
// Notably, it doesn't update the BlobProperties of the blob (with the new
// ETag and LastModifiedTimeUtc). It also, like all the methods in this file,
// doesn't apply any retry logic. Use this at your own risk!
public static void UploadText(CloudBlob blob, string text, string leaseId, int timeoutInSeconds = DefaultTimeoutInSeconds)
{
string url = blob.Uri.AbsoluteUri;
if (blob.ServiceClient.Credentials.NeedsTransformUri)
{
url = blob.ServiceClient.Credentials.TransformUri(url);
}
var req = BlobRequest.Put(new Uri(blob.ServiceClient.Credentials.TransformUri(blob.Uri.AbsoluteUri)),
timeoutInSeconds, new BlobProperties(), BlobType.BlockBlob, leaseId, 0);
using (var writer = new StreamWriter(req.GetRequestStream()))
{
writer.Write(text);
}
blob.ServiceClient.Credentials.SignRequest(req);
req.GetResponse().Close();
}
public static void SetMetadata(CloudBlob blob, string leaseId, int timeoutInSeconds = DefaultTimeoutInSeconds)
{
var req = BlobRequest.SetMetadata(new Uri(blob.ServiceClient.Credentials.TransformUri(blob.Uri.AbsoluteUri)), timeoutInSeconds, leaseId);
foreach (string key in blob.Metadata.Keys)
{
req.Headers.Add("x-ms-meta-" + key, blob.Metadata[key]);
}
blob.ServiceClient.Credentials.SignRequest(req);
req.GetResponse().Close();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment