Skip to content

Instantly share code, notes, and snippets.

@kstrauss
Created April 26, 2016 18:11
Show Gist options
  • Save kstrauss/9478fc76ed68a88480d0ef09de8f9e7d to your computer and use it in GitHub Desktop.
Save kstrauss/9478fc76ed68a88480d0ef09de8f9e7d to your computer and use it in GitHub Desktop.
Reproduction code for EventStore Issue #900
<Query Kind="Program">
<Connection>
<ID>e8afa357-aae4-40ae-a4ad-e4783f38f696</ID>
<Server>rdbtest2012</Server>
<Database>PackageMonitorReadModel_SGSpike-Main</Database>
<ShowServer>true</ShowServer>
</Connection>
<NuGetReference>EventStore.Client</NuGetReference>
<NuGetReference>Newtonsoft.Json</NuGetReference>
<NuGetReference>Rx-Linq</NuGetReference>
<NuGetReference>Rx-Main</NuGetReference>
<NuGetReference>Rx-WPF</NuGetReference>
<Namespace>EventStore.ClientAPI</Namespace>
<Namespace>EventStore.ClientAPI.SystemData</Namespace>
<Namespace>System</Namespace>
<Namespace>System.Net</Namespace>
<Namespace>System.Reactive</Namespace>
<Namespace>System.Reactive.Concurrency</Namespace>
<Namespace>System.Reactive.Disposables</Namespace>
<Namespace>System.Reactive.Joins</Namespace>
<Namespace>System.Reactive.Linq</Namespace>
<Namespace>System.Reactive.PlatformServices</Namespace>
<Namespace>System.Reactive.Subjects</Namespace>
<Namespace>System.Reactive.Threading.Tasks</Namespace>
<Namespace>System.Threading.Tasks</Namespace>
<Namespace>Newtonsoft.Json</Namespace>
<Namespace>Newtonsoft.Json.Converters</Namespace>
</Query>
/// <summary>
/// LINQPad entry point. Streams the size of every non-system event read from $all on the
/// EventStore host named below, dumps per-event-type size statistics, appends the sizes to
/// a CSV file and dumps a running throughput summary. When the throughput query completes
/// the log is completed as well.
/// </summary>
void Main()
{
    const String hostname = "ap01prod";
    // Commented-out alternatives: new SynchronizationContextScheduler(new SynchronizationContext()), Scheduler.CurrentThread
    var scheduler = Scheduler.Default;

    // The Logger is dumped as the "Log" view; readers write to it through the synchronized wrapper.
    var subject = new Logger(scheduler).Dump("Log", 1);
    var logWriter = Observer.Synchronize(subject, true); // commented-out alternative: .Checked(subject)
    //subject.Select(o => new { ThreadId = o.Item1.ThreadId, Body = o.Item2, Delta = TimeSpan.FromTicks(DateTime.Now.Ticks - o.Item1.InTime.Ticks).TotalMilliseconds }).Dump("Log of EventStore Communication");
    logWriter.OnNext("Start of log");

    var cred = new UserCredentials("admin", "changeit");
    /*
    DateTime? lastSeen = null;
    ReportOnDollarEtVsAll(cred,hostname,logWriter)
    .Dump("From ItemDecoded")*/
    var ts = TimeSpan.FromSeconds(10);

    // Published so both queries below observe the same read; it starts at Connect() at the end.
    var lengths = ReportOnAllEventSizes(cred, hostname, logWriter).Publish();

    // Query 1: size statistics per event type.
    var statsPerEventType = lengths
        .GroupBy(x => x.EventType)
        .Select(g => new
        {
            EventType = g.Key,
            MaxLength = g.Max(e => e.EventLength),
            MinLength = g.Min(e => e.EventLength),
            MeanLength = g.Average(e => e.EventLength),
            Count = g.Count()
        });
    statsPerEventType.Dump();

    // Running totals, updated from inside the rate query.
    long totalEvents = 0;
    long totalBytes = 0;

    var fname = String.Format(@"c:\temp\lengths_{0}.csv", hostname);
    File.Delete(fname);
    var sw = Stopwatch.StartNew();

    // Query 2: CSV output plus throughput figures, one batch per 'ts' window.
    var rates = lengths
        .Buffer(ts) // batch up, so we aren't writing one line at a time
        .Do(batch => File.AppendAllLines(fname, batch.Select(item => item.ToCSV())))
        .Select(batch =>
        {
            totalEvents += batch.Count();
            var bytes = batch.Sum(v => v.EventLength) + batch.Sum(v => v.EventMetaLength);
            totalBytes += bytes;
            return new
            {
                CurrentRate = batch.Count() / ts.TotalSeconds,
                BytesPsec = bytes / ts.TotalSeconds,
                TotalEvents = totalEvents,
                historicalRate = totalEvents / sw.Elapsed.TotalSeconds,
                historicalBytes = totalBytes / sw.Elapsed.TotalSeconds,
                totalkb = totalBytes / 1024
            };
        })
        .DumpLatest("Rate of events Per second");

    // Subscribe as well, only to learn about completion so the logger can be finished.
    var finishLog = Observer.Create<Object>(
        e => { },
        e => { },
        () =>
        {
            logWriter.OnNext("Should be all done");
            logWriter.OnCompleted();
        });
    rates.Subscribe(finishLog);

    lengths.Connect();
}
/// <summary>
/// Formats a date as a JavaScript <c>new Date(year,month,day,hours,minutes,seconds,milliseconds)</c>
/// expression using the date's own components (no time-zone conversion is applied).
/// </summary>
/// <param name="d">The date to format.</param>
/// <returns>The JavaScript constructor expression as text.</returns>
public String ToGoogleDate(DateTime d)
{
    // JavaScript's Date constructor takes a zero-based month index, whereas DateTime.Month is 1..12.
    // The seconds component must be present, otherwise milliseconds would be read as seconds.
    // Invariant culture keeps the digits ASCII regardless of the machine's regional settings.
    return String.Format(System.Globalization.CultureInfo.InvariantCulture,
        "new Date({0},{1},{2},{3},{4},{5},{6})",
        d.Year, d.Month - 1, d.Day, d.Hour, d.Minute, d.Second, d.Millisecond);
}
/// <summary>
/// Converts a date to the number of seconds elapsed since 1970-01-01T00:00:00Z.
/// The date is converted to universal time first and the result is produced by
/// <see cref="Convert.ToInt64(double)"/> on the total seconds.
/// </summary>
/// <param name="date">The date to convert.</param>
/// <returns>Seconds since the Unix epoch.</returns>
public long ToUnixTime(DateTime date)
{
    DateTime unixEpoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
    TimeSpan sinceEpoch = date.ToUniversalTime() - unixEpoch;
    double seconds = sinceEpoch.TotalSeconds;
    return Convert.ToInt64(seconds);
}
/// Wraps the original observation with a timeout and catches any TimeoutException that occurs.
/// On a timeout the result continues as the merge of a single default(T) value and a fresh,
/// equally timeout-guarded subscription to the original observable.
IObservable<T> dd<T>(IObservable<T> orig, TimeSpan timeoutPeriod)
{
    IObservable<T> guarded = orig.Timeout(timeoutPeriod);
    return guarded.Catch<T, TimeoutException>(timedOut =>
    {
        //"Exception".Dump();
        IObservable<T> placeholder = Observable.Return(default(T));
        IObservable<T> retry = dd(orig, timeoutPeriod);
        return Observable.Merge(placeholder, retry);
    });
}
/// <summary>
/// Reads <paramref name="streamId"/> as deserialized <typeparamref name="T"/> values and yields
/// only those whose underlying event id equals <paramref name="evtId"/>.
/// </summary>
/// <param name="evtId">Event id in a format accepted by Guid.Parse.</param>
IObservable<T> FindEventId<T>(UserCredentials cred, String hostname, IObserver<Object> logger, String evtId, string streamId)
{
    Guid wanted = Guid.Parse(evtId);
    var decoded = ReadAsObservable<T>(streamId, cred, logger, hostname);
    var matching = decoded.Where(pair => pair.Item2.Event.EventId == wanted);
    return matching.Select(pair => pair.Item1);
}
/// <summary>
/// Reads the $all stream until caught up and projects every event that has no link and whose
/// event type does not start with "$" into its type name, data length and metadata length.
/// </summary>
IObservable<EventDataLength> ReportOnAllEventSizes(UserCredentials cred, String hostname, IObserver<Object> logger)
{
    var everything = ReadAsObservable(cred, logger, hostname, true);

    var userEvents = everything.Where(resolved =>
    {
        if (resolved.Link != null)
        {
            return false;
        }
        return !resolved.Event.EventType.StartsWith("$");
    });

    return userEvents.Select(resolved =>
    {
        var sizes = new EventDataLength();
        sizes.EventType = resolved.Event.EventType;
        sizes.EventLength = resolved.Event.Data.Length;
        sizes.EventMetaLength = resolved.Event.Metadata.Length;
        return sizes;
    });
}
/// <summary>
/// Size record for a single event: its type name plus the byte lengths of its data and metadata.
/// </summary>
public class EventDataLength
{
    /// <summary>Event type name.</summary>
    public String EventType { get; set; }

    /// <summary>Length of the event data.</summary>
    public int EventLength { get; set; }

    /// <summary>Length of the event metadata.</summary>
    public int EventMetaLength { get; set; }

    /// <summary>
    /// Renders the record as one CSV line: the type name in single quotes, then both lengths.
    /// </summary>
    public String ToCSV() => $"'{EventType}',{EventLength},{EventMetaLength}";
}
/*
See whether we have events that we found from $all but that didn't make it into the
corresponding $et-{eventType} stream.

For every event type seen in $all (ignoring link events and types starting with "$") the
number of events counted from $all is compared with the number counted from the stream
named "$et-" + event type. Only entries whose difference is non-zero are returned.

NOTE: the elements of the returned sequence are the Task objects produced by the async
Select below (the final Where filters them but does not unwrap them); the comparison data
is in each task's Result.
*/
IEnumerable<Object> ReportOnDollarEtVsAll(UserCredentials cred, String hostname, IObserver<Object> logger)
{
return ReadAsObservable(cred, logger, hostname, true)
.Where(r => r.Link == null && !r.Event.EventType.StartsWith("$"))
.GroupBy(r => r.Event.EventType)/*
// write out content to a file
.Do(g => {
// write out to a file
var fname = String.Format(@"c:\temp\xx_{0}.json", g.Key);
File.Delete(fname);
var nullObs = Observer.Create<IList<ResolvedEvent>>(ef => File.AppendAllLines(fname, ef.Select(o => Encoding.UTF8.GetString(o.Event.Data))));
g
//.SubscribeOn(Scheduler.Default)
.Buffer(TimeSpan.FromSeconds(5),5000)
.Subscribe(nullObs);
})*/
// Per event type: Count is an observable count over the group taken from $all,
// fromEt is a task that counts the matching $et-{type} stream on its own connection.
.Select(g => new
{
key = g.Key,
Count = g.LongCount()//.RunAsync(cts.Token)
//,events = g.Select(s=>s.Event.EventStreamId).Distinct()
,
fromEt = //Observable.Return($"$et-{g.Key}")
//.Do(i=> logger.OnNext(i))
System.Threading.Tasks.Task.Run(async ()=> await ReadAsObservable($"$et-{g.Key}", cred, logger, hostname, true).LongCount())
//ReadAsObservable($"$et-{g.Key}", cred, logger, hostname, true).SubscribeOn(ThreadPoolScheduler.Instance).LongCount()
})
// Awaits both counts; because the lambda is async each element becomes a Task of the comparison record.
// NOTE(review): awaiting l.Count is what subscribes to the group's count; this presumably relies on
// the lambda running as soon as the group is emitted so no group elements are missed - confirm before reordering.
.Select(async l => {
var allStreamCount = await l.Count;
var fromEtCount = await l.fromEt;
return new { Streamid = l.key,
DifferenceBetweenAllAndEt = allStreamCount - fromEtCount,
AllStreamCount = allStreamCount,
EtCount = fromEtCount };
})
// Leaves Rx: materializes the tasks into a list.
.ToEnumerable().ToList()
//.Select(async l => new { Streamid = l.key, DifferenceBetweenAllAndEt = await l.Count - await l.fromEt})
// Reading Result waits for each task; the filter is lazy, so it runs when the caller enumerates.
.Where(l => l.Result.DifferenceBetweenAllAndEt != 0)
;
}
/// <summary>
/// Looks up event number <paramref name="eventNum"/> in <paramref name="sourceStream"/> and reports
/// whether <paramref name="destStream"/> contains an event with the same stream id and event number.
/// Both reads block until the respective stream has been read to its current end.
/// </summary>
bool DoesSourceEventExistIn(UserCredentials cred, String hostname, IObserver<Object> logger, String sourceStream, int eventNum, String destStream)
{
    var sourceEvents = ReadAsObservable(sourceStream, cred, logger, hostname, true);
    var initialEvent = sourceEvents
        .Where(candidate => candidate.OriginalEvent.EventNumber == eventNum)
        .Dump("initial")
        .Wait();

    var destinationEvents = ReadAsObservable(destStream, cred, logger, hostname, true); //.Take(10).Dump("destStream")
    var sameOrigin = destinationEvents.Where(candidate =>
        candidate.Event.EventStreamId == initialEvent.Event.EventStreamId
        && candidate.Event.EventNumber == initialEvent.Event.EventNumber);

    return sameOrigin.Any().Wait();
}
/// <summary>
/// Creates an EventStore connection to the first address that <paramref name="hostname"/> resolves to,
/// on port 1113, using default connection settings, and blocks until ConnectAsync has finished.
/// </summary>
/// <param name="hostname">DNS name of the EventStore node.</param>
/// <returns>The connection, after the connect call has completed.</returns>
IEventStoreConnection getConn(String hostname)
{
    var builder = ConnectionSettings.Create();
    // Optional settings kept for experimentation:
    //builder.SetHeartbeatTimeout(TimeSpan.FromMilliseconds(2500));
    //builder.SetReconnectionDelayTo(TimeSpan.FromSeconds(5));
    //builder.KeepRetrying();
    //builder.EnableVerboseLogging();
    //builder.UseConsoleLogger();
    //builder.UseFileLogger(@"c:\temp\es_conns.log");

    IPAddress address = Dns.GetHostAddresses(hostname).First();
    //address = IPAddress.Parse( "127.0.0.1");

    var connection = EventStoreConnection.Create(builder.Build(), new IPEndPoint(address, 1113));
    connection.ConnectAsync().Wait();
    return connection;
}
/// <summary>Event-handler shaped helper that dumps the reconnect arguments under the "Reconnecting" heading.</summary>
void ReconnectingEventHandler(object sender, ClientReconnectingEventArgs args) => args.Dump("Reconnecting");
/// <summary>
/// Reads from the $all stream, so across all streams, and exposes the events as an observable.
/// Each subscription opens its own connection via getConn (wrapped in Observable.Using) and starts
/// a catch-up subscription on it.
/// </summary>
/// <param name="cred">Credentials passed to the $all subscription.</param>
/// <param name="logger">Receives progress messages and any drop exception.</param>
/// <param name="hostname">Host passed to getConn.</param>
/// <param name="stopOnCatchup">When true the observable completes once the subscription reports it is live.</param>
/// <param name="memberName">Filled in by the compiler with the caller's name; included in the start log message.</param>
/// <param name="sourceLineNumber">Filled in by the compiler with the caller's line; not used in the body.</param>
IObservable<ResolvedEvent> ReadAsObservable(UserCredentials cred, IObserver<Object> logger, String hostname = "Dmitriydev"
, bool stopOnCatchup = true
, [System.Runtime.CompilerServices.CallerMemberName] string memberName = ""
, [System.Runtime.CompilerServices.CallerLineNumber] int sourceLineNumber = 0)
{
return Observable.Using(
() => getConn(hostname),
conn =>
{
return Observable.Create<ResolvedEvent>((o) =>
{
// Expose the connection's Reconnecting event as an observable; removing the handler is logged.
var connReconnecting = Observable.FromEvent<EventHandler<ClientReconnectingEventArgs>, ClientReconnectingEventArgs>(handler =>
{
EventHandler<ClientReconnectingEventArgs> reconnectHandler = (sender, e) =>
{
handler(e);
};
return reconnectHandler;
},
reConnHandler => conn.Reconnecting += reConnHandler, reConnHandler => {logger.OnNext("losing reconn Delegeate for $all");
conn.Reconnecting -= reConnHandler;
});
// Every reconnect notification is timestamped and dumped.
var observer = Observer.Create<Timestamped<ClientReconnectingEventArgs>>(i => i.Dump("Reconnect for $all") );
var ReconnSub = connReconnecting.Timestamp().Subscribe(observer);
logger.OnNext($"Start subscription to $All, from {memberName}");
// NOTE(review): the null checkpoint presumably means "from the start" and true "resolve links" - confirm against the client API.
var esSub = conn.SubscribeToAllFrom(null, true
// event appeared: forward to the observer
, (s, e) => o.OnNext(e)
// live processing started: optionally complete and stop watching for reconnects
, s =>
{
logger.OnNext("$All caughtup and now live");
if (stopOnCatchup)
{
logger.OnNext("Calling OnCompleted while listening to $All");
o.OnCompleted();
ReconnSub.Dispose();
}
else
logger.OnNext("still subscribed to $All, because we were told not to end");
}
// subscription dropped: log the reason (and exception, if any), then complete rather than error
, (s, sr, e) =>
{
logger.OnNext(String.Format("EventStore Subscription Dropped: subscription to {1} because {0}", sr, "$all"));
if (e != null)
{
logger.OnNext("DropException");
logger.OnNext(e);
}
o.OnCompleted();
ReconnSub.Dispose();
}
, cred);
// Unsubscribe action: only the reconnect subscription is disposed here; esSub is not referenced again in this method.
return Disposable.Create(() =>
{
logger.OnNext("Observer has unsubscribed. Disposing resource of eventstore Conn. for $all");
ReconnSub.Dispose();
});
});
});
}
/// <summary>
/// Reads the stream named <paramref name="projectionName"/> and exposes its events as an observable.
/// Each subscription opens its own connection via getConn (wrapped in Observable.Using) and starts
/// a catch-up subscription on it. Reconnect notifications are relayed through a subject and dumped.
/// </summary>
/// <param name="projectionName">Name of the stream to subscribe to; also used in log messages.</param>
/// <param name="cred">Credentials passed to the stream subscription.</param>
/// <param name="logger">Receives progress messages and any drop exception.</param>
/// <param name="hostname">Host passed to getConn.</param>
/// <param name="stopOnCatchup">When true the observable completes once the subscription reports it is live.</param>
IObservable<ResolvedEvent> ReadAsObservable(String projectionName, UserCredentials cred, IObserver<Object> logger, String hostname = "Dmitriydev", bool stopOnCatchup = true)
{
return Observable.Using(
() => getConn(hostname),
conn =>
{
return Observable.Create<ResolvedEvent>((o) =>
{
// Expose the connection's Reconnecting event as a timestamped observable; adding and removing the handler are both logged.
var connReconnecting = Observable.FromEvent<EventHandler<ClientReconnectingEventArgs>, ClientReconnectingEventArgs>(handler =>
{
EventHandler<ClientReconnectingEventArgs> reconnectHandler = (sender, e) =>
{
handler(e);
};
return reconnectHandler;
},
reConnHandler => {logger.OnNext($"gaining reconn Delegeate for {projectionName}"); conn.Reconnecting += reConnHandler;},
reConnHandler => {logger.OnNext($"losing reconn Delegeate for {projectionName}");
conn.Reconnecting -= reConnHandler;
})
.Timestamp();
// The subject sits between the event observable and the dumped view so it can be completed explicitly on unsubscribe.
var subject = new Subject<Timestamped<ClientReconnectingEventArgs>>();
var subjectSubscription = connReconnecting.Subscribe(subject.Checked());
// for unknown reasons this subscription seems to stay alive at least to looking at the wpf control of linqpad
// in that it doesn't show a check with complete marked. The use of subject, etc. was all an effort to properly
// end the subscription
subject.DumpLatest(renderWithWpf: true, description: $"Reconn for proj {projectionName}");
logger.OnNext($"Start subscription to {projectionName}");
// NOTE(review): the null checkpoint presumably means "from the start" and true "resolve links" - confirm against the client API.
var esSub = conn.SubscribeToStreamFrom(projectionName, null, true
// event appeared: forward to the observer
, (s, e) => o.OnNext(e)
// live processing started: optionally complete
, s =>
{
logger.OnNext($"Projection {projectionName} caughtup and now live");
if (stopOnCatchup)
{
logger.OnNext($"Calling OnCompleted while listening to {projectionName}");
o.OnCompleted();
}
else
logger.OnNext($"still subscribed {projectionName}, because we were told not to end");
}
// subscription dropped: log the reason (and exception, if any), then complete rather than error
, (s, sr, e) =>
{
logger.OnNext(String.Format("EventStore Subscription Dropped: subscription to {1} because {0}", sr, projectionName));
if (e != null)
{
logger.OnNext($"DropException on {projectionName} ");
logger.OnNext(e);
}
o.OnCompleted();
}
, cred);
// Unsubscribe action: detaches from the reconnect event and completes the relay subject; esSub is not referenced again in this method.
return Disposable.Create(() =>
{
logger.OnNext($"Observer has unsubscribed. Disposing resource of eventstore Conn for Stream {projectionName}");
subjectSubscription.Dispose();
subject.OnCompleted();
});
});
});
}
/// <summary>
/// Variant of the stream reader for <paramref name="projectionName"/>. Despite the name the body is
/// not asynchronous: getConn is still called synchronously inside Observable.Using.
/// </summary>
/// <param name="projectionName">Name of the stream to subscribe to; also used in log messages.</param>
/// <param name="cred">Credentials passed to the stream subscription.</param>
/// <param name="logger">Receives progress messages and any drop exception.</param>
/// <param name="hostname">Host passed to getConn.</param>
/// <param name="stopOnCatchup">When true the observable completes once the subscription reports it is live.</param>
IObservable<ResolvedEvent> ReadAsObservableAsync(String projectionName, UserCredentials cred, IObserver<Object> logger, String hostname = "Dmitriydev", bool stopOnCatchup = true)
{
// would like this to be async - but have not done this
return Observable.Using(
() => getConn(hostname),
conn =>
{
return Observable.Create<ResolvedEvent>((o) =>
{
// Expose the connection's Reconnecting event as an observable.
var connReconnecting = Observable.FromEvent<EventHandler<ClientReconnectingEventArgs>, ClientReconnectingEventArgs>(handler =>
{
EventHandler<ClientReconnectingEventArgs> reconnectHandler = (sender, e) =>
{
handler(e);
};
return reconnectHandler;
},
reConnHandler => conn.Reconnecting += reConnHandler, reConnHandler => conn.Reconnecting -= reConnHandler);
//conn.Reconnecting += ReconnectingEventHandler;
// NOTE: 'observer' is created but never used below.
var observer = Observer.Create<Timestamped<ClientReconnectingEventArgs>>(i => { });
// The result of DumpLatest is not kept, so nothing in this method ends the reconnect view.
connReconnecting.Timestamp().DumpLatest(renderWithWpf: true, description: $"Reconn for proj {projectionName}");
// NOTE(review): the null checkpoint presumably means "from the start" and true "resolve links" - confirm against the client API.
var esSub = conn.SubscribeToStreamFrom(projectionName, null, true
// event appeared: forward to the observer
, (s, e) => o.OnNext(e)
// live processing started: optionally complete
, s =>
{
logger.OnNext($"Projection {projectionName} caughtup and now live");
if (stopOnCatchup)
{
logger.OnNext($"Calling OnCompleted while listening to {projectionName}");
o.OnCompleted();
}
else
logger.OnNext($"still subscribed {projectionName}, because we were told not to end");
}
// subscription dropped: log the reason (and exception, if any), then complete rather than error
, (s, sr, e) =>
{
logger.OnNext(String.Format("EventStore Subscription Dropped: subscription to {1} because {0}", sr, projectionName));
if (e != null)
{
logger.OnNext("DropException");
logger.OnNext(e);
}
o.OnCompleted();
}
, cred);
// Unsubscribe action: only logs; esSub is not referenced again in this method.
return Disposable.Create(() =>
{
logger.OnNext($"Observer has unsubscribed. Disposing resource of eventstore Conn for Stream {projectionName}");
});
});
});
}
/// <summary>
/// Reads the stream named <paramref name="projectionName"/> and yields each event together with its
/// data deserialized as <typeparamref name="T"/>. Each subscription opens its own connection via getConn.
/// </summary>
/// <param name="projectionName">Name of the stream to subscribe to; also used in log messages.</param>
/// <param name="cred">Credentials passed to the stream subscription.</param>
/// <param name="logger">Receives progress messages and any drop exception.</param>
/// <param name="hostname">Host passed to getConn.</param>
/// <param name="stopOnCatchup">When true the observable completes once the subscription reports it is live.</param>
IObservable<Tuple<T, ResolvedEvent>> ReadAsObservable<T>(String projectionName, UserCredentials cred, IObserver<Object> logger, String hostname = "Dmitriydev", bool stopOnCatchup = true)
{
    // One container (and thus one serializer) per call, shared by all subscriptions to the result.
    var myc = new MyContainer<T>();

    return Observable.Using(
        () => getConn(hostname),
        connection => Observable.Create<Tuple<T, ResolvedEvent>>(observer =>
        {
            var catchUp = connection.SubscribeToStreamFrom(projectionName, null, true,
                // event appeared: deserialize and forward
                (subscription, resolved) => myc.AddEvent(projectionName, resolved, observer),
                // live processing started
                subscription =>
                {
                    logger.OnNext($"Projection {projectionName} caughtup and now live");
                    if (!stopOnCatchup)
                    {
                        logger.OnNext("still subscribed, because we were told not to end");
                        return;
                    }
                    observer.OnCompleted();
                },
                // subscription dropped: log, then complete
                (subscription, reason, error) =>
                {
                    var dropMessage = String.Format("EventStore Subscription Dropped: subscription to {1} because {0}", reason, projectionName);
                    logger.OnNext(dropMessage);
                    if (error != null)
                    {
                        logger.OnNext("DropException");
                        logger.OnNext(error);
                    }
                    observer.OnCompleted();
                },
                cred);

            // Unsubscribe action: only logs.
            return Disposable.Create(() =>
                logger.OnNext($"Observer has unsubscribed. Disposing resource of eventstore Conn for Stream {projectionName}"));
        }));
}
/// <summary>
/// Serializes <paramref name="o"/> to indented JSON with enums written as strings,
/// using TypeNameHandling.Auto.
/// </summary>
String SerializeWithNoType(Object o)
{
    var settings = new Newtonsoft.Json.JsonSerializerSettings();
    settings.TypeNameHandling = Newtonsoft.Json.TypeNameHandling.Auto;
    settings.Converters = new List<Newtonsoft.Json.JsonConverter>
    {
        new Newtonsoft.Json.Converters.StringEnumConverter()
    };
    return Newtonsoft.Json.JsonConvert.SerializeObject(o, Newtonsoft.Json.Formatting.Indented, settings);
}
/// <summary>Shared serializer instance created with the parameterless JsonNetSerializer constructor.</summary>
public static readonly JsonNetSerializer DefaultSerializer = new JsonNetSerializer();
// Define other methods and classes here
/// <summary>
/// Deserializes event data to <typeparamref name="T"/> and pushes it, paired with the
/// originating event, to an observer.
/// </summary>
public class MyContainer<T>
{
    protected JsonNetSerializer serializer = new JsonNetSerializer();

    /// <summary>
    /// Deserializes the data of <paramref name="e"/> and forwards (value, event) to <paramref name="o"/>.
    /// </summary>
    /// <param name="projName">Not used by this method.</param>
    public void AddEvent(String projName, ResolvedEvent e, IObserver<Tuple<T, ResolvedEvent>> o)
    {
        T payload = serializer.Deserialize<T>(e.Event.Data);
        Tuple<T, ResolvedEvent> pair = Tuple.Create(payload, e);
        o.OnNext(pair);
    }
}
/// <summary>
/// Log sink and log source in one: everything pushed in through IObserver&lt;Object&gt; is paired with a
/// fresh LogInfo and replayed to subscribers through a ReplaySubject created on the given scheduler.
/// Errors are recorded as ordinary entries and do not terminate the log.
/// </summary>
public class Logger : IObservable<Tuple<LogInfo,Object>>, IObserver<Object>
{
    private readonly ISubject<Tuple<LogInfo, Object>> _entries;
    private readonly IScheduler _scheduler;

    public Logger(IScheduler sch)
    {
        _scheduler = sch;
        _entries = new ReplaySubject<Tuple<LogInfo, Object>>(_scheduler);
    }

    /// <summary>Records a log entry.</summary>
    public void OnNext(Object o) => _entries.OnNext(Stamp(o));

    /// <summary>Records the exception as a log entry.</summary>
    public void OnError(Exception e) => _entries.OnNext(Stamp(e));

    /// <summary>Completes the log.</summary>
    public void OnCompleted() => _entries.OnCompleted();

    public IDisposable Subscribe(IObserver<Tuple<LogInfo, Object>> o) => _entries.Subscribe(o);

    // Pairs the payload with the LogInfo captured at the moment of logging.
    private static Tuple<LogInfo, Object> Stamp(Object payload) => Tuple.Create(new LogInfo(), payload);
}
/// <summary>
/// Context captured when a log entry is created: the managed id of the creating thread
/// (as text) and the local time of creation.
/// </summary>
public class LogInfo
{
    /// <summary>Managed thread id of the thread that constructed this instance.</summary>
    public String ThreadId { get; set; }

    /// <summary>Local time at construction.</summary>
    public DateTime InTime { get; set; }

    public LogInfo()
    {
        int managedId = Thread.CurrentThread.ManagedThreadId;
        ThreadId = managedId.ToString();
        InTime = DateTime.Now;
    }
}
/// <summary>LINQPad helpers for ResolvedEvent.</summary>
public static class ResolvedEventExtensions
{
    /// <summary>
    /// Builds a clickable link to the event in the EventStore web UI on port 2113 of <paramref name="hostname"/>.
    /// </summary>
    public static Hyperlinq ToLink(this ResolvedEvent i, String hostname)
    {
        var recorded = i.Event;
        String url = String.Format("http://{0}:2113/web/index.html#/streams/{1}/{2}", hostname, recorded.EventStreamId, recorded.EventNumber);
        return new Hyperlinq(url);
    }
}
/// <summary>
/// Used both for serializing objects to be stored in the EventStore and for
/// deserializing them again. The objects which are serialized are events, which
/// happen also to be classes.
/// </summary>
/// <remarks>
/// SECURITY: the parameterless constructor configures the deserializer with
/// <c>TypeNameHandling.Objects</c>, which lets the "$type" value inside the JSON choose the
/// CLR type to instantiate. Only use that mode on data from a trusted store; for anything
/// else prefer the constructor taking explicit mappings, whose deserializer ignores "$type".
/// </remarks>
public class JsonNetSerializer
{
    private readonly JsonSerializer serializer;
    private readonly JsonSerializer deserializer;
    private Dictionary<string, Type> eventTypeToType;
    private Dictionary<Type, String> clrTypeToEventStoreType;

    /// <summary>
    /// Maps an eventStoreType to a CLR type. The deserializer created here does not use
    /// embedded type names.
    /// </summary>
    /// <param name="eventTypeToTypeMappings">Map from an eventStoreType to a CLR type, used when deserializing.
    /// May be null (treated as empty). Each CLR type may appear only once, because the map is also inverted.</param>
    public JsonNetSerializer(IEnumerable<KeyValuePair<string, Type>> eventTypeToTypeMappings)
    {
        this.serializer = new JsonSerializer() { TypeNameHandling = TypeNameHandling.Objects };
        this.deserializer = new JsonSerializer();
        this.serializer.Converters.Add(new StringEnumConverter());
        InitializeTypeMappings(eventTypeToTypeMappings);
    }

    /// <summary>
    /// Uses the default mapping from eventType to the class name of the serialized event.
    /// For deserialization the type information within the serialization is used
    /// (see the security remark on the class).
    /// </summary>
    public JsonNetSerializer()
    {
        this.serializer = new JsonSerializer() { TypeNameHandling = TypeNameHandling.Objects };
        this.deserializer = new JsonSerializer() { TypeNameHandling = TypeNameHandling.Objects };
        this.serializer.Converters.Add(new StringEnumConverter());
        InitializeTypeMappings(null);
    }

    // Builds both lookup directions from the supplied mappings; null means "no mappings".
    private void InitializeTypeMappings(IEnumerable<KeyValuePair<string, Type>> eventTypeToTypeMappings)
    {
        var mappings = eventTypeToTypeMappings ?? new Dictionary<string, Type>();
        this.eventTypeToType = mappings.ToDictionary(x => x.Key, x => x.Value);
        this.clrTypeToEventStoreType = eventTypeToType.ToDictionary(x => x.Value, x => x.Key);
    }

    // Event type name for an object: the mapped name when mappings exist, otherwise the class name.
    private string MapToEventType(object e)
    {
        // Use mappings if provided
        if (clrTypeToEventStoreType != null && clrTypeToEventStoreType.Count > 0)
        {
            string eventType;
            if (!clrTypeToEventStoreType.TryGetValue(e.GetType(), out eventType))
            {
                throw new ApplicationException(String.Format("Serializer is supposed to know about type {0}, but does not.", e.GetType().FullName));
            }
            return eventType;
        }
        return e.GetType().Name;
    }

    // CLR type for an event type name, or null when there are no mappings or none matches.
    private Type MapFromEventType(string eventType)
    {
        // Guard on the dictionary that is actually queried below.
        if (this.eventTypeToType == null || this.eventTypeToType.Count == 0) return null;
        // Use mappings if provided, ignore if one doesn't exist
        Type mapped;
        return this.eventTypeToType.TryGetValue(eventType, out mapped) ? mapped : null;
    }

    /// <summary>Serializes <paramref name="e"/> to JSON bytes using the StreamWriter default encoding.</summary>
    public byte[] SerializeToBytes(object e)
    {
        using (var ms = new MemoryStream())
        {
            // The writer must be disposed (flushed) before the buffer is copied out.
            using (var w = new StreamWriter(ms))
            {
                this.serializer.Serialize(w, e);
            }
            return ms.ToArray();
        }
    }

    /// <summary>
    /// A convenience specialization of Deserialize(String,byte[]), perhaps should
    /// be an extension method, but it satisfies the interface
    /// </summary>
    /// <param name="resolvedEvent">Event whose type name and data are deserialized.</param>
    /// <returns>A lazily evaluated sequence containing the single deserialized object.</returns>
    public IEnumerable<object> Deserialize(ResolvedEvent resolvedEvent)
    {
        return Deserialize(resolvedEvent.Event.EventType, resolvedEvent.Event.Data);
    }

    /// <summary>
    /// Deserializes <paramref name="data"/> as the CLR type mapped to <paramref name="typename"/>,
    /// or without a target type when no mapping applies.
    /// </summary>
    /// <returns>A lazily evaluated sequence containing the single deserialized object.</returns>
    public IEnumerable<object> Deserialize(String typename, byte[] data)
    {
        var type = MapFromEventType(typename);
        if (type != null)
            yield return Deserialize(type, data);
        else
            yield return Deserialize(data);
    }

    private object Deserialize(Type type, byte[] data)
    {
        if (type == null)
        {
            // The single-string ArgumentNullException constructor takes the parameter NAME, so the
            // explanatory text has to go through the (paramName, message) overload.
            throw new ArgumentNullException(nameof(type), "passed in null for the type, which should never be.");
        }
        using (var sr = new StreamReader(new MemoryStream(data)))
        {
            return this.deserializer.Deserialize(sr, type);
        }
    }

    private object Deserialize(byte[] data)
    {
        using (var sr = new StreamReader(new MemoryStream(data)))
        {
            return this.deserializer.Deserialize(new JsonTextReader(sr));
        }
    }

    /// <summary>Deserializes JSON bytes as <typeparamref name="T"/>.</summary>
    public T Deserialize<T>(byte[] jsonbytes)
    {
        using (var sr = new StreamReader(new MemoryStream(jsonbytes)))
        {
            return (T)deserializer.Deserialize(sr, typeof(T));
        }
    }

    /// <summary>Deserializes a JSON string as <typeparamref name="T"/> (via its UTF-8 bytes).</summary>
    public T Deserialize<T>(String jsonString)
    {
        return Deserialize<T>(Encoding.UTF8.GetBytes(jsonString));
    }

    /// <summary>
    /// Helper function for creating a name to type map, for use on the constructor
    /// </summary>
    /// <param name="types">Types to index by their simple class name.</param>
    /// <returns>Dictionary from <c>Type.Name</c> to the type.</returns>
    public static IDictionary<String, Type> DefaultNameToType(IEnumerable<Type> types)
    {
        return types.ToDictionary(t => t.Name, t => t);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment