Last active
July 26, 2024 04:39
-
-
Save odinserj/4a3bf40606c4da9183588a5a325dfb99 to your computer and use it in GitHub Desktop.
MutexAttribute.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using Hangfire.Common; | |
using Hangfire.States; | |
using Hangfire.Storage; | |
namespace Hangfire.Pro | |
{ | |
/// <summary> | |
/// Represents a background job filter that helps to disable concurrent execution | |
/// without causing worker to wait as in <see cref="Hangfire.DisableConcurrentExecutionAttribute"/>. | |
/// </summary> | |
public class MutexAttribute : JobFilterAttribute, IElectStateFilter, IApplyStateFilter | |
{ | |
private static readonly TimeSpan DistributedLockTimeout = TimeSpan.FromMinutes(1); | |
private readonly string _resource; | |
public MutexAttribute(string resource) | |
{ | |
_resource = resource; | |
RetryInSeconds = 15; | |
} | |
public int RetryInSeconds { get; set; } | |
public int MaxAttempts { get; set; } | |
public void OnStateElection(ElectStateContext context) | |
{ | |
// We are intercepting transitions to the Processed state, that is performed by | |
// a worker just before processing a job. During the state election phase we can | |
// change the target state to another one, causing a worker not to process the | |
// backgorund job. | |
if (context.CandidateState.Name != ProcessingState.StateName || | |
context.BackgroundJob.Job == null) | |
{ | |
return; | |
} | |
// This filter requires an extended set of storage operations. It's supported | |
// by all the official storages, and many of the community-based ones. | |
var storageConnection = context.Connection as JobStorageConnection; | |
if (storageConnection == null) | |
{ | |
throw new NotSupportedException("This version of storage doesn't support extended methods. Please try to update to the latest version."); | |
} | |
string blockedBy; | |
try | |
{ | |
// Distributed lock is needed here only to prevent a race condition, when another | |
// worker picks up a background job with the same resource between GET and SET | |
// operations. | |
// There will be no race condition, when two or more workers pick up background job | |
// with the same id, because state transitions are protected with distributed lock | |
// themselves. | |
using (AcquireDistributedSetLock(context.Connection, context.BackgroundJob.Job.Args)) | |
{ | |
// Resource set contains a background job id that acquired a mutex for the resource. | |
// We are getting only one element to see what background job blocked the invocation. | |
var range = storageConnection.GetRangeFromSet( | |
GetResourceKey(context.BackgroundJob.Job.Args), | |
0, | |
0); | |
blockedBy = range.Count > 0 ? range[0] : null; | |
// We should permit an invocation only when the set is empty, or if current background | |
// job is already owns a resource. This may happen, when the localTransaction succeeded, | |
// but outer transaction was failed. | |
if (blockedBy == null || blockedBy == context.BackgroundJob.Id) | |
{ | |
// We need to commit the changes inside a distributed lock, otherwise it's | |
// useless. So we create a local transaction instead of using the | |
// context.Transaction property. | |
var localTransaction = context.Connection.CreateWriteTransaction(); | |
// Add the current background job identifier to a resource set. This means | |
// that resource is owned by the current background job. Identifier will be | |
// removed only on failed state, or in one of final states (succeeded or | |
// deleted). | |
localTransaction.AddToSet(GetResourceKey(context.BackgroundJob.Job.Args), context.BackgroundJob.Id); | |
localTransaction.Commit(); | |
// Invocation is permitted, and we did all the required things. | |
return; | |
} | |
} | |
} | |
catch (DistributedLockTimeoutException) | |
{ | |
// We weren't able to acquire a distributed lock within a specified window. This may | |
// be caused by network delays, storage outages or abandoned locks in some storages. | |
// Since it is required to expire abandoned locks after some time, we can simply | |
// postpone the invocation. | |
context.CandidateState = new ScheduledState(TimeSpan.FromSeconds(RetryInSeconds)) | |
{ | |
Reason = "Couldn't acquire a distributed lock for mutex: timeout exceeded" | |
}; | |
return; | |
} | |
// Background job execution is blocked. We should change the target state either to | |
// the Scheduled or to the Deleted one, depending on current retry attempt number. | |
var currentAttempt = context.GetJobParameter<int>("MutexAttempt") + 1; | |
context.SetJobParameter("MutexAttempt", currentAttempt); | |
context.CandidateState = MaxAttempts == 0 || currentAttempt <= MaxAttempts | |
? CreateScheduledState(blockedBy, currentAttempt) | |
: CreateDeletedState(blockedBy); | |
} | |
public void OnStateApplied(ApplyStateContext context, IWriteOnlyTransaction transaction) | |
{ | |
if (context.BackgroundJob.Job == null) return; | |
if (context.OldStateName == ProcessingState.StateName) | |
{ | |
using (AcquireDistributedSetLock(context.Connection, context.BackgroundJob.Job.Args)) | |
{ | |
var localTransaction = context.Connection.CreateWriteTransaction(); | |
localTransaction.RemoveFromSet(GetResourceKey(context.BackgroundJob.Job.Args), context.BackgroundJob.Id); | |
localTransaction.Commit(); | |
} | |
} | |
} | |
public void OnStateUnapplied(ApplyStateContext context, IWriteOnlyTransaction transaction) | |
{ | |
} | |
private static DeletedState CreateDeletedState(string blockedBy) | |
{ | |
return new DeletedState | |
{ | |
Reason = $"Execution was blocked by background job {blockedBy}, all attempts exhausted" | |
}; | |
} | |
private IState CreateScheduledState(string blockedBy, int currentAttempt) | |
{ | |
var reason = $"Execution is blocked by background job {blockedBy}, retry attempt: {currentAttempt}"; | |
if (MaxAttempts > 0) | |
{ | |
reason += $"/{MaxAttempts}"; | |
} | |
return new ScheduledState(TimeSpan.FromSeconds(RetryInSeconds)) | |
{ | |
Reason = reason | |
}; | |
} | |
private IDisposable AcquireDistributedSetLock(IStorageConnection connection, IEnumerable<object> args) | |
{ | |
return connection.AcquireDistributedLock(GetDistributedLockKey(args), DistributedLockTimeout); | |
} | |
private string GetDistributedLockKey(IEnumerable<object> args) | |
{ | |
return $"extension:job-mutex:lock:{GetKeyFormat(args, _resource)}"; | |
} | |
private string GetResourceKey(IEnumerable<object> args) | |
{ | |
return $"extension:job-mutex:set:{GetKeyFormat(args, _resource)}"; | |
} | |
private static string GetKeyFormat(IEnumerable<object> args, string keyFormat) | |
{ | |
return String.Format(keyFormat, args.ToArray()); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
@odinserj Can we use this in the non pro version? Is there something that is specific to pro only?