Created
April 9, 2021 13:09
-
-
Save bymyslf/157e4bb3851c03d712a84f72da9ed42d to your computer and use it in GitHub Desktop.
Redis Job Queue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Collections.Generic; | |
using System.Threading; | |
using System.Threading.Tasks; | |
public interface IJobQueue | |
{ | |
Task Finish(string key, CancellationToken cancellationToken = default); | |
Task Requeue(string key, CancellationToken cancellationToken = default); | |
Task<IReadOnlyCollection<(string Key, string Value)>> PopAll(CancellationToken cancellationToken = default); | |
Task Enqueue(string value, CancellationToken cancellationToken = default); | |
Task<IDictionary<string, string>> GetAll(CancellationToken cancellationToken = default); | |
Task Clear(CancellationToken cancellationToken = default); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using StackExchange.Redis; | |
public class RedisConnection | |
{ | |
private readonly Lazy<ConnectionMultiplexer> connection; | |
public RedisConnection(string connectionString) | |
{ | |
if (string.IsNullOrEmpty(connectionString)) | |
{ | |
throw new ArgumentException(nameof(connectionString)); | |
} | |
var options = ConfigurationOptions.Parse(connectionString); | |
this.connection = new Lazy<ConnectionMultiplexer>(() => ConnectionMultiplexer.Connect(options)); | |
} | |
public IDatabase Database | |
=> this.connection.Value.GetDatabase(); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using StackExchange.Redis; | |
public class RedisJobQueue : IJobQueue | |
{ | |
private readonly RedisConnection connection; | |
private readonly string failedQueue; | |
private readonly string processingQueue; | |
private IDatabase Database => connection.Database; | |
public RedisJobQueue(RedisConnection connection, string queueName) | |
{ | |
this.connection = connection; | |
this.failedQueue = $"{queueName}:failedtasks"; | |
this.processingQueue = $"{queueName}:processingtasks"; | |
} | |
/// <summary> | |
/// When a job is finished, remove it from the processing queue and from the cache database. | |
/// </summary> | |
/// <param name="key"></param> | |
public async Task Finish(string key, CancellationToken cancellationToken = default) | |
{ | |
cancellationToken.ThrowIfCancellationRequested(); | |
await Database.ListRemoveAsync(processingQueue, key); | |
cancellationToken.ThrowIfCancellationRequested(); | |
await Database.KeyDeleteAsync(key); | |
} | |
/// <summary> | |
/// When a job fails the execution, we want to add it to the failed queue and remove it from the processing queue. | |
/// </summary> | |
/// <param name="key"></param> | |
public async Task Requeue(string key, CancellationToken cancellationToken = default) | |
{ | |
cancellationToken.ThrowIfCancellationRequested(); | |
await Database.ListRemoveAsync(processingQueue, key); | |
cancellationToken.ThrowIfCancellationRequested(); | |
Database.HashDelete(key, "active"); | |
Database.HashIncrement(key, "failedcount"); | |
Database.ListRightPush(failedQueue, key); | |
} | |
/// <summary> | |
/// Move key from FailedQueue to ProcessingQueue, get key value from cache. | |
/// | |
/// Also set the active field. Indicates when job was retrieved so we can monitor its time. | |
/// </summary> | |
public async Task<IReadOnlyCollection<(string Key, string Value)>> PopAll(CancellationToken cancellationToken = default) | |
{ | |
var result = new List<(string, string)>(); | |
while (!cancellationToken.IsCancellationRequested) | |
{ | |
string key = await Database.ListRightPopLeftPushAsync(failedQueue, processingQueue); | |
if (string.IsNullOrEmpty(key)) | |
{ | |
break; | |
} | |
cancellationToken.ThrowIfCancellationRequested(); | |
var hashEntries = (await Database.HashGetAllAsync(key)); | |
await Database.HashSetAsync(key, "active", (DateTime.UtcNow - new DateTime(1970, 1, 1)).TotalMilliseconds); | |
foreach (var entry in hashEntries) | |
{ | |
cancellationToken.ThrowIfCancellationRequested(); | |
if (entry.Name.IsNullOrEmpty || entry.Value.IsNullOrEmpty) | |
{ | |
await Database.ListRemoveAsync(processingQueue, key); | |
continue; | |
} | |
result.Add((entry.Name, entry.Value)); | |
} | |
} | |
return result; | |
} | |
/// <summary> | |
/// Add a failed job to the queue | |
/// </summary> | |
/// <param name="value">Value to be inserted</param> | |
public async Task Enqueue(string value, CancellationToken cancellationToken = default) | |
{ | |
if (string.IsNullOrEmpty(value)) | |
{ | |
return; | |
} | |
cancellationToken.ThrowIfCancellationRequested(); | |
var id = await Database.StringIncrementAsync($"{failedQueue}:jobid"); | |
var key = $"{failedQueue}:{id}"; | |
cancellationToken.ThrowIfCancellationRequested(); | |
await Database.HashSetAsync(key, key, value); | |
cancellationToken.ThrowIfCancellationRequested(); | |
await Database.ListLeftPushAsync(failedQueue, key); | |
} | |
/// <summary> | |
/// Get all values in the failed queue | |
/// </summary> | |
public async Task<IDictionary<string, string>> GetAll(CancellationToken cancellationToken = default) | |
{ | |
var result = new Dictionary<string, string>(); | |
while (!cancellationToken.IsCancellationRequested) | |
{ | |
string key = await Database.ListRightPopLeftPushAsync(failedQueue, failedQueue); | |
if (string.IsNullOrEmpty(key) || result.ContainsKey(key)) | |
{ | |
break; | |
} | |
var entries = (await Database.HashGetAllAsync(key)); | |
foreach (var entry in entries) | |
{ | |
if (entry.Name.IsNullOrEmpty || entry.Value.IsNullOrEmpty) | |
{ | |
continue; | |
} | |
result.Add(entry.Name, entry.Value); | |
} | |
} | |
return result; | |
} | |
/// <summary> | |
/// Delete all values from queue | |
/// </summary> | |
public async Task Clear(CancellationToken cancellationToken = default) | |
{ | |
cancellationToken.ThrowIfCancellationRequested(); | |
await ClearQueue(processingQueue); | |
cancellationToken.ThrowIfCancellationRequested(); | |
await DeleteKey($"{failedQueue}:jobid"); | |
await ClearQueue(failedQueue); | |
async Task ClearQueue(string queueName) | |
{ | |
string redisValue = await Database.ListLeftPopAsync(queueName); | |
while (!string.IsNullOrEmpty(redisValue)) | |
{ | |
await Database.KeyDeleteAsync(redisValue); | |
await DeleteKey(redisValue); | |
redisValue = await Database.ListLeftPopAsync(queueName); | |
} | |
await DeleteKey(queueName); | |
} | |
async Task DeleteKey(string key) | |
{ | |
await Database.KeyDeleteAsync(key); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment