Created
April 5, 2011 04:11
-
-
Save joliver/903011 to your computer and use it in GitHub Desktop.
Blog--CQRS: Out of Sequence Messages and Read Models
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
CREATE TABLE [dbo].[Messages] | |
( | |
[AggregateId] [uniqueidentifier] NOT NULL, | |
[Version] [int] NOT NULL CHECK ([Version] > 0), | |
[Inserted] [datetime] NOT NULL DEFAULT (GETUTCDATE()), | |
[Headers] [varbinary](max) NOT NULL, | |
[Payload] [varbinary](max) NOT NULL, | |
CONSTRAINT [PK_Messages] PRIMARY KEY CLUSTERED ([AggregateId], [Version]) | |
); | |
CREATE TABLE [dbo].[Sequence] | |
( | |
[AggregateId] [uniqueidentifier] NOT NULL, | |
[MinVersion] [int] NOT NULL CHECK ([MinVersion] > 0), | |
[MaxVersion] [int] NOT NULL CHECK ([MaxVersion] > 0), | |
[Messages] [smallint] NOT NULL CHECK ([Messages] >= 0), | |
[Cleared] [datetime] NOT NULL DEFAULT (GETUTCDATE()), | |
CONSTRAINT [PK_Sequence] PRIMARY KEY CLUSTERED ([AggregateId]) | |
); | |
CREATE UNIQUE NONCLUSTERED INDEX [IX_Sequence] ON [dbo].[Sequence] | |
( [AggregateId], [MaxVersion], [Messages], [MinVersion], [Cleared] ); | |
ALTER TABLE [dbo].[Sequence] WITH CHECK ADD CONSTRAINT [CK_SequenceVersion] CHECK (([MaxVersion]>=[MinVersion])); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
INSERT | |
INTO [Sequence] | |
SELECT @AggregateId, 1, 1, 0, '2000-01-01' | |
WHERE NOT EXISTS ( SELECT * FROM [Sequence] WHERE [AggregateId] = @AggregateId ); | |
INSERT | |
INTO [Messages] | |
SELECT @AggregateId, @Version, GETUTCDATE(), @Headers, @Payload | |
WHERE NOT EXISTS ( SELECT * FROM [Messages] WHERE [AggregateId] = @AggregateId AND [Version] = @Version ) | |
AND EXISTS ( SELECT * FROM [Sequence] WHERE [AggregateId] = @AggregateId AND MinVersion <= @Version ); | |
UPDATE [Sequence] | |
SET [MaxVersion] = CASE WHEN [MaxVersion] > @Version THEN [MaxVersion] ELSE @Version END, | |
[Messages] = [Messages] + 1 | |
WHERE AggregateId = @AggregateId | |
AND @@ROWCOUNT > 0; /* the message insert succeeded */ | |
UPDATE [Sequence] | |
SET [MaxVersion] = [MaxVersion] + 1, | |
[MinVersion] = [MaxVersion] + 1, | |
[Messages] = 0, | |
[Cleared] = GETUTCDATE() | |
WHERE [AggregateId] = @AggregateId | |
AND [MaxVersion] = [MinVersion] + [Messages] - 1 | |
AND @@ROWCOUNT > 0; /* we successfully updated the sequence table because the message insert succeeded */ | |
SELECT [Headers], [Payload] | |
FROM [Messages] | |
WHERE [AggregateId] = @AggregateId | |
AND [Version] < ( SELECT [MinVersion] FROM Sequence WHERE [AggregateId] = @AggregateId ) | |
AND @@ROWCOUNT > 0 /* we're moving on to the next version */ | |
ORDER BY [Version]; | |
DELETE | |
FROM [Messages] | |
WHERE [AggregateId] = @AggregateId | |
AND [Version] < ( SELECT [MinVersion] FROM Sequence WHERE [AggregateId] = @AggregateId ) | |
AND @@ROWCOUNT > 0; /* remove the events that were selected */ | |
/* run messages through handlers here */ | |
/* if message handlers succeed, commit; otherwise DB is left in a consistent state and the message is retried */ |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment