|
using System; |
|
using System.Collections.Generic; |
|
using System.Data.SqlClient; |
|
using System.Reflection; |
|
using System.Threading; |
|
using Dapper; |
|
using Hangfire; |
|
using Hangfire.Annotations; |
|
using Hangfire.Logging; |
|
using Hangfire.Server; |
|
using Hangfire.SqlServer; |
|
|
|
namespace MyApp |
|
{ |
|
/// <summary>
/// A <see cref="SqlServerStorage"/> whose server components are the expiration manager
/// shipped in the Hangfire SQL Server assembly plus <see cref="FixedCountersAggregator"/>.
/// </summary>
public class FixedHangfireSqlStorage : SqlServerStorage
{
    // The expiration manager is not referenced at compile time here, so it is located by name
    // in the assembly that declares SqlServerStorage.
    private const string ExpirationManagerTypeName = "Hangfire.SqlServer.ExpirationManager";

    private readonly SqlServerStorageOptions _options;

    /// <summary>
    /// Creates the storage and keeps the options so that <see cref="GetComponents"/> can read
    /// the expiration-check and counters-aggregation intervals from them.
    /// </summary>
    /// <param name="nameOrConnectionString">Forwarded unchanged to the base constructor.</param>
    /// <param name="options">Forwarded to the base constructor and retained by this instance.</param>
    public FixedHangfireSqlStorage(string nameOrConnectionString, SqlServerStorageOptions options)
        : base(nameOrConnectionString, options)
    {
        _options = options;
    }

    /// <summary>
    /// Yields the expiration manager followed by a <see cref="FixedCountersAggregator"/>.
    /// The base implementation is not invoked.
    /// </summary>
    /// <returns>The two server components, created lazily when the sequence is enumerated.</returns>
    /// <exception cref="InvalidOperationException">
    /// Thrown during enumeration when the expiration manager cannot be located or created via reflection.
    /// </exception>
    public override IEnumerable<IServerComponent> GetComponents()
    {
        yield return CreateExpirationManager();
        yield return new FixedCountersAggregator(this, _options.CountersAggregateInterval);
    }

    // Builds the expiration manager through reflection, failing with a descriptive error
    // instead of a NullReferenceException (or a null component) when a lookup does not succeed.
    private IServerComponent CreateExpirationManager()
    {
        Type managerType = Assembly.GetAssembly(typeof(SqlServerStorage)).GetType(ExpirationManagerTypeName);
        if (managerType == null)
        {
            throw new InvalidOperationException(string.Format(
                "Type '{0}' was not found in the assembly that declares SqlServerStorage.",
                ExpirationManagerTypeName));
        }

        ConstructorInfo constructor = managerType.GetConstructor(
            BindingFlags.Public | BindingFlags.Instance,
            null,
            CallingConventions.HasThis,
            new[] { typeof(SqlServerStorage), typeof(TimeSpan) },
            null);
        if (constructor == null)
        {
            throw new InvalidOperationException(string.Format(
                "Type '{0}' has no public (SqlServerStorage, TimeSpan) constructor.",
                ExpirationManagerTypeName));
        }

        var component = constructor.Invoke(new object[] { this, _options.JobExpirationCheckInterval }) as IServerComponent;
        if (component == null)
        {
            throw new InvalidOperationException(string.Format(
                "Type '{0}' does not implement IServerComponent.",
                ExpirationManagerTypeName));
        }

        return component;
    }
}
|
|
|
/// <summary>
/// Server component that moves rows from [Hangfire].[Counter] into
/// [Hangfire].[AggregatedCounter] in batches and then waits for the configured interval.
/// </summary>
internal class FixedCountersAggregator : IServerComponent
{
    private static readonly ILog Logger = LogProvider.GetCurrentClassLogger();

    // Upper bound on the rows deleted from [Counter] by one execution of the query.
    private const int NumberOfRecordsInSinglePass = 1000;

    // Pause between two consecutive passes when the previous pass removed a full batch.
    private static readonly TimeSpan DelayBetweenPasses = TimeSpan.FromMilliseconds(500);

    private readonly SqlServerStorage _storage;
    private readonly TimeSpan _interval;

    /// <summary>
    /// Creates the aggregator.
    /// </summary>
    /// <param name="storage">Storage used to obtain a connection for every pass.</param>
    /// <param name="interval">How long <see cref="Execute"/> waits after the last pass.</param>
    /// <exception cref="ArgumentNullException"><paramref name="storage"/> is null.</exception>
    public FixedCountersAggregator(SqlServerStorage storage, TimeSpan interval)
    {
        if (storage == null)
        {
            throw new ArgumentNullException("storage");
        }

        _storage = storage;
        _interval = interval;
    }

    /// <summary>
    /// Runs aggregation passes until a pass removes fewer than
    /// <see cref="NumberOfRecordsInSinglePass"/> rows, then waits for the configured interval
    /// (or until cancellation is signalled, whichever comes first).
    /// </summary>
    /// <param name="cancellationToken">Checked after the delay between two full passes.</param>
    /// <exception cref="InvalidOperationException">
    /// The storage connection does not expose a usable <c>Connection</c> property.
    /// </exception>
    public void Execute(CancellationToken cancellationToken)
    {
        Logger.DebugFormat("Aggregating records in 'Counter' table...");

        int removedCount;

        do
        {
            using (var storageConnection = _storage.GetConnection())
            {
                SqlConnection connection = GetSqlConnection(storageConnection);

                // The query text only references @count; @now is passed along unchanged.
                removedCount = connection.Execute(
                    GetAggregationQuery(),
                    new { now = DateTime.UtcNow, count = NumberOfRecordsInSinglePass });
            }

            // A full batch suggests more rows are pending: pause briefly, then go again.
            if (removedCount >= NumberOfRecordsInSinglePass)
            {
                cancellationToken.WaitHandle.WaitOne(DelayBetweenPasses);
                cancellationToken.ThrowIfCancellationRequested();
            }
        } while (removedCount >= NumberOfRecordsInSinglePass);

        cancellationToken.WaitHandle.WaitOne(_interval);
    }

    public override string ToString()
    {
        return "SQL Counter Table Aggregator";
    }

    // Reads the "Connection" property of the storage connection via reflection and fails with a
    // descriptive error when the property is missing or does not hold a SqlConnection.
    private static SqlConnection GetSqlConnection(object storageConnection)
    {
        Type connectionType = storageConnection.GetType();

        PropertyInfo property = connectionType.GetProperty("Connection");
        if (property == null)
        {
            throw new InvalidOperationException(string.Format(
                "Type '{0}' does not expose a public 'Connection' property.",
                connectionType.FullName));
        }

        var connection = property.GetValue(storageConnection) as SqlConnection;
        if (connection == null)
        {
            throw new InvalidOperationException(string.Format(
                "The 'Connection' property of '{0}' did not return a SqlConnection.",
                connectionType.FullName));
        }

        return connection;
    }

    // Returns the T-SQL batch that deletes up to @count rows from [Counter] and merges their
    // per-key sums into [AggregatedCounter] inside one transaction. NOCOUNT is switched on only
    // after the DELETE, so the MERGE does not add to the affected-row count.
    private static string GetAggregationQuery()
    {
        return @"
DECLARE @RecordsToAggregate TABLE
(
    [Key] NVARCHAR(100) NOT NULL,
    [Value] SMALLINT NOT NULL,
    [ExpireAt] DATETIME NULL
)

SET TRANSACTION ISOLATION LEVEL READ COMMITTED
BEGIN TRAN

DELETE TOP (@count) [Hangfire].[Counter] with (readpast)
OUTPUT DELETED.[Key], DELETED.[Value], DELETED.[ExpireAt] INTO @RecordsToAggregate

SET NOCOUNT ON;

MERGE [Hangfire].[AggregatedCounter] AS [Target]
USING (
    SELECT [Key], SUM([Value]) as [Value], MAX([ExpireAt]) AS [ExpireAt] FROM @RecordsToAggregate
    GROUP BY [Key]) AS [Source] ([Key], [Value], [ExpireAt])
ON [Target].[Key] = [Source].[Key]
WHEN MATCHED THEN UPDATE SET
    [Target].[Value] = [Target].[Value] + [Source].[Value],
    [Target].[ExpireAt] = (SELECT MAX([ExpireAt]) FROM (VALUES ([Source].ExpireAt), ([Target].[ExpireAt])) AS MaxExpireAt([ExpireAt]))
WHEN NOT MATCHED THEN INSERT ([Key], [Value], [ExpireAt]) VALUES ([Source].[Key], [Source].[Value], [Source].[ExpireAt]);

COMMIT TRAN";
    }
}
|
|
|
/// <summary>
/// Configuration helpers for registering <see cref="FixedHangfireSqlStorage"/> as the job storage.
/// </summary>
public static class FixedSqlServerStorageExtensions
{
    /// <summary>
    /// Builds a <see cref="FixedHangfireSqlStorage"/> from the given arguments and passes it to
    /// <c>UseStorage</c> on the supplied configuration.
    /// </summary>
    /// <param name="configuration">Configuration the storage is registered on.</param>
    /// <param name="nameOrConnectionString">Forwarded to the storage constructor.</param>
    /// <param name="options">Forwarded to the storage constructor.</param>
    /// <returns>The value returned by <c>UseStorage</c>.</returns>
    /// <exception cref="ArgumentNullException">Any of the three arguments is null.</exception>
    public static IGlobalConfiguration<SqlServerStorage> UseFixedSqlServerStorage(
        [NotNull] this IGlobalConfiguration configuration,
        [NotNull] string nameOrConnectionString,
        [NotNull] SqlServerStorageOptions options)
    {
        if (configuration == null)
        {
            throw new ArgumentNullException("configuration");
        }

        if (nameOrConnectionString == null)
        {
            throw new ArgumentNullException("nameOrConnectionString");
        }

        if (options == null)
        {
            throw new ArgumentNullException("options");
        }

        return configuration.UseStorage(new FixedHangfireSqlStorage(nameOrConnectionString, options));
    }
}
|
} |