Last active
October 23, 2019 07:36
-
-
Save aarondandy/ac70eace9c978098a5955eacd5f95e45 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// reference SqlStreamStore.Postgres, Newtonsoft.Json | |
// use C# 7.3+ | |
using System; | |
using System.Threading.Tasks; | |
using Newtonsoft.Json; | |
using Newtonsoft.Json.Linq; | |
using SqlStreamStore; | |
using SqlStreamStore.Streams; | |
namespace Mayonnaise | |
{ | |
class Program | |
{ | |
static async Task Main(string[] args) | |
{ | |
// SQLStreamStore sits on top of an RDBMS so we need a connection string | |
// Just your usual connection string should do the trick here | |
// I put my secret password of "dev" right in the connection string and you too can live on the edge like me | |
var yourAverageEverydayDatabaseConnectionString | |
= "Host=localhost;Database=ssssandbox;Username=postgres;Password=dev"; // π | |
// In this example I choose to use postgres because I'm an interesting person. | |
// We need to deal with the actual type for a little bit for setting things up. | |
var implementationSpecificStreamStore = new PostgresStreamStore( | |
new PostgresStreamStoreSettings(yourAverageEverydayDatabaseConnectionString)); | |
// This could be the first time this has ever been run, so we need to set the DB schema up. | |
// If this was the first time, it will create the tables and other needed bits. | |
await implementationSpecificStreamStore.CreateSchema(); | |
// I keep the variable typed as an interface however because most _real_ usages will likely be injected this way. | |
IStreamStore streamStore = implementationSpecificStreamStore; | |
// Our domain is crafting duck mayonnaise in a mayonnaise machine, as seen in Stardew Valley: | |
// See: https://stardewvalleywiki.com/Duck_Mayonnaise | |
// Lets drop an egg into the machine and see what comes out! | |
// We are going to have the machine be our aggregate and have the stream ID match, because I'm lazy. | |
var machineId = Guid.NewGuid(); | |
var streamId = new StreamId(machineId.ToString("N")); | |
// Lets build our first mesage to indicate putting a duck egg into the mayonnaise machine | |
var insertItemMessage = new NewStreamMessage(Guid.NewGuid(), "item-inserted", JsonConvert.SerializeObject(new | |
{ | |
itemName = "Duck Egg" | |
})); | |
// Because we just made the stream ID up, this will be the first message on the stream | |
var currentVersion = ExpectedVersion.NoStream; | |
// This will add the new message about our egg to the machine's stream | |
var appendResult = await streamStore.AppendToStream(streamId, currentVersion, insertItemMessage); | |
// Appending a message increments the stream's version | |
currentVersion = appendResult.CurrentVersion; | |
// It takes three hours for a duck egg to convert into duck mayonnaise | |
// Lets pass some time, and then some, with events | |
for (var i = 0; i < 5; i++) | |
{ | |
var workingMessage = new NewStreamMessage(Guid.NewGuid(), "passed-some-time", JsonConvert.SerializeObject(new | |
{ | |
hours = 1 | |
})); | |
appendResult = await streamStore.AppendToStream(streamId, currentVersion, workingMessage); | |
currentVersion = appendResult.CurrentVersion; | |
} | |
// When the operator of the machinery notices the product is ready, they remove it | |
// I don't have any idea what this message should look like, have an emoji | |
var extractItemMessage = new NewStreamMessage(Guid.NewGuid(), "extract-item", "\"π\""); | |
appendResult = await streamStore.AppendToStream(streamId, currentVersion, extractItemMessage); | |
currentVersion = appendResult.CurrentVersion; | |
// π¦π₯π¦π₯π¦π₯π¦π₯π¦π₯π¦π₯π¦π₯π¦π₯π¦π₯π¦π₯π¦π₯π¦π₯π¦π₯π¦π₯π¦π₯π¦π₯π¦π₯ | |
// | |
// Now, lets play back what happened to recreate this culinary abomination. | |
// | |
// π¦π₯π¦π₯π¦π₯π¦π₯π¦π₯π¦π₯π¦π₯π¦π₯π¦π₯π¦π₯π¦π₯π¦π₯π¦π₯π¦π₯π¦π₯π¦π₯π¦π₯ | |
// We are going to build a projection from the events. | |
// First, we need some state, I'm going to use some local variables | |
string machineContents = null; | |
var machineItemHoursWorked = 0; | |
// Next, lets define some handlers, I'm using local methods for brevity | |
// when we encounter an item inserted event, reset the state of the machine for it | |
void insertItem(string itemName) | |
{ | |
Console.WriteLine($"Inserted {itemName} into the machine."); | |
machineContents = itemName; | |
machineItemHoursWorked = 0; | |
} | |
// as time passes transformations may or may not take place | |
void passTime(int hours) | |
{ | |
Console.WriteLine($"Waited for {hours} hours."); | |
machineItemHoursWorked += hours; | |
// This is from our business rules. It takes 3 hours for an egg to become mayonnaise. | |
if (machineContents == "Duck Egg" && machineItemHoursWorked >= 3) | |
{ | |
Console.WriteLine($"Our item has transformed after waiting {machineItemHoursWorked} hours!"); | |
machineContents = "Duck Mayonnaise"; | |
machineItemHoursWorked -= 3; | |
} | |
} | |
// when the item is removed from the machine, we just reset the machine | |
void extractItem() | |
{ | |
Console.WriteLine($"Item {machineContents} was removed from the machine."); | |
machineContents = null; | |
machineItemHoursWorked = 0; | |
} | |
// Then, for the last step handle all of the messages in sequence | |
await forEachMessage(streamId, async m => | |
{ | |
switch (m.Type) | |
{ | |
case "item-inserted": | |
var itemName = JsonConvert.DeserializeObject<JObject>(await m.GetJsonData()).Value<string>("itemName"); | |
insertItem(itemName); | |
break; | |
case "passed-some-time": | |
var hours = JsonConvert.DeserializeObject<JObject>(await m.GetJsonData()).Value<int>("hours"); | |
passTime(hours); | |
break; | |
case "extract-item": | |
extractItem(); | |
break; | |
default: | |
throw new NotSupportedException(); | |
} | |
Console.WriteLine($"Machine status: {machineContents ?? "idle"} for {machineItemHoursWorked} hours."); | |
}); | |
// We no longer need the connection or access to the stream | |
streamStore.Dispose(); | |
Console.WriteLine("Press the Any Key to enjoy artisanal goods..."); | |
Console.ReadKey(); | |
// A terrible replacement for having IAsyncEnumerable πππ | |
async Task forEachMessage(string streamIdToRead, Func<StreamMessage, Task> processMessage) | |
{ | |
int pageStartVersion = 0; | |
ReadStreamPage page; | |
do | |
{ | |
page = await streamStore.ReadStreamForwards(streamIdToRead, pageStartVersion, maxCount: 2); | |
foreach (var message in page.Messages) | |
{ | |
await processMessage(message); | |
} | |
pageStartVersion = page.NextStreamVersion; | |
} | |
while (!page.IsEnd); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment