Created
September 11, 2015 08:16
-
-
Save LeeCampbell/8c38d954229e06ee4f96 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Diagnostics; | |
using System.IO; | |
using System.Linq; | |
using System.Net; | |
using System.Text; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using EventStore.ClientAPI; | |
using EventStore.ClientAPI.Embedded; | |
using EventStore.Core.Bus; | |
using EventStore.Core.Messages; | |
using HdrHistogram.NET; | |
using Newtonsoft.Json; | |
using Notis.Server.Dto; | |
using NUnit.Framework; | |
namespace Notis.Server.Tests | |
{ | |
[TestFixture] | |
public class EventStoreWritePerfTestFuxture | |
{ | |
/// <summary>
/// Appends every generated event to the embedded EventStore one at a time
/// (each append is awaited before the next one starts), records the latency
/// of each append in a histogram, and prints throughput figures followed by
/// the latency percentile distribution to the console.
/// </summary>
/// <returns>A task the test runner can await, so failures are observed.</returns>
[Test]
public async Task ExecuteSerializedWrites()
{
    var histogram = GetHistogram();
    var events = GenerateEvents().ToList();
    var messages = events.Count;
    var bytesToWrite = events.Sum(evt => (long)evt.EventData.Data.Length);
    TimeSpan elapsed;

    using (var conn = await Connect())
    {
        // Warm up with events of their own. Re-sending the first measured
        // events (same event id, same expected version) is documented by
        // EventStore to be handled as an idempotent append, which would make
        // the first timed writes unrepresentative.
        foreach (var evt in GenerateEvents().Take(100))
        {
            await conn.AppendToStreamAsync(evt.StreamName, evt.ExpectedVersion, evt.EventData);
        }

        var timer = Stopwatch.StartNew();
        foreach (var evt in events)
        {
            using (histogram.MeasureLatency())
            {
                await conn.AppendToStreamAsync(evt.StreamName, evt.ExpectedVersion, evt.EventData);
            }
        }
        timer.Stop();
        elapsed = timer.Elapsed;
    }

    var msgPerSec = messages / elapsed.TotalSeconds;
    // Floating-point division: integer division would truncate the megabit
    // count before the rate is computed.
    var megabitsWritten = (bytesToWrite * 8) / (1024d * 1024d);
    var mbsPerSec = megabitsWritten / elapsed.TotalSeconds;

    Console.WriteLine($"{messages:#,###} events written in {elapsed}");
    Console.WriteLine($" {msgPerSec:#,###.00} msg/sec");
    Console.WriteLine($" {mbsPerSec:#,###.00} mbps");
    Console.WriteLine();
    Console.WriteLine("--Latency measurements--");

    var writer = new StringWriter();
    histogram.outputPercentileDistribution(writer);
    Console.WriteLine(writer.ToString());
}
/// <summary>
/// Lazily produces 100,000 messages. Each one wraps a freshly serialized stub
/// command in a "DealCreated" event with a new id, addressed to its own
/// "Deal-{event id}" stream with <c>ExpectedVersion.NoStream</c>.
/// </summary>
/// <returns>A deferred sequence; nothing is created until it is enumerated.</returns>
private static IEnumerable<Message> GenerateEvents()
{
    const int eventCount = 100 * 1000;

    return Enumerable.Range(0, eventCount).Select(_ =>
    {
        var body = CreateStubCommand().ToBson();
        var eventData = new EventData(Guid.NewGuid(), "DealCreated", true, body, null);
        return new Message($"Deal-{eventData.EventId}", ExpectedVersion.NoStream, eventData);
    });
}
/// <summary>
/// Builds a fully populated <c>CreateDealCommand</c> with fixed sample values
/// (only <c>TradeDate</c> varies, being the current time) to use as the event
/// payload.
/// </summary>
/// <returns>A new command instance on every call.</returns>
private static CreateDealCommand CreateStubCommand()
{
    var counterpartyInfo = new CounterPartyInfo
    {
        Counterparty = new ReferenceEntity("123", "CommodityPurchaser"),
        MasterServicesAgreement = new ReferenceEntity("MSA-123", "Standard MSA")
    };

    var shipmentTerms = new TradeShipmentTerms
    {
        ShipmentFromDate = new DateTime(2015, 12, 15),
        ShipmentToDate = new DateTime(2016, 01, 15),
        Location = new ReferenceEntity("NL", "Netherlands"),
        IncoTerms = new ReferenceEntity("FOB", "Free on board")
    };

    var tolerances = new TradeTolerances
    {
        MinTolerance = 0.01m,
        MaxTolerance = 0.05m,
        ToleranceMethod = ValueVariationMethod.Percentage,
        Option = ToleranceOption.Buyer
    };

    var pricing = new PricingParameters
    {
        PriceCurrency = "USD",
        ContractPremium = new ValueAdjustment(2.0m, ValueVariationMethod.Percentage),
        TradePricingOption = TraderPricingOption.FixedPrice
    };

    var payment = new PaymentDetails
    {
        PaymentCurrency = "USD",
        PaymentTerms = new ReferenceEntity("CASH", "Cash")
    };

    return new CreateDealCommand
    {
        TradeType = TradeType.Physical,
        PricingType = PricingType.Fixed,
        TradeDate = DateTimeOffset.Now,
        Product = new ReferenceEntity("123", "Soybean oil"),
        Direction = TradeDirection.TraderBuysClientSells,
        QuantityMeasurment = new QuantityMeasurment(10000, UnitOfMeasure.MetricTon),
        CounterpartyInfo = counterpartyInfo,
        Shipment = shipmentTerms,
        Tolerances = tolerances,
        TraderPricing = pricing,
        Payment = payment
    };
}
/// <summary>
/// Creates the latency histogram used by the test.
/// </summary>
/// <remarks>
/// <c>MeasureLatency</c> records differences of <c>Stopwatch.GetTimestamp()</c>,
/// so recorded values are in Stopwatch ticks (1 / <c>Stopwatch.Frequency</c>
/// seconds), not nanoseconds. The upper bound is therefore derived from
/// <c>Stopwatch.Frequency</c> so that it really corresponds to 30 minutes on
/// any machine.
/// </remarks>
/// <returns>A histogram covering up to 30 minutes with 3 significant digits.</returns>
private static Histogram GetHistogram()
{
    const long secondsPerMinute = 60;
    const long maxMinutes = 30;

    // Highest trackable value: 30 minutes expressed in Stopwatch ticks.
    var highestTrackableValue = Stopwatch.Frequency * secondsPerMinute * maxMinutes;
    return new Histogram(highestTrackableValue, 3);
}
/// <summary>
/// Starts the embedded EventStore node and opens a client connection to it on
/// the loopback address, port 1113.
/// </summary>
/// <returns>A connected <c>IEventStoreConnection</c>; the caller owns and disposes it.</returns>
private static async Task<IEventStoreConnection> Connect()
{
    // NOTE(review): 1113 is presumably the TCP port chosen by
    // OnDefaultEndpoints() in StartEventStore - confirm if that changes.
    var endpoint = new IPEndPoint(IPAddress.Loopback, 1113);
    StartEventStore();

    var connectionSettings = ConnectionSettings.Create()
        .KeepReconnecting()
        .UseDebugLogger();
    var conn = EventStoreConnection.Create(connectionSettings, endpoint);

    try
    {
        await conn.ConnectAsync();
    }
    catch
    {
        // Do not leak the connection when the initial connect fails.
        conn.Dispose();
        throw;
    }

    return conn;
}
/// <summary>
/// Builds and starts a single embedded EventStore node (projections disabled,
/// default endpoints) and blocks until the node publishes
/// <c>SystemMessage.SystemStart</c> on its main bus.
/// </summary>
/// <exception cref="TimeoutException">
/// The node did not signal start-up within 3 seconds.
/// </exception>
private static void StartEventStore()
{
    var timeout = TimeSpan.FromSeconds(3);
    var clusterVNode = EmbeddedVNodeBuilder.AsSingleNode()
        //.RunInMemory()
        .RunProjections(ProjectionsMode.None)
        //.EnableDevelopmentMode()
        .OnDefaultEndpoints()
        .Build();

    // Subscribe before Start() so the start notification cannot be missed.
    var startedEvent = new ManualResetEventSlim(false);
    clusterVNode.MainBus.Subscribe(new AdHocHandler<SystemMessage.SystemStart>(m => startedEvent.Set()));
    clusterVNode.Start();

    // NOTE(review): the node is never stopped by this method; it lives for
    // the rest of the test process. Returning it to the caller would allow a
    // clean shutdown but changes the signature.
    if (!startedEvent.Wait(timeout))
    {
        throw new TimeoutException($"EventStore hasn't started within {timeout.TotalSeconds} seconds.");
    }
}
/// <summary>
/// Immutable bundle of everything needed for one append call: the target
/// stream, the expected stream version, and the event to write.
/// </summary>
private sealed class Message
{
    /// <summary>Name of the stream the event is appended to.</summary>
    public string StreamName { get; }

    /// <summary>Expected version passed to the append call.</summary>
    public int ExpectedVersion { get; }

    /// <summary>The event payload to append.</summary>
    public EventData EventData { get; }

    public Message(string streamName, int expectedVersion, EventData eventData)
    {
        this.StreamName = streamName;
        this.ExpectedVersion = expectedVersion;
        this.EventData = eventData;
    }
}
} | |
/// <summary>
/// Helpers for recording elapsed time into a <c>Histogram</c>.
/// </summary>
public static class HistogramExtensions
{
    /// <summary>
    /// Starts a latency measurement. When the returned token is disposed, the
    /// elapsed time since this call is recorded into <paramref name="histogram"/>
    /// in Stopwatch ticks (the difference of two <c>Stopwatch.GetTimestamp()</c>
    /// values).
    /// </summary>
    /// <param name="histogram">The histogram that receives the sample.</param>
    /// <returns>A token intended for a <c>using</c> block around the measured code.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="histogram"/> is null.</exception>
    public static IDisposable MeasureLatency(this Histogram histogram)
    {
        // Fail here rather than later inside Dispose, where the cause is obscured.
        if (histogram == null)
        {
            throw new ArgumentNullException(nameof(histogram));
        }

        var startTs = Stopwatch.GetTimestamp();
        return new ActionDisposable(
            () => histogram.recordValue(Stopwatch.GetTimestamp() - startTs));
    }

    /// <summary>
    /// Runs the supplied action the first time it is disposed; further
    /// Dispose calls do nothing, so a sample is never recorded twice.
    /// </summary>
    private sealed class ActionDisposable : IDisposable
    {
        private Action _action;

        public ActionDisposable(Action action)
        {
            _action = action;
        }

        public void Dispose()
        {
            // Atomically take the action so that only one Dispose call runs it.
            var action = Interlocked.Exchange(ref _action, null);
            if (action != null)
            {
                action();
            }
        }
    }
}
/// <summary>
/// Serialization helpers built on Json.NET.
/// </summary>
public static class SerializerEx
{
    /// <summary>
    /// Serializes <paramref name="input"/> with <see cref="Serialize{T}"/> and
    /// returns the UTF-8 bytes of the resulting text. Despite the name, the
    /// output is JSON text, not binary BSON.
    /// </summary>
    /// <param name="input">The object to serialize.</param>
    /// <returns>UTF-8 encoded JSON.</returns>
    public static byte[] ToBson<T>(this T input) => Encoding.UTF8.GetBytes(input.Serialize());

    /// <summary>
    /// Serializes <paramref name="sut"/> to indented JSON, omitting members
    /// that are null or hold their default value.
    /// </summary>
    /// <param name="sut">The object to serialize.</param>
    /// <returns>The JSON text.</returns>
    public static string Serialize<T>(this T sut)
    {
        var settings = new JsonSerializerSettings
        {
            NullValueHandling = NullValueHandling.Ignore,
            DefaultValueHandling = DefaultValueHandling.Ignore,
        };

        return JsonConvert.SerializeObject(sut, Formatting.Indented, settings);
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment