// Zero-Clause BSD (more permissive than MIT, doesn't require copyright notice)
//
// Permission to use, copy, modify, and/or distribute this software for any purpose
// with or without fee is hereby granted.
//
// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
// AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
// OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER
// TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF
// THIS SOFTWARE.

// Hangfire.Core 1.8+ is required, for previous versions please see revision from year 2022.
using System;
using System.Collections.Generic;
using Hangfire.Client;
using Hangfire.Common;
using Hangfire.States;
using Hangfire.Storage;

namespace ConsoleApp28
{
    public class SkipWhenPreviousJobIsRunningAttribute : JobFilterAttribute, IClientFilter, IApplyStateFilter
    {
        public void OnCreating(CreatingContext context)
        {
            // We can't handle old storages
            if (!(context.Connection is JobStorageConnection connection)) return;

            // We should run this filter only for background jobs based on
            // recurring ones
            if (!context.Parameters.TryGetValue("RecurringJobId", out var parameter)) return;

            var recurringJobId = parameter as string;

            // RecurringJobId is malformed. This should not happen, but anyway.
            if (String.IsNullOrWhiteSpace(recurringJobId)) return;

            var running = connection.GetValueFromHash($"recurring-job:{recurringJobId}", "Running");
            if ("yes".Equals(running, StringComparison.OrdinalIgnoreCase))
            {
                context.Canceled = true;
            }
        }

        public void OnCreated(CreatedContext filterContext)
        {
        }

        public void OnStateApplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
        {
            if (context.NewState is EnqueuedState)
            {
                ChangeRunningState(context, "yes");
            }
            else if ((context.NewState.IsFinal && !FailedState.StateName.Equals(context.OldStateName, StringComparison.OrdinalIgnoreCase)) ||
                     (context.NewState is FailedState))
            {
                ChangeRunningState(context, "no");
            }
        }

        public void OnStateUnapplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
        {
        }

        private static void ChangeRunningState(ApplyStateContext context, string state)
        {
            // We can't handle old storages
            if (!(context.Connection is JobStorageConnection connection)) return;

            // Obtaining a recurring job identifier
            var recurringJobId = context.GetJobParameter<string>("RecurringJobId", allowStale: true);
            if (String.IsNullOrWhiteSpace(recurringJobId)) return;

            if (context.Storage.HasFeature(JobStorageFeatures.Transaction.AcquireDistributedLock))
            {
                // Acquire a lock in newer storages to avoid race conditions
                ((JobStorageTransaction)context.Transaction).AcquireDistributedLock(
                    $"lock:recurring-job:{recurringJobId}",
                    TimeSpan.FromSeconds(5));
            }

            // Checking whether recurring job exists
            var recurringJob = connection.GetValueFromHash($"recurring-job:{recurringJobId}", "Job");
            if (String.IsNullOrEmpty(recurringJob)) return;

            // Changing the running state
            context.Transaction.SetRangeInHash(
                $"recurring-job:{recurringJobId}",
                new[] { new KeyValuePair<string, string>("Running", state) });
        }
    }
}
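A minimal usage sketch (the ImportJob class, its Run method, and the "import-job" id below are illustrative placeholders, not part of the gist). The filter attribute is applied to the job class, and the job is then registered as a recurring job:

// Requires: using Hangfire; plus the namespace containing the filter above.
[SkipWhenPreviousJobIsRunning]
public class ImportJob
{
    public void Run()
    {
        // Long-running work; background jobs created from the recurring job while
        // "Running" is still "yes" are canceled in the filter's OnCreating.
    }
}

// Registration with an explicit recurring job id (Hangfire 1.8+ overload):
// RecurringJob.AddOrUpdate<ImportJob>("import-job", x => x.Run(), Cron.Minutely());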
https://gist.github.com/odinserj/a6ad7ba6686076c9b9b2e03fcf6bf74e#file-skipwhenpreviousjobisrunningattribute-cs-L50 is also invalid now that IsFinal is false for FailedState, unless there is a bug in the way it is coded.
I have found a bug. When my job fails, it never runs again. The bug is in line 28:
var running = connection.GetValueFromHash($"recurring-job:{recurringJobId}", "Running");
It always returns "yes". If I go to the queue, there are no running jobs, only failed ones.
P.S. My recurringJobId is always the same for every new job.
Does anyone know how to fix it?
Ok, I found a solution.
The condition on line 50 needs to be changed to:
else if (context.NewState.IsFinal || context.NewState is FailedState)
Update Mar 15, 2022: The fix is not relevant anymore. See https://gist.github.com/odinserj/a6ad7ba6686076c9b9b2e03fcf6bf74e/revisions#diff-33f281c89583191f21e641bd642f6a530b26f6d037f0f8637b43de1d5ab83937R50
How can I use this on background enqueued jobs? This SkipWhenPreviousJobIsRunningAttribute seems specific to recurring jobs; I would like the current event-driven job to complete before the next one can start, even though it has already been triggered.
I would like to hear the solution for a background job too! I have now bumped into the issue that after 30 minutes, while the background job is still running, the same job is run again! And I cannot see any setting with this "magic" 30 minutes...
It looks like you have to buy a subscription to the Hangfire.Ace extensibility set to use the Hangfire.Throttling package. See Concurrency & Rate Limiting.
How can I use this on background enqueued jobs? This SkipWhenPreviousJobIsRunningAttribute seems specific to recurring jobs; I would like the current event-driven job to complete before the next one can start, even though it has already been triggered.
You could make a named queue processed by a single worker.
app.UseHangfireServer(new BackgroundJobServerOptions
{
    WorkerCount = 1,
    Queues = new[] { "queueName" }
});
Then apply [Queue("queueName")] to your job.
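For example, something like this (the class and queue name are placeholders):

// Requires: using Hangfire;
[Queue("queueName")]
public class SendEmailsJob
{
    public void Run()
    {
        // With WorkerCount = 1 on the server listening to "queueName",
        // at most one of these jobs executes at a time on that server.
    }
}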
CAUTION: If you have two servers running, each will start a queue that can process a maximum of one job at a time. So if you just want one job per server, this will work. If you really cannot have two at a time for some business reason, this won't work with multiple servers.
@simeyla good idea. I am just not clear on what you intended to say with your "CAUTION" note above. I have tested it, and your approach actually seems to work fine when there are multiple servers, one of which is linked to a queue that is also used to enqueue background jobs. I tested by enqueuing several jobs, and all of them were consistently executed one by one by the server linked to the queue used to enqueue each of the background jobs.
Am I missing something?
Thanks!
Hello there, I have a problem with this code. After some time (I can't really say how much), some of my recurring jobs that are decorated with this attribute stop executing. I mean they are immediately canceled on each new execution, and when I placed a logger there, here is what it produces:
2023-11-28 16:05:33.7064|INFO|Namespace.Jobs.Hangfire.Attributes.SkipWhenPreviousJobIsRunningAttribute|running: yes
2023-11-28 16:05:33.7064|INFO|Namespace.Jobs.Hangfire.Attributes.SkipWhenPreviousJobIsRunningAttribute|Job will be canceled
and here is the code in C#:
var logger = new NLogFactory().Create(this);
string running = connection.GetValueFromHash($"recurring-job:{recurringJobId}", "Running");
logger.Info($"running: {running}");

if ("yes".Equals(running, StringComparison.OrdinalIgnoreCase))
{
    logger.Info($"Job will be canceled");
    context.Canceled = true;
}
I don't know why it is happening, but the job won't trigger from this moment on... I am using an SQLite file as the DB.
@frozzen10 The issue you're experiencing might be because the "Running" status of the job is not being reset properly when the job fails or when it's in a final state. This could cause the job to be immediately canceled on the next execution because the system thinks it's still running. To fix this, you should ensure that the "Running" status is reset in all cases when the job is in a final state, not just when it's not in a FailedState.
In OnStateApplied:

var recurringJobId = SerializationHelper.Deserialize<string>(
    context.Connection.GetJobParameter(context.BackgroundJob.Id, "RecurringJobId"));
if (string.IsNullOrWhiteSpace(recurringJobId)) return;

if (context.NewState is EnqueuedState)
{
    transaction.SetRangeInHash(
        $"recurring-job:{recurringJobId}",
        new[] { new KeyValuePair<string, string>("Running", "yes") });
}
else if (context.NewState.IsFinal)
{
    transaction.SetRangeInHash(
        $"recurring-job:{recurringJobId}",
        new[] { new KeyValuePair<string, string>("Running", "no") });
}
If you have this issue, here is a solution for it.
After investigation, I found out that our custom SkipWhenPreviousJobIsRunningAttribute triggers the OnStateApplied method after the job has been deleted.
The reason it does that is this scenario: the job is triggered to run, and while it is still running and not yet completed, we delete it. Once the job completes, OnStateApplied is triggered even though the job was deleted, and it creates a new row in the hash table.
If we want to keep the custom SkipWhenPreviousJobIsRunningAttribute, then before adding the new row to the hash table, check whether the job has been removed:

var job = JobStorage.Current.GetConnection().GetRecurringJobs(new[] { recurringJobId }).FirstOrDefault();
if (job is { Removed: true }) return;

transaction.SetRangeInHash(
    $"recurring-job:{recurringJobId}",
    new[] { new KeyValuePair<string, string>(RunningKey, "no") });

The benefit of this approach is that it saves us from creating unneeded data in the DB.
Unfortunately, it also increases the load on the DB, since it is one more request.
Thanks for handling this. I have updated the gist with new methods available in Hangfire 1.8+ to avoid any race conditions. They work by acquiring a lock in the same transaction and checking the existence of the recurring job first. So now there should be no trouble running this script even if everything goes wrong.
Thank you a lot, this saved me from PostgreSqlDistributedLockException.
@frozzen10 The issue you're experiencing might be because the "Running" status of the job is not being reset properly when the job fails or when it's in a final state. This could cause the job to be immediately canceled on the next execution because the system thinks it's still running. To fix this, you should ensure that the "Running" status is reset in all cases when the job is in a final state, not just when it's not in a FailedState.
in OnStateApplied (code as quoted above)
Watch out, "Failed state" is not a final state, if you disabled automaticRetry with no delete you'll never have a "DeletedState".
According to your case, please add
context.NewState.IsFinal || context.NewState is FailedState
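A minimal sketch of OnStateApplied with that condition in place (this just combines the snippet quoted above with the extra FailedState check):

public void OnStateApplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
{
    var recurringJobId = SerializationHelper.Deserialize<string>(
        context.Connection.GetJobParameter(context.BackgroundJob.Id, "RecurringJobId"));
    if (string.IsNullOrWhiteSpace(recurringJobId)) return;

    if (context.NewState is EnqueuedState)
    {
        // Mark the recurring job as running when its background job is enqueued
        transaction.SetRangeInHash(
            $"recurring-job:{recurringJobId}",
            new[] { new KeyValuePair<string, string>("Running", "yes") });
    }
    else if (context.NewState.IsFinal || context.NewState is FailedState)
    {
        // Reset the flag on any final state, and also on FailedState, which is not final
        // but would otherwise leave "Running" stuck at "yes" when retries are disabled
        transaction.SetRangeInHash(
            $"recurring-job:{recurringJobId}",
            new[] { new KeyValuePair<string, string>("Running", "no") });
    }
}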
Is there a new fixed version for this attribute? Why is it not part of Hangfire?
This filter is incredibly helpful, especially for recurring jobs prone to transient errors. We've implemented it for our data import jobs, which poll a somewhat unstable service every 5 minutes.
Previously, we tried:
- DisableConcurrentExecutionAttribute, which worked as expected to prevent concurrent executions.
- AutomaticRetryAttribute(0, Fail), but this wasn't ideal because we rely on monitoring failed jobs after a set number of retries.
- Allowing the 5-minute jobs to flood the queue and retry for up to half an hour, which overloaded the target service.
This filter solved our problem perfectly. It prevents new job triggers based on the interval while a previous instance is still running, even if it's failing multiple times.
It would be fantastic to see this included in the core package!
Jokes aside, I can't pick a meaningful short name for this filter that describes its behavior and tells us it works only with recurring jobs. This is now the only thing that prevents it from being included in Hangfire.Core 🤦♂️.
@odinserj How about SerializedRecurringJobAttribute, which would actually be used as just [SerializedRecurringJob] in the code using that attribute? The same term and concept are used for the Serializable transaction isolation level in database management systems, and it refers to a pretty much analogous situation. See https://en.wikipedia.org/wiki/Isolation_(database_systems)#Serializable, where it states:
A serial execution is one in which each SQL-transaction executes to completion before the next SQL-transaction begins.
@odinserj, if i may recommend: SingleInstanceRecurringJobAttribute
Am I right that this attribute would not prevent a recurring job from running when the job has already been manually triggered by a user?
In my case, I had a problem where a concurrent job was stuck in a kind of "loop" (sometimes the job was stuck and didn't run at all for hours or days). That was because I was making a lot of Task.Run(...) calls in other services not related to Hangfire, so I simply moved most of my Task.Run calls to System.Threading.Channels.Channel, and everything works well now.
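For anyone curious, here is a minimal sketch of that pattern (the WorkQueue class and its members are illustrative, not from the comment above): instead of fire-and-forget Task.Run calls, work items are written to a bounded channel and drained by a single background consumer.

using System;
using System.Threading.Channels;
using System.Threading.Tasks;

public class WorkQueue
{
    private readonly Channel<Func<Task>> _channel =
        Channel.CreateBounded<Func<Task>>(capacity: 100);

    public WorkQueue()
    {
        // A single consumer drains the channel, so pending work no longer piles up
        // as unbounded Task.Run calls competing for thread-pool threads.
        _ = Task.Run(async () =>
        {
            await foreach (var work in _channel.Reader.ReadAllAsync())
            {
                try { await work(); }
                catch (Exception) { /* log and continue */ }
            }
        });
    }

    // Callers enqueue work here instead of calling Task.Run directly.
    public ValueTask EnqueueAsync(Func<Task> work) => _channel.Writer.WriteAsync(work);
}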
It works perfectly. Many thanks.
But it does not work if you place the attribute on an interface.
This is OK:
[SkipWhenPreviousJobIsRunning]
public class JobWithITaskDelay90Sec : ITask<bool>
This does not work:
public class JobWithITaskDelay90Sec : ITask<bool>
{
}
[SkipWhenPreviousJobIsRunning]
public interface ITask<TResult>
{
}
Thank you for providing this code. It perfectly matches the functionality I was looking for.
Unfortunately, it doesn't work for me. Initially, it seemed to work fine on my local setup with a single Hangfire instance, although I didn't test it for very long.
Now that I've deployed it to four test instances, I'm seeing multiple instances of the same RecurringJob running simultaneously. I created a dummy job with Task.Delay for 10 minutes, which starts every minute. It ran overnight, and the test instances were restarted by IIS at some point during the night. Now, I have multiple instances of this job running concurrently.
I'm curious how this differs from Hangfire Ace's concurrency and throttling via mutex.
Thanks for sharing!
Please update lines with deprecated methods:
https://gist.github.com/odinserj/a6ad7ba6686076c9b9b2e03fcf6bf74e#file-skipwhenpreviousjobisrunningattribute-cs-L52
https://gist.github.com/odinserj/a6ad7ba6686076c9b9b2e03fcf6bf74e#file-skipwhenpreviousjobisrunningattribute-cs-L43
To new ones:
var recurringJobId = SerializationHelper.Deserialize<string>(context.Connection.GetJobParameter(context.BackgroundJob.Id, "RecurringJobId"));
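For comparison, the gist's current Hangfire 1.8+ revision reads the same parameter through the state-context helper instead (see ChangeRunningState above):

var recurringJobId = context.GetJobParameter<string>("RecurringJobId", allowStale: true);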