Created
May 5, 2019 14:14
-
-
Save siennathesane/4c70b5ab2ca7085050996c7914306d78 to your computer and use it in GitHub Desktop.
RabbitMQ checkpoint code for Microsoft Orleans
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using System; | |
| using System.Collections.Generic; | |
| using System.Linq; | |
| using System.Text; | |
| using System.Threading.Tasks; | |
| using Microsoft.Extensions.Logging; | |
| using Orleans.Streams; | |
| namespace Orleans.RabbitMQ.Providers | |
| { | |
| /// <summary> | |
| /// Implements <see cref="IStreamQueueCheckpointerFactory"/> for RabbitMQ. It's important to note that the checkpoint is an always-moving | |
| /// per-stream global checkpoint. | |
| /// </summary> | |
| // todo (mxplusb): find a sane way to go through and have a duplicate checker fire once every few hours to keep the database usage low. | |
| public class RabbitMQCheckpointer : IStreamQueueCheckpointer<string> | |
| { | |
| private readonly RabbitMQCheckpointRecordDbContext _context; | |
| private readonly ILogger _logger; | |
| private string _streamProviderName; | |
| private string _partition; | |
| private string _serviceId; | |
| public static readonly TimeSpan DEFAULT_CHECKPOINT_PERSIST_INTERVAL = TimeSpan.FromMinutes(1); | |
| // get the highest checkpoint record id and if there are no checkpoints, set to zero. | |
| public bool CheckpointExists => _context.Records.Select(c => c.Id).DefaultIfEmpty(0).Max() != 0; | |
| /// <summary> | |
| /// Interval to write checkpoints. Prevents spamming the database. | |
| /// </summary> | |
| public TimeSpan PersistInterval { get; set; } = DEFAULT_CHECKPOINT_PERSIST_INTERVAL; | |
| public static IStreamQueueCheckpointer<string> Create(RabbitMQCheckpointRecordDbContext context, string streamProviderName, string partition, string serviceId, ILoggerFactory loggerFactory) | |
| { | |
| return new RabbitMQCheckpointer(context, streamProviderName, partition, serviceId, loggerFactory); | |
| } | |
| private RabbitMQCheckpointer(RabbitMQCheckpointRecordDbContext context, string streamProviderName, string partition, string serviceId, ILoggerFactory loggerFactory) | |
| { | |
| if (string.IsNullOrWhiteSpace(streamProviderName)) | |
| { | |
| throw new ArgumentNullException(nameof(streamProviderName)); | |
| } | |
| if (string.IsNullOrWhiteSpace(partition)) | |
| { | |
| throw new ArgumentNullException(nameof(partition)); | |
| } | |
| _logger = loggerFactory.CreateLogger<RabbitMQCheckpointer>(); | |
| _logger.LogInformation($"Creating RabbitMQ checkpointer for partition {partition} of stream provider {streamProviderName} with serviceId {serviceId}."); | |
| _context = context ?? throw new ArgumentNullException(nameof(context)); | |
| _streamProviderName = streamProviderName; | |
| _partition = partition; | |
| _serviceId = serviceId; | |
| } | |
| /// <summary> | |
| /// Loads the most recent checkpoint. | |
| /// </summary> | |
| /// <returns></returns> | |
| public Task<string> Load() | |
| { | |
| // todo (mxplusb): there's like an optimization here, but since it works on ever-increasing smaller scalar sets, it's unlikely. | |
| // still worth exploring if there's problems at very large scales (>1M checkpoints). I think it would depend if EF Core does | |
| // AOT compilation, each chained query is a new standalone query, or if the chains are operations on the results from the initial | |
| // query. I've not really dug into how it works with linq. | |
| // this grabs the latest checkpoint record's id based off known stream information. | |
| var currentCheckpoint = _context.Records | |
| .Where(c => c.StreamProviderName == _streamProviderName) | |
| .Where(c => c.Partition == _partition) | |
| .Where(c => c.ServiceId == _serviceId) | |
| .Select(c => c.Id).DefaultIfEmpty(0).Max(); | |
| if (currentCheckpoint == 0) | |
| { | |
| throw new KeyNotFoundException("Cannot find the most recent checkpoint. Have any checkpoints been written yet?"); | |
| } | |
| // now that we have the id of this this specific stream's checkpoint, we can grab the specific record. | |
| var checkpoint = _context.Records.Single(c => c.Id == currentCheckpoint); | |
| // hack (mxplusb): if it looks weird, but it works, it's not weird. | |
| return Task.Run(() => checkpoint.Offset); | |
| } | |
| /// <summary> | |
| /// Updates the checkpoint but ignores the utcNow parameter since we can't batch updates across streams. | |
| /// </summary> | |
| /// <param name="offset"></param> | |
| /// <param name="utcNow"></param> | |
| public void Update(string offset, DateTime utcNow) | |
| { | |
| var newCheckpoint = new RabbitMQCheckpointRecord() | |
| { | |
| Offset = offset, | |
| StreamProviderName = _streamProviderName, | |
| Partition = _partition, | |
| ServiceId = _serviceId, | |
| }; | |
| _context.Add(newCheckpoint); | |
| _context.SaveChanges(); | |
| } | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using System.ComponentModel.DataAnnotations; | |
| namespace Orleans.RabbitMQ.Providers | |
| { | |
| public class RabbitMQCheckpointRecord | |
| { | |
| [Key] | |
| public int Id; | |
| public string Offset; | |
| public string StreamProviderName; | |
| public string ServiceId; | |
| public string Partition; | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using Microsoft.EntityFrameworkCore; | |
| namespace Orleans.RabbitMQ.Providers | |
| { | |
| public class RabbitMQCheckpointRecordDbContext : DbContext | |
| { | |
| public RabbitMQCheckpointRecordDbContext(DbContextOptions<RabbitMQCheckpointRecordDbContext> options) : base(options) { } | |
| public DbSet<RabbitMQCheckpointRecord> Records { get; set; } | |
| // since there will be lots of duplicate records, might as well index the key for performance reasons with >1M checkpoints. | |
| protected override void OnModelCreating(ModelBuilder modelBuilder) | |
| { | |
| modelBuilder.Entity<RabbitMQCheckpointRecord>().HasIndex(c => c.Id); | |
| } | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| namespace Orleans.Configuration | |
| { | |
| public class RabbitMQStreamCheckpointerOptions | |
| { | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment