Skip to content

Instantly share code, notes, and snippets.

@danielmarbach
Last active October 26, 2020 10:55
Show Gist options
  • Save danielmarbach/bfb06208a74d42306c16 to your computer and use it in GitHub Desktop.
Save danielmarbach/bfb06208a74d42306c16 to your computer and use it in GitHub Desktop.
RavenMigrations Async
namespace ServiceControl.Migrations
{
using System;
using System.Threading.Tasks;
using Raven.Abstractions;
using Raven.Abstractions.Data;
using ServiceBus.Management.Infrastructure.Settings;
using ServiceControl.Contracts.Operations;
using ServiceControl.MessageFailures;
/// <summary>
/// Migration that patches every document tagged <c>FailedMessages</c> and, from each one,
/// writes a <c>MessageFailureHistories/{UniqueMessageId}</c> document and, when the last
/// processing attempt is newer than the expiry threshold, an
/// <c>AuditMessageSnapshots/{UniqueMessageId}</c> document.
/// </summary>
[Migration(executionOrder: 201507011435)]
public class FailedMessageMigration : Migration
{
    // Attempts made at or before this instant do not get an audit snapshot.
    readonly DateTime expiryThreshold;

    /// <summary>
    /// Creates the migration using <c>Settings.HoursToKeepMessagesBeforeExpiring</c> as the retention period.
    /// </summary>
    public FailedMessageMigration()
        : this(TimeSpan.FromHours(Settings.HoursToKeepMessagesBeforeExpiring))
    {
    }

    /// <summary>
    /// Creates the migration with an explicit retention period.
    /// </summary>
    /// <param name="timeToKeepMessagesBeforeExpiring">
    /// Subtracted from the current <c>SystemTime.UtcNow</c> to obtain the expiry threshold.
    /// </param>
    public FailedMessageMigration(TimeSpan timeToKeepMessagesBeforeExpiring)
    {
        expiryThreshold = SystemTime.UtcNow.Add(-timeToKeepMessagesBeforeExpiring);
    }

    /// <summary>
    /// Runs a scripted patch over the <c>Raven/DocumentsByEntityName</c> index (query
    /// <c>Tag:FailedMessages</c>, stale results allowed). The script runs two converters per document:
    /// <c>historyConverter</c> copies the status, unique id and a flattened list of processing attempts
    /// into a history document; <c>snapshotConverter</c> writes an audit snapshot built from the last
    /// processing attempt when that attempt is newer than the expiry threshold.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the snapshot metadata uses a 'Raven-Clr-Type' value that starts with a space and a
    /// singular 'Raven-Entity-Name' while the document id prefix is plural - confirm both are intended.
    /// NOTE(review): a document whose ProcessingAttempts array is empty would make <c>lastAttempt</c>
    /// undefined inside the script - confirm such documents cannot exist.
    /// NOTE(review): the value produced by awaiting <c>UpdateByIndex</c> is discarded; if it is an
    /// operation handle, completion of the server-side patch is not awaited here - confirm.
    /// </remarks>
    protected override async Task UpAsyncInternal()
    {
        // The threshold is embedded into JavaScript and read back with Date.parse, so it must not
        // depend on the current culture. The round-trip ("o") format is culture-invariant ISO 8601 and
        // carries the UTC designator (assumes SystemTime.UtcNow yields a UTC-kind DateTime - TODO confirm).
        var expiry = expiryThreshold.ToString("o", System.Globalization.CultureInfo.InvariantCulture);

        await DocumentStore.AsyncDatabaseCommands.UpdateByIndex(
            "Raven/DocumentsByEntityName",
            new IndexQuery
            {
                Query = "Tag:FailedMessages"
            },
            new ScriptedPatchRequest()
            {
                Script = @"
var historyConverter = function($doc)
{
var attempts = [];
_($doc.ProcessingAttempts).forEach(function(attempt){
attempts.push({
'FailureDetails' : attempt.FailureDetails,
'CorrelationId' : attempt.CorrelationId,
'AttemptedAt' : attempt.AttemptedAt,
'MessageId' : attempt.MessageId,
'Headers' : attempt.Headers,
'ReplyToAddress' : attempt.ReplyToAddress,
'Recoverable' : attempt.Recoverable,
'MessageIntent' : attempt.MessageIntent,
'SendingEndpoint' : attempt.MessageMetadata['SendingEndpoint'],
'ProcessingEndpoint' : attempt.MessageMetadata['ReceivingEndpoint'],
'ContentType' : attempt.MessageMetadata['ContentType'],
'IsSystemMessage' : attempt.MessageMetadata['IsSystemMessage'],
'MessageType' : attempt.MessageMetadata['MessageType'],
'TimeSent' : attempt.MessageMetadata['TimeSent']
});
});
PutDocument('MessageFailureHistories/' + $doc.UniqueMessageId,
{
'Status' : $doc.Status,
'UniqueMessageId' : $doc.UniqueMessageId,
'ProcessingAttempts' : attempts
},
{
'Raven-Entity-Name' : 'MessageFailureHistories',
'Raven-Clr-Type' : 'ServiceControl.MessageFailures.MessageFailureHistory, ServiceControl'
}
);
};
historyConverter(this);
var snapshotConverter = function($doc, $expiry)
{
var lastAttempt = $doc.ProcessingAttempts[$doc.ProcessingAttempts.length - 1];
if(Date.parse(lastAttempt.AttemptedAt) > Date.parse($expiry))
{
PutDocument('AuditMessageSnapshots/' + $doc.UniqueMessageId,
{
'AttemptedAt' : lastAttempt.AttemptedAt,
'ProcessedAt' : lastAttempt.AttemptedAt,
'ConversationId' : lastAttempt.CorrelationId,
'IsSystemMessage' : lastAttempt.MessageMetadata['IsSystemMessage'],
'MessageType' : lastAttempt.MessageMetadata['MessageType'],
'Body' : {
'$type' : 'Particular.Backend.Debugging.BodyInformation, Particular.Backend.Debugging',
'BodyUrl' : lastAttempt.MessageMetadata['BodyUrl'],
'ContentType' : lastAttempt.MessageMetadata['ContentType'],
'ContentLength' : lastAttempt.MessageMetadata['ContentLength'],
'Text' : lastAttempt.MessageMetadata['Body']
},
'MessageIntent' : lastAttempt.MessageIntent,
'Processing' : {
'$type' : 'Particular.Backend.Debugging.ProcessingStatistics, Particular.Backend.Debugging',
'TimeSent' : lastAttempt.MessageMetadata['TimeSent'],
'CriticalTime' : lastAttempt.MessageMetadata['CriticalTime'],
'DeliveryTime' : lastAttempt.MessageMetadata['DeliveryTime'],
'ProcessingTime' : lastAttempt.MessageMetadata['ProcessingTime']
},
'ReceivingEndpoint' : lastAttempt.MessageMetadata['ReceivingEndpoint'],
'SendingEndpoint' : lastAttempt.MessageMetadata['SendingEndpoint'],
'HeadersForSearching' : lastAttempt.MessageMetadata['HeadersForSearching'],
'MessageId' : lastAttempt.MessageId,
'UniqueMessageId' : $doc.UniqueMessageId,
'Status' : 1,
'Headers' : lastAttempt.Headers
},
{
'Raven-Entity-Name' : 'AuditMessageSnapshot',
'Raven-Clr-Type' : ' Particular.Backend.Debugging.RavenDB.Model.MessageSnapshotDocument, Particular.Backend.Debugging.RavenDB'
}
);
}
};
snapshotConverter(this, '" + expiry + "');"
            }
            , allowStale: true);
    }

    /// <summary>
    /// Maps a <c>FailedMessageStatus</c> onto a <c>MessageStatus</c>: Archived becomes ArchivedFailure,
    /// Resolved becomes ResolvedSuccessfully, anything else becomes Failed.
    /// </summary>
    /// <remarks>Nothing in this class calls this method; it is kept as in the original.</remarks>
    MessageStatus ConvertStatus(FailedMessageStatus status)
    {
        switch (status)
        {
            case FailedMessageStatus.Archived:
                return MessageStatus.ArchivedFailure;
            case FailedMessageStatus.Resolved:
                return MessageStatus.ResolvedSuccessfully;
            default:
                return MessageStatus.Failed;
        }
    }
}
}
using System;
/// <summary>
/// Resolver that instantiates migrations through reflection, using whatever constructor
/// <see cref="Activator.CreateInstance(Type)"/> selects for the given type.
/// </summary>
public class DefaultMigrationResolver : IMigrationResolver
{
    /// <summary>
    /// Creates a new instance of <paramref name="migrationType"/> and returns it as a <c>Migration</c>.
    /// </summary>
    /// <param name="migrationType">Concrete type to instantiate.</param>
    /// <returns>The freshly created instance, cast to <c>Migration</c>.</returns>
    public Migration Resolve(Type migrationType)
    {
        object instance = Activator.CreateInstance(migrationType);
        return (Migration)instance;
    }
}
using System;
/// <summary>
/// Abstraction over how a migration instance is obtained from its type.
/// </summary>
public interface IMigrationResolver
{
/// <summary>
/// Returns a <c>Migration</c> instance for the given type.
/// </summary>
/// <param name="migrationType">The migration type to resolve.</param>
Migration Resolve(Type migrationType);
}
using System.Diagnostics;
using System.Threading.Tasks;
using Metrics;
using Raven.Client;
/// <summary>
/// Base class for migrations. Holds the document store handed over in <see cref="Setup"/> and
/// wraps the concrete migration logic with a duration measurement.
/// </summary>
public abstract class Migration
{
    /// <summary>
    /// The store passed to <see cref="Setup"/>; not assigned anywhere else in this class.
    /// </summary>
    protected IDocumentStore DocumentStore { get; private set; }

    /// <summary>
    /// Remembers the document store the migration should work against.
    /// </summary>
    /// <param name="documentStore">Store exposed to subclasses through <see cref="DocumentStore"/>.</param>
    public virtual void Setup(IDocumentStore documentStore)
    {
        this.DocumentStore = documentStore;
    }

    /// <summary>
    /// Executes <see cref="UpAsyncInternal"/> and records the elapsed milliseconds on a timer
    /// named after the concrete type's full name.
    /// </summary>
    /// <remarks>
    /// The duration is recorded only after a successful run: an exception from
    /// <see cref="UpAsyncInternal"/> propagates before the timer is updated.
    /// </remarks>
    public async Task UpAsync()
    {
        var durationTimer = Metric.Timer(GetType().FullName, Unit.Requests);

        // Started after the timer lookup so only the migration itself is measured.
        var clock = Stopwatch.StartNew();
        await UpAsyncInternal();
        clock.Stop();

        durationTimer.Record(clock.ElapsedMilliseconds, TimeUnit.Milliseconds);
    }

    /// <summary>
    /// The actual migration work, supplied by the concrete migration.
    /// </summary>
    protected abstract Task UpAsyncInternal();
}
using System;
/// <summary>
/// Marks a class as a migration and carries the number used to order and group migrations.
/// Applies to classes only and is not inherited by derived classes.
/// </summary>
[AttributeUsage(AttributeTargets.Class, Inherited = false)]
public sealed class MigrationAttribute : Attribute
{
    /// <summary>
    /// Creates the attribute with the given execution order.
    /// </summary>
    /// <param name="executionOrder">Value stored in <see cref="ExecutionOrder"/>.</param>
    public MigrationAttribute(long executionOrder)
    {
        this.ExecutionOrder = executionOrder;
    }

    /// <summary>
    /// The execution order supplied at construction; can be overwritten afterwards.
    /// </summary>
    public long ExecutionOrder { get; set; }
}
using System;
/// <summary>
/// Marker document stored once a migration has run; its id identifies the migration.
/// </summary>
public class MigrationDocument
{
    /// <summary>
    /// Creates the document and stamps <see cref="RunOn"/> with the current UTC time.
    /// </summary>
    public MigrationDocument()
    {
        this.RunOn = DateTimeOffset.UtcNow;
    }

    /// <summary>
    /// Document id; left unset by the constructor.
    /// </summary>
    public string Id { get; set; }

    /// <summary>
    /// Point in time assigned at construction (UTC) unless overwritten later.
    /// </summary>
    public DateTimeOffset RunOn { get; set; }
}
using System.Threading.Tasks;
using Raven.Abstractions.Data;
using Raven.Client;
/// <summary>
/// Helper extensions used by migrations.
/// </summary>
static class MigrationExtensions
{
    /// <summary>
    /// Will wait until the store has completed indexing.
    /// </summary>
    /// <param name="store">Store whose database statistics are polled.</param>
    /// <returns>A task that completes once the statistics report no stale indexes.</returns>
    /// <remarks>
    /// Taken from Matt Warren's example here http://stackoverflow.com/q/10316721/2608.
    /// There is no timeout: the loop keeps polling for as long as any index is reported stale.
    /// </remarks>
    public static async Task WaitForIndexingAsync(this IDocumentStore store)
    {
        while (true)
        {
            DatabaseStatistics stats = await store.AsyncDatabaseCommands.GetStatisticsAsync();
            if (stats.StaleIndexes.Length == 0)
            {
                // Nothing stale: return immediately instead of sleeping once more.
                return;
            }

            // Short pause between polls while indexing is still in progress.
            await Task.Delay(10);
        }
    }
}
using System.Collections.Generic;
using System.Reflection;
/// <summary>
/// Settings for a migration run: which assemblies to scan and how to instantiate migrations.
/// </summary>
public class MigrationOptions
{
    /// <summary>
    /// Assemblies to scan for migrations; starts out as an empty list.
    /// </summary>
    public IList<Assembly> Assemblies { get; set; }

    /// <summary>
    /// Resolver used to create migration instances; starts out as a <c>DefaultMigrationResolver</c>.
    /// </summary>
    public IMigrationResolver MigrationResolver { get; set; }

    /// <summary>
    /// Creates options with no assemblies and the default resolver.
    /// </summary>
    public MigrationOptions()
    {
        this.Assemblies = new List<Assembly>();
        this.MigrationResolver = new DefaultMigrationResolver();
    }
}
using System;
/// <summary>
/// Pairs a factory for a migration instance with the attribute read from the migration's type.
/// </summary>
internal class MigrationWithAttribute
{
/// <summary>Factory that produces the migration instance when invoked.</summary>
public Func<Migration> Migration { get; set; }
/// <summary>The migration attribute associated with the migration.</summary>
public MigrationAttribute Attribute { get; set; }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text.RegularExpressions;
/// <summary>
/// Reflection and naming helpers shared by the migration runner.
/// </summary>
public static class RavenMigrationHelpers
{
    /// <summary>First segment of every migration document id.</summary>
    public static readonly string RavenMigrationsIdPrefix = "Migrations";

    /// <summary>
    /// Builds the document id under which a migration is recorded:
    /// prefix, type name and execution order joined by <paramref name="seperator"/>, all lower-cased.
    /// Runs of underscores in the type name are collapsed, leading/trailing underscores are removed
    /// and the remaining underscores become separators.
    /// </summary>
    /// <param name="migration">Migration whose type name and attribute are used.</param>
    /// <param name="seperator">Character placed between the id segments.</param>
    /// <returns>The lower-cased id, e.g. <c>migrations/my/migration/1</c> for <c>My_Migration</c> with order 1.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="migration"/> is null.</exception>
    /// <exception cref="InvalidOperationException">The migration's type carries no <c>MigrationAttribute</c>.</exception>
    public static string GetMigrationIdFromName(this Migration migration, char seperator = '/')
    {
        if (migration == null) throw new ArgumentNullException("migration");

        const char underscore = '_';
        var type = migration.GetType();

        // Fail with a descriptive message instead of a bare NullReferenceException.
        var attribute = type.GetMigrationAttribute();
        if (attribute == null)
        {
            throw new InvalidOperationException(
                "Type '" + type.FullName + "' is not decorated with a MigrationAttribute.");
        }

        var idSafeTypeName = Regex.Replace(type.Name, underscore + "{2,}", underscore.ToString())
            .Trim(underscore);
        var name = idSafeTypeName
            .Replace(underscore, seperator)
            .ToLowerInvariant();

        // Invariant formatting keeps the id identical regardless of the machine's culture.
        var executionOrder = attribute.ExecutionOrder.ToString(System.Globalization.CultureInfo.InvariantCulture);

        return string.Join(seperator.ToString(), new[] {
            RavenMigrationsIdPrefix, name, executionOrder
        }).ToLowerInvariant();
    }

    /// <summary>
    /// Returns the first <c>MigrationAttribute</c> found on <paramref name="type"/>, or null when there is none.
    /// </summary>
    /// <param name="type">Type to inspect.</param>
    public static MigrationAttribute GetMigrationAttribute(this Type type)
    {
        return Attribute.GetCustomAttributes(type)
            .OfType<MigrationAttribute>()
            .FirstOrDefault();
    }

    /// <summary>
    /// Returns the types of an assembly, falling back to the subset that could be loaded
    /// when <see cref="Assembly.GetTypes"/> throws <see cref="ReflectionTypeLoadException"/>.
    /// </summary>
    /// <param name="assembly">Assembly to enumerate.</param>
    /// <exception cref="ArgumentNullException"><paramref name="assembly"/> is null.</exception>
    public static IEnumerable<Type> GetLoadableTypes(this Assembly assembly)
    {
        if (assembly == null) throw new ArgumentNullException("assembly");
        try
        {
            return assembly.GetTypes();
        }
        catch (ReflectionTypeLoadException e)
        {
            // Types that failed to load are reported as null entries; drop them.
            return e.Types.Where(t => t != null);
        }
    }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
using Raven.Client;
/// <summary>
/// Discovers migrations in the configured assemblies and runs those that have no
/// <c>MigrationDocument</c> stored yet. Migrations sharing an execution order run concurrently;
/// groups run one after another in ascending execution order.
/// </summary>
public class Runner
{
    /// <summary>
    /// Runs all pending migrations against <paramref name="documentStore"/>.
    /// </summary>
    /// <param name="documentStore">Store the migrations run against and where completion markers are saved.</param>
    /// <param name="options">
    /// Optional settings. When null, default options are used. When no assemblies are configured,
    /// the assembly of the caller of this method is added to <c>options.Assemblies</c>.
    /// </param>
    /// <returns>A task that completes when every group of migrations has finished.</returns>
    /// <remarks>
    /// This method is deliberately not <c>async</c> and not inlinable: inside an async method the body
    /// executes within the compiler-generated state machine, so <see cref="Assembly.GetCallingAssembly"/>
    /// would report the framework's task builder instead of the real caller. The calling assembly is
    /// therefore resolved here, synchronously, before handing over to the async implementation.
    /// </remarks>
    [System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.NoInlining)]
    public static Task Run(IDocumentStore documentStore, MigrationOptions options = null)
    {
        if (options == null)
        {
            options = new MigrationOptions();
        }

        if (!options.Assemblies.Any())
        {
            options.Assemblies.Add(Assembly.GetCallingAssembly());
        }

        return RunMigrationsAsync(documentStore, options);
    }

    // Runs each execution-order group in turn, awaiting the whole group before starting the next one.
    private static async Task RunMigrationsAsync(IDocumentStore documentStore, MigrationOptions options)
    {
        foreach (var migrationsThatCanRunInParallel in FindAllMigrationsWithOptions(options))
        {
            var migrationTasks = migrationsThatCanRunInParallel
                .Select(candidate => Task.Run(() => RunSingleMigrationAsync(documentStore, candidate)))
                .ToList();

            await Task.WhenAll(migrationTasks);
        }
    }

    // Runs one migration unless its marker document already exists, then stores the marker.
    private static async Task RunSingleMigrationAsync(IDocumentStore documentStore, MigrationWithAttribute candidate)
    {
        var migration = candidate.Migration();
        migration.Setup(documentStore);

        // todo: possible issue here with sharding
        var migrationId = migration.GetMigrationIdFromName(documentStore.Conventions.IdentityPartsSeparator[0]);

        using (var session = documentStore.OpenAsyncSession())
        {
            var migrationDoc = await session.LoadAsync<MigrationDocument>(migrationId);

            // we already ran it
            if (migrationDoc != null)
            {
                return;
            }

            await migration.UpAsync();

            await session.StoreAsync(new MigrationDocument
            {
                Id = migrationId
            });
            await session.SaveChangesAsync();
        }
    }

    // Finds concrete Migration types that have a parameterless constructor and a MigrationAttribute,
    // ordered by execution order and grouped so that equal orders end up in the same list.
    // Types without the attribute are skipped; they have no execution order to sort or group by.
    private static IEnumerable<List<MigrationWithAttribute>> FindAllMigrationsWithOptions(MigrationOptions options)
    {
        var migrationsToRun =
            from assembly in options.Assemblies
            from t in assembly.GetLoadableTypes()
            where typeof(Migration).IsAssignableFrom(t)
                  && !t.IsAbstract
                  && t.GetConstructor(Type.EmptyTypes) != null
            let attribute = t.GetMigrationAttribute()
            where attribute != null
            select new MigrationWithAttribute
            {
                Migration = () => options.MigrationResolver.Resolve(t),
                Attribute = attribute
            } into migration
            orderby migration.Attribute.ExecutionOrder
            group migration by migration.Attribute.ExecutionOrder into g
            select g.ToList();

        return migrationsToRun;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment