Last active
October 26, 2020 10:55
-
-
Save danielmarbach/bfb06208a74d42306c16 to your computer and use it in GitHub Desktop.
RavenMigrations Async
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace ServiceControl.Migrations | |
{ | |
using System; | |
using System.Threading.Tasks; | |
using Raven.Abstractions; | |
using Raven.Abstractions.Data; | |
using ServiceBus.Management.Infrastructure.Settings; | |
using ServiceControl.Contracts.Operations; | |
using ServiceControl.MessageFailures; | |
[Migration(executionOrder: 201507011435)] | |
public class FailedMessageMigration : Migration | |
{ | |
readonly DateTime expiryThreshold; | |
public FailedMessageMigration() | |
: this(TimeSpan.FromHours(Settings.HoursToKeepMessagesBeforeExpiring)) | |
{ | |
} | |
public FailedMessageMigration(TimeSpan timeToKeepMessagesBeforeExpiring) | |
{ | |
expiryThreshold = SystemTime.UtcNow.Add(-timeToKeepMessagesBeforeExpiring); | |
} | |
protected override async Task UpAsyncInternal() | |
{ | |
await DocumentStore.AsyncDatabaseCommands.UpdateByIndex( | |
"Raven/DocumentsByEntityName", | |
new IndexQuery | |
{ | |
Query = "Tag:FailedMessages" | |
}, | |
new ScriptedPatchRequest() | |
{ | |
Script = @" | |
var historyConverter = function($doc) | |
{ | |
var attempts = []; | |
_($doc.ProcessingAttempts).forEach(function(attempt){ | |
attempts.push({ | |
'FailureDetails' : attempt.FailureDetails, | |
'CorrelationId' : attempt.CorrelationId, | |
'AttemptedAt' : attempt.AttemptedAt, | |
'MessageId' : attempt.MessageId, | |
'Headers' : attempt.Headers, | |
'ReplyToAddress' : attempt.ReplyToAddress, | |
'Recoverable' : attempt.Recoverable, | |
'MessageIntent' : attempt.MessageIntent, | |
'SendingEndpoint' : attempt.MessageMetadata['SendingEndpoint'], | |
'ProcessingEndpoint' : attempt.MessageMetadata['ReceivingEndpoint'], | |
'ContentType' : attempt.MessageMetadata['ContentType'], | |
'IsSystemMessage' : attempt.MessageMetadata['IsSystemMessage'], | |
'MessageType' : attempt.MessageMetadata['MessageType'], | |
'TimeSent' : attempt.MessageMetadata['TimeSent'] | |
}); | |
}); | |
PutDocument('MessageFailureHistories/' + $doc.UniqueMessageId, | |
{ | |
'Status' : $doc.Status, | |
'UniqueMessageId' : $doc.UniqueMessageId, | |
'ProcessingAttempts' : attempts | |
}, | |
{ | |
'Raven-Entity-Name' : 'MessageFailureHistories', | |
'Raven-Clr-Type' : 'ServiceControl.MessageFailures.MessageFailureHistory, ServiceControl' | |
} | |
); | |
}; | |
historyConverter(this); | |
var snapshotConverter = function($doc, $expiry) | |
{ | |
var lastAttempt = $doc.ProcessingAttempts[$doc.ProcessingAttempts.length - 1]; | |
if(Date.parse(lastAttempt.AttemptedAt) > Date.parse($expiry)) | |
{ | |
PutDocument('AuditMessageSnapshots/' + $doc.UniqueMessageId, | |
{ | |
'AttemptedAt' : lastAttempt.AttemptedAt, | |
'ProcessedAt' : lastAttempt.AttemptedAt, | |
'ConversationId' : lastAttempt.CorrelationId, | |
'IsSystemMessage' : lastAttempt.MessageMetadata['IsSystemMessage'], | |
'MessageType' : lastAttempt.MessageMetadata['MessageType'], | |
'Body' : { | |
'$type' : 'Particular.Backend.Debugging.BodyInformation, Particular.Backend.Debugging', | |
'BodyUrl' : lastAttempt.MessageMetadata['BodyUrl'], | |
'ContentType' : lastAttempt.MessageMetadata['ContentType'], | |
'ContentLength' : lastAttempt.MessageMetadata['ContentLength'], | |
'Text' : lastAttempt.MessageMetadata['Body'] | |
}, | |
'MessageIntent' : lastAttempt.MessageIntent, | |
'Processing' : { | |
'$type' : 'Particular.Backend.Debugging.ProcessingStatistics, Particular.Backend.Debugging', | |
'TimeSent' : lastAttempt.MessageMetadata['TimeSent'], | |
'CriticalTime' : lastAttempt.MessageMetadata['CriticalTime'], | |
'DeliveryTime' : lastAttempt.MessageMetadata['DeliveryTime'], | |
'ProcessingTime' : lastAttempt.MessageMetadata['ProcessingTime'] | |
}, | |
'ReceivingEndpoint' : lastAttempt.MessageMetadata['ReceivingEndpoint'], | |
'SendingEndpoint' : lastAttempt.MessageMetadata['SendingEndpoint'], | |
'HeadersForSearching' : lastAttempt.MessageMetadata['HeadersForSearching'], | |
'MessageId' : lastAttempt.MessageId, | |
'UniqueMessageId' : $doc.UniqueMessageId, | |
'Status' : 1, | |
'Headers' : lastAttempt.Headers | |
}, | |
{ | |
'Raven-Entity-Name' : 'AuditMessageSnapshot', | |
'Raven-Clr-Type' : ' Particular.Backend.Debugging.RavenDB.Model.MessageSnapshotDocument, Particular.Backend.Debugging.RavenDB' | |
} | |
); | |
} | |
}; | |
snapshotConverter(this, '" + expiryThreshold + "');" | |
} | |
, allowStale: true); | |
} | |
MessageStatus ConvertStatus(FailedMessageStatus status) | |
{ | |
switch (status) | |
{ | |
case FailedMessageStatus.Archived: | |
return MessageStatus.ArchivedFailure; | |
case FailedMessageStatus.Resolved: | |
return MessageStatus.ResolvedSuccessfully; | |
default: | |
return MessageStatus.Failed; | |
} | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
public class DefaultMigrationResolver : IMigrationResolver | |
{ | |
public Migration Resolve(Type migrationType) | |
{ | |
return (Migration)Activator.CreateInstance(migrationType); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
public interface IMigrationResolver | |
{ | |
Migration Resolve(Type migrationType); | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Diagnostics; | |
using System.Threading.Tasks; | |
using Metrics; | |
using Raven.Client; | |
public abstract class Migration | |
{ | |
public virtual void Setup(IDocumentStore documentStore) | |
{ | |
DocumentStore = documentStore; | |
} | |
public async Task UpAsync() | |
{ | |
var stopWatch = new Stopwatch(); | |
var timer = Metric.Timer(GetType().FullName, Unit.Requests); | |
stopWatch.Start(); | |
await UpAsyncInternal(); | |
stopWatch.Stop(); | |
timer.Record(stopWatch.ElapsedMilliseconds, TimeUnit.Milliseconds); | |
} | |
protected abstract Task UpAsyncInternal(); | |
protected IDocumentStore DocumentStore { get; private set; } | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
[AttributeUsage(AttributeTargets.Class, Inherited = false)] | |
public sealed class MigrationAttribute : Attribute | |
{ | |
public MigrationAttribute(long executionOrder) | |
{ | |
ExecutionOrder = executionOrder; | |
} | |
public long ExecutionOrder { get; set; } | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
public class MigrationDocument | |
{ | |
public MigrationDocument() | |
{ | |
RunOn = DateTimeOffset.UtcNow; | |
} | |
public string Id { get; set; } | |
public DateTimeOffset RunOn { get; set; } | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Threading.Tasks; | |
using Raven.Abstractions.Data; | |
using Raven.Client; | |
static class MigrationExtensions | |
{ | |
/// <summary> | |
/// Will wait until the store has completed indexing. | |
/// </summary> | |
/// <remarks>Taken from Matt Warren's example here http://stackoverflow.com/q/10316721/2608 </remarks> | |
public static async Task WaitForIndexingAsync(this IDocumentStore store) | |
{ | |
DatabaseStatistics stats; | |
do | |
{ | |
stats = await store.AsyncDatabaseCommands.GetStatisticsAsync(); | |
await Task.Delay(10); | |
} while (stats.StaleIndexes.Length != 0); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Collections.Generic; | |
using System.Reflection; | |
public class MigrationOptions | |
{ | |
public MigrationOptions() | |
{ | |
Assemblies = new List<Assembly>(); | |
MigrationResolver = new DefaultMigrationResolver(); | |
} | |
public IList<Assembly> Assemblies { get; set; } | |
public IMigrationResolver MigrationResolver { get; set; } | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
internal class MigrationWithAttribute | |
{ | |
public Func<Migration> Migration { get; set; } | |
public MigrationAttribute Attribute { get; set; } | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Reflection; | |
using System.Text.RegularExpressions; | |
public static class RavenMigrationHelpers | |
{ | |
public static readonly string RavenMigrationsIdPrefix = "Migrations"; | |
public static string GetMigrationIdFromName(this Migration migration, char seperator = '/') | |
{ | |
const char underscore = '_'; | |
var type = migration.GetType(); | |
var idSafeTypeName = Regex.Replace(type.Name, underscore + "{2,}", underscore.ToString()) | |
.Trim(underscore); | |
var name = idSafeTypeName | |
.Replace(underscore, seperator) | |
.ToLowerInvariant(); | |
var executionOrder = type.GetMigrationAttribute().ExecutionOrder; | |
return string.Join(seperator.ToString(), new[] { | |
RavenMigrationsIdPrefix, name, executionOrder.ToString() | |
}).ToLowerInvariant(); | |
} | |
public static MigrationAttribute GetMigrationAttribute(this Type type) | |
{ | |
var attribute = Attribute.GetCustomAttributes(type) | |
.FirstOrDefault(x => x.GetType().IsAssignableFrom(typeof(MigrationAttribute))); | |
return (MigrationAttribute)attribute; | |
} | |
public static IEnumerable<Type> GetLoadableTypes(this Assembly assembly) | |
{ | |
if (assembly == null) throw new ArgumentNullException("assembly"); | |
try | |
{ | |
return assembly.GetTypes(); | |
} | |
catch (ReflectionTypeLoadException e) | |
{ | |
return e.Types.Where(t => t != null); | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Reflection; | |
using System.Threading.Tasks; | |
using Raven.Client; | |
public class Runner | |
{ | |
public static async Task Run(IDocumentStore documentStore, MigrationOptions options = null) | |
{ | |
if (options == null) | |
options = new MigrationOptions(); | |
if (!options.Assemblies.Any()) | |
options.Assemblies.Add(Assembly.GetCallingAssembly()); | |
var migrations = FindAllMigrationsWithOptions(options); | |
foreach (var migrationsThatCanRunInParallel in migrations) | |
{ | |
var migrationTasks = migrationsThatCanRunInParallel.Select(migration1 => Task.Run(async () => | |
{ | |
var m = migration1.Migration(); | |
m.Setup(documentStore); | |
// todo: possible issue here with sharding | |
var migrationId = m.GetMigrationIdFromName(documentStore.Conventions.IdentityPartsSeparator[0]); | |
using (var session = documentStore.OpenAsyncSession()) | |
{ | |
var migrationDoc = await session.LoadAsync<MigrationDocument>(migrationId); | |
// we already ran it | |
if (migrationDoc != null) | |
{ | |
return; | |
} | |
await m.UpAsync(); | |
await session.StoreAsync(new MigrationDocument | |
{ | |
Id = migrationId | |
}); | |
await session.SaveChangesAsync(); | |
} | |
})).ToList(); | |
await Task.WhenAll(migrationTasks); | |
} | |
} | |
private static IEnumerable<List<MigrationWithAttribute>> FindAllMigrationsWithOptions(MigrationOptions options) | |
{ | |
var migrationsToRun = | |
from assembly in options.Assemblies | |
from t in assembly.GetLoadableTypes() | |
where typeof(Migration).IsAssignableFrom(t) | |
&& !t.IsAbstract | |
&& t.GetConstructor(Type.EmptyTypes) != null | |
select new MigrationWithAttribute | |
{ | |
Migration = () => options.MigrationResolver.Resolve(t), | |
Attribute = t.GetMigrationAttribute() | |
} into migration | |
orderby migration.Attribute.ExecutionOrder | |
group migration by migration.Attribute.ExecutionOrder into g | |
select g.ToList(); | |
return migrationsToRun; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment