Skip to content

Instantly share code, notes, and snippets.

@siennathesane
Created May 5, 2019 14:14
Show Gist options
  • Save siennathesane/4c70b5ab2ca7085050996c7914306d78 to your computer and use it in GitHub Desktop.
Save siennathesane/4c70b5ab2ca7085050996c7914306d78 to your computer and use it in GitHub Desktop.
RabbitMQ checkpoint code for Microsoft Orleans
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Orleans.Streams;
namespace Orleans.RabbitMQ.Providers
{
/// <summary>
/// Implements <see cref="IStreamQueueCheckpointer{T}"/> for RabbitMQ on top of
/// <see cref="RabbitMQCheckpointRecordDbContext"/>. Every update appends a new record; the record with the
/// highest id for a given stream provider / partition / service id is that stream's current checkpoint.
/// </summary>
// todo (mxplusb): find a sane way to go through and have a duplicate checker fire once every few hours to keep the database usage low.
public class RabbitMQCheckpointer : IStreamQueueCheckpointer<string>
{
    private readonly RabbitMQCheckpointRecordDbContext _context;
    private readonly ILogger _logger;
    private readonly string _streamProviderName;
    private readonly string _partition;
    private readonly string _serviceId;

    /// <summary>
    /// Default value of <see cref="PersistInterval"/>.
    /// </summary>
    public static readonly TimeSpan DEFAULT_CHECKPOINT_PERSIST_INTERVAL = TimeSpan.FromMinutes(1);

    /// <summary>
    /// True when at least one checkpoint has been written for this checkpointer's stream provider,
    /// partition and service id. Uses the same filter as <see cref="Load"/>, so a <c>true</c> result
    /// means <see cref="Load"/> has a record to return.
    /// </summary>
    public bool CheckpointExists => StreamRecords().Any();

    /// <summary>
    /// Interval to write checkpoints. Prevents spamming the database.
    /// Note: <see cref="Update"/> does not currently consult this value; every update is written immediately.
    /// </summary>
    public TimeSpan PersistInterval { get; set; } = DEFAULT_CHECKPOINT_PERSIST_INTERVAL;

    /// <summary>
    /// Creates a checkpointer for the given stream provider, partition and service id.
    /// </summary>
    /// <param name="context">Database context used to read and write checkpoint records.</param>
    /// <param name="streamProviderName">Name of the stream provider; must not be null or whitespace.</param>
    /// <param name="partition">Partition identifier; must not be null or whitespace.</param>
    /// <param name="serviceId">Service id stored alongside each checkpoint.</param>
    /// <param name="loggerFactory">Factory used to create this checkpointer's logger.</param>
    /// <returns>A new checkpointer instance.</returns>
    /// <exception cref="ArgumentNullException">
    /// A required argument is null, or <paramref name="streamProviderName"/> / <paramref name="partition"/> is blank.
    /// </exception>
    public static IStreamQueueCheckpointer<string> Create(RabbitMQCheckpointRecordDbContext context, string streamProviderName, string partition, string serviceId, ILoggerFactory loggerFactory)
    {
        return new RabbitMQCheckpointer(context, streamProviderName, partition, serviceId, loggerFactory);
    }

    private RabbitMQCheckpointer(RabbitMQCheckpointRecordDbContext context, string streamProviderName, string partition, string serviceId, ILoggerFactory loggerFactory)
    {
        if (string.IsNullOrWhiteSpace(streamProviderName))
        {
            throw new ArgumentNullException(nameof(streamProviderName));
        }
        if (string.IsNullOrWhiteSpace(partition))
        {
            throw new ArgumentNullException(nameof(partition));
        }
        if (loggerFactory == null)
        {
            throw new ArgumentNullException(nameof(loggerFactory));
        }

        // Validate every argument before logging so a failed construction does not leave a misleading
        // "Creating ..." entry behind.
        _context = context ?? throw new ArgumentNullException(nameof(context));
        _streamProviderName = streamProviderName;
        _partition = partition;
        _serviceId = serviceId;

        _logger = loggerFactory.CreateLogger<RabbitMQCheckpointer>();
        _logger.LogInformation($"Creating RabbitMQ checkpointer for partition {partition} of stream provider {streamProviderName} with serviceId {serviceId}.");
    }

    /// <summary>
    /// Loads the most recent checkpoint for this stream provider, partition and service id.
    /// </summary>
    /// <returns>A completed task holding the offset of the record with the highest id.</returns>
    /// <exception cref="KeyNotFoundException">No checkpoint has been written for this stream yet.</exception>
    public Task<string> Load()
    {
        // One ordered query instead of "max id" followed by "single by id": one round trip, and no window
        // in which the row can disappear between the two statements. Projecting into an anonymous type
        // lets us tell "no row" (null) apart from "row whose Offset is null".
        var latest = StreamRecords()
            .OrderByDescending(c => c.Id)
            .Select(c => new { c.Offset })
            .FirstOrDefault();

        if (latest == null)
        {
            throw new KeyNotFoundException("Cannot find the most recent checkpoint. Have any checkpoints been written yet?");
        }

        // The value is already available, so hand back a completed task rather than scheduling
        // thread-pool work just to wrap it.
        return Task.FromResult(latest.Offset);
    }

    /// <summary>
    /// Appends a new checkpoint record and saves it immediately. The <paramref name="utcNow"/> parameter is
    /// ignored since we can't batch updates across streams.
    /// </summary>
    /// <param name="offset">Offset to store as the new checkpoint.</param>
    /// <param name="utcNow">Unused.</param>
    public void Update(string offset, DateTime utcNow)
    {
        var newCheckpoint = new RabbitMQCheckpointRecord()
        {
            Offset = offset,
            StreamProviderName = _streamProviderName,
            Partition = _partition,
            ServiceId = _serviceId,
        };
        _context.Add(newCheckpoint);
        _context.SaveChanges();
    }

    /// <summary>
    /// Builds the query for the records that belong to this checkpointer's stream provider, partition and service id.
    /// </summary>
    private IQueryable<RabbitMQCheckpointRecord> StreamRecords()
    {
        return _context.Records
            .Where(c => c.StreamProviderName == _streamProviderName
                        && c.Partition == _partition
                        && c.ServiceId == _serviceId);
    }
}
}
using System.ComponentModel.DataAnnotations;
namespace Orleans.RabbitMQ.Providers
{
/// <summary>
/// One persisted checkpoint entry. Members are auto-properties rather than public fields because
/// Entity Framework Core's conventions only map public properties that have a getter and a setter.
/// </summary>
public class RabbitMQCheckpointRecord
{
    /// <summary>Primary key of the record.</summary>
    [Key]
    public int Id { get; set; }

    /// <summary>The checkpointed offset value.</summary>
    public string Offset { get; set; }

    /// <summary>Name of the stream provider the checkpoint belongs to.</summary>
    public string StreamProviderName { get; set; }

    /// <summary>Service id the checkpoint belongs to.</summary>
    public string ServiceId { get; set; }

    /// <summary>Partition the checkpoint belongs to.</summary>
    public string Partition { get; set; }
}
}
using Microsoft.EntityFrameworkCore;
namespace Orleans.RabbitMQ.Providers
{
/// <summary>
/// Entity Framework Core context exposing the stored <see cref="RabbitMQCheckpointRecord"/> entries.
/// </summary>
public class RabbitMQCheckpointRecordDbContext : DbContext
{
    /// <summary>
    /// Creates the context with the supplied options.
    /// </summary>
    /// <param name="options">Options forwarded to the <see cref="DbContext"/> base constructor.</param>
    public RabbitMQCheckpointRecordDbContext(DbContextOptions<RabbitMQCheckpointRecordDbContext> options)
        : base(options)
    {
    }

    /// <summary>
    /// The checkpoint records.
    /// </summary>
    public DbSet<RabbitMQCheckpointRecord> Records { get; set; }

    /// <summary>
    /// Declares an index on the record id; the intent is to keep lookups fast once the table holds a large
    /// number (>1M) of mostly duplicate checkpoints.
    /// </summary>
    /// <param name="modelBuilder">Builder used to configure the model.</param>
    // NOTE(review): Id is already marked [Key] on the entity; confirm whether this additional index is needed.
    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        var checkpointRecord = modelBuilder.Entity<RabbitMQCheckpointRecord>();
        checkpointRecord.HasIndex(record => record.Id);
    }
}
}
namespace Orleans.Configuration
{
/// <summary>
/// Options type for the RabbitMQ stream checkpointer. It currently declares no members.
/// </summary>
public class RabbitMQStreamCheckpointerOptions
{
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment