-
-
Save ahjohannessen/b8c7aff495a8207591fd to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.CodeDom; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Net; | |
using System.Text; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using EventStore.ClientAPI; | |
using EventStore.ClientAPI.Embedded; | |
using EventStore.ClientAPI.SystemData; | |
using EventStore.Core; | |
using EventStore.Core.Bus; | |
using EventStore.Core.Messages; | |
namespace ReplicaSpike | |
{ | |
class Program | |
{ | |
static void Main(string[] args) | |
{ | |
var port = int.Parse(args[0]); | |
var otherport = int.Parse(args[1]); | |
var node = StartEmbeddedEventStore(port); | |
Console.WriteLine("Node up. waiting"); | |
Thread.Sleep(10000); | |
Console.WriteLine("Setting up thread to write later."); | |
Task.Factory.StartNew(() => WriteEvents(node, port)); | |
Console.WriteLine("Setting up replication"); | |
ReplicateFrom(node, otherport); | |
Console.ReadLine(); | |
} | |
private static void WriteEvents(ClusterVNode node, int myPort) | |
{ | |
using (var conn = EmbeddedEventStoreConnection.Create(node, ConnectionSettings.Create().LimitRetriesForOperationTo(3))) | |
{ | |
for(int i=0;i<500;i++) | |
{ | |
Thread.Sleep(1000); | |
Console.WriteLine("writing event to me"); | |
conn.AppendToStreamAsync("foo-" + myPort, ExpectedVersion.Any, BuildEvent(myPort)).Wait(); | |
} | |
} | |
} | |
private static EventData BuildEvent(int port) | |
{ | |
return new EventData(Guid.NewGuid(), "foo", true, new byte[500], Encoding.UTF8.GetBytes(port.ToString())); | |
} | |
private static void ReplicateFrom(ClusterVNode node, int otherport) | |
{ | |
Console.WriteLine("Attempting to connect"); | |
//load up initial checksum from stream or even local checkpoint (local checkpoint may be better) | |
using (var localConnection = EmbeddedEventStoreConnection.Create(node, "local connection")) | |
{ | |
using (var connection = EventStoreConnection.Create(ConnectionSettings.Create() | |
.KeepReconnecting() | |
.KeepRetrying() | |
.Build(), new IPEndPoint(IPAddress.Loopback, otherport))) | |
{ | |
connection.ConnectAsync().Wait(); | |
Console.WriteLine("Successfully conencted to localhost:{0}", otherport); | |
Console.WriteLine("Setting up subscription"); | |
var sub = connection.SubscribeToAllFrom(null, false, | |
(s, ev) => | |
{ | |
var port = TryReadPortFromMetadata(ev.OriginalEvent.Metadata); | |
if (port == otherport) | |
{ | |
Console.WriteLine("writing event {0}/{1}", ev.OriginalStreamId, ev.OriginalEventNumber); | |
localConnection.AppendToStreamAsync(ev.OriginalStreamId, ev.OriginalEventNumber -1, | |
new UserCredentials("admin", "changeit"), | |
new EventData(ev.OriginalEvent.EventId, | |
ev.OriginalEvent.EventType, | |
ev.OriginalEvent.IsJson, | |
ev.OriginalEvent.Data, | |
ev.OriginalEvent.Metadata)).Wait(); | |
} | |
else | |
{ | |
Console.WriteLine("Not writing {0}/{1}", ev.OriginalStreamId, ev.OriginalEventNumber); | |
} | |
}, | |
(s) => Console.WriteLine("live started`"), | |
(s, e, ex) => | |
{ | |
Console.WriteLine("sub dropped {0} {1}", e, ex); | |
}, | |
new UserCredentials("admin", "changeit")); | |
Console.WriteLine("waiting."); | |
Console.ReadLine(); | |
} | |
} | |
} | |
private static int TryReadPortFromMetadata(byte[] metadata) | |
{ | |
if (metadata == null) return 0; | |
try | |
{ | |
var str = Encoding.UTF8.GetString(metadata); | |
return int.Parse(str); | |
} | |
catch (Exception ex) | |
{ | |
return 0; | |
} | |
} | |
static ClusterVNode StartEmbeddedEventStore(int port) | |
{ | |
var embeddedEventStore = EmbeddedVNodeBuilder.AsSingleNode() | |
.RunInMemory() | |
.RunProjections(ProjectionsMode.None) | |
.WithExternalTcpOn(new IPEndPoint(IPAddress.Loopback, port)) | |
.WithInternalTcpOn(new IPEndPoint(IPAddress.None, 1234)) | |
.WithInternalHttpOn(new IPEndPoint(IPAddress.None, 1234)) | |
.WithExternalHttpOn(new IPEndPoint(IPAddress.None, 1234)) | |
.Build(); | |
var startedEvent = new ManualResetEventSlim(false); | |
embeddedEventStore.MainBus.Subscribe( | |
new AdHocHandler<UserManagementMessage.UserManagementServiceInitialized>(m => startedEvent.Set())); | |
embeddedEventStore.Start(); | |
Console.WriteLine("Waiting on node."); | |
if (!startedEvent.Wait(60000)) | |
throw new TimeoutException("Embedded Event Store has not started in 60 seconds."); | |
return embeddedEventStore; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment