Created
April 26, 2016 18:11
-
-
Save kstrauss/9478fc76ed68a88480d0ef09de8f9e7d to your computer and use it in GitHub Desktop.
Reproduction code for EventStore Issue #900
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<Query Kind="Program"> | |
<Connection> | |
<ID>e8afa357-aae4-40ae-a4ad-e4783f38f696</ID> | |
<Server>rdbtest2012</Server> | |
<Database>PackageMonitorReadModel_SGSpike-Main</Database> | |
<ShowServer>true</ShowServer> | |
</Connection> | |
<NuGetReference>EventStore.Client</NuGetReference> | |
<NuGetReference>Newtonsoft.Json</NuGetReference> | |
<NuGetReference>Rx-Linq</NuGetReference> | |
<NuGetReference>Rx-Main</NuGetReference> | |
<NuGetReference>Rx-WPF</NuGetReference> | |
<Namespace>EventStore.ClientAPI</Namespace> | |
<Namespace>EventStore.ClientAPI.SystemData</Namespace> | |
<Namespace>System</Namespace> | |
<Namespace>System.Net</Namespace> | |
<Namespace>System.Reactive</Namespace> | |
<Namespace>System.Reactive.Concurrency</Namespace> | |
<Namespace>System.Reactive.Disposables</Namespace> | |
<Namespace>System.Reactive.Joins</Namespace> | |
<Namespace>System.Reactive.Linq</Namespace> | |
<Namespace>System.Reactive.PlatformServices</Namespace> | |
<Namespace>System.Reactive.Subjects</Namespace> | |
<Namespace>System.Reactive.Threading.Tasks</Namespace> | |
<Namespace>System.Threading.Tasks</Namespace> | |
<Namespace>Newtonsoft.Json</Namespace> | |
<Namespace>Newtonsoft.Json.Converters</Namespace> | |
</Query> | |
/// <summary>
/// LINQPad entry point. Reads every non-system event from $all on the configured host, dumps
/// per-event-type size statistics, appends the individual sizes to a CSV file and shows a
/// running throughput summary. The log is completed when the report finishes or fails.
/// </summary>
void Main()
{
    var scheduler = Scheduler.Default; // alternatives tried: new SynchronizationContextScheduler(new SynchronizationContext()), Scheduler.CurrentThread
    const String hostname = "ap01prod";
    var subject = new Logger(scheduler).Dump("Log", 1);
    var logWriter = Observer.Synchronize(subject, true); //.Checked(subject);
    //subject.Select(o => new { ThreadId = o.Item1.ThreadId, Body = o.Item2, Delta = TimeSpan.FromTicks(DateTime.Now.Ticks - o.Item1.InTime.Ticks).TotalMilliseconds }).Dump("Log of EventStore Communication");
    logWriter.OnNext("Start of log");
    var cred = new UserCredentials("admin", "changeit");
    /*
    DateTime? lastSeen = null;
    ReportOnDollarEtVsAll(cred,hostname,logWriter)
    .Dump("From ItemDecoded")*/
    var ts = TimeSpan.FromSeconds(10);

    // Published so that the statistics pipeline and the CSV/rate pipeline share a single
    // read of $all; nothing flows until Connect() is called at the end.
    var lengths = ReportOnAllEventSizes(cred, hostname, logWriter).Publish();

    lengths.GroupBy(x => x.EventType)
        .Select(g => new
        {
            EventType = g.Key,
            MaxLength = g.Max(e => e.EventLength),
            MinLength = g.Min(e => e.EventLength),
            MeanLength = g.Average(e => e.EventLength),
            Count = g.Count()
        }).Dump();

    long totalEvents = 0;
    long totalBytes = 0;
    var fname = String.Format(@"c:\temp\lengths_{0}.csv", hostname);
    File.Delete(fname);
    var sw = Stopwatch.StartNew();

    lengths
        .Buffer(ts) // just so we aren't writing one at a time
        .Do(x => File.AppendAllLines(fname, x.Select(y => y.ToCSV())))
        .Select(w =>
        {
            var eventsInWindow = w.Count;
            var bytes = w.Sum(v => v.EventLength) + w.Sum(v => v.EventMetaLength);
            totalEvents = totalEvents + eventsInWindow;
            totalBytes = totalBytes + bytes;
            return new
            {
                CurrentRate = eventsInWindow / ts.TotalSeconds,
                BytesPsec = bytes / ts.TotalSeconds,
                TotalEvents = totalEvents,
                historicalRate = totalEvents / sw.Elapsed.TotalSeconds,
                historicalBytes = totalBytes / sw.Elapsed.TotalSeconds,
                totalkb = totalBytes / 1024
            };
        })
        .DumpLatest("Rate of events Per second")
        // continue with , so we finish the logger
        .Subscribe(Observer.Create<Object>(
            _ => { },
            error =>
            {
                // A failure (for example while writing the CSV) used to be swallowed, which
                // also left the log open forever. Record it and close the log instead.
                logWriter.OnNext("Event size report failed");
                logWriter.OnNext(error);
                logWriter.OnCompleted();
            },
            () =>
            {
                logWriter.OnNext("Should be all done");
                logWriter.OnCompleted();
            }));

    lengths.Connect();
}
/// <summary>
/// Formats a date as a JavaScript <c>new Date(year,month,day,hour,minute,second,millisecond)</c>
/// expression.
/// </summary>
/// <param name="d">The date to format; its components are used as-is (no time zone conversion).</param>
/// <returns>The constructor expression as a string.</returns>
public String ToGoogleDate(DateTime d)
{
    // JavaScript's Date constructor counts months from 0 and expects seconds before
    // milliseconds, so shift the month and pass both trailing components.
    return $"new Date({d.Year},{d.Month - 1},{d.Day},{d.Hour},{d.Minute},{d.Second},{d.Millisecond})";
}
/// <summary>
/// Converts a date to Unix time: the number of whole seconds elapsed since
/// 1970-01-01T00:00:00Z.
/// </summary>
/// <param name="date">The date to convert; it is converted to UTC first.</param>
/// <returns>Whole seconds since the Unix epoch (negative for earlier dates).</returns>
public long ToUnixTime(DateTime date)
{
    var epoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
    var sinceEpoch = date.ToUniversalTime() - epoch;
    // Floor rather than Convert.ToInt64, which rounds to the nearest even number and could
    // therefore report a second that has not been reached yet.
    return (long)Math.Floor(sinceEpoch.TotalSeconds);
}
/// <summary>
/// Adds a timeout to the original observable. When a TimeoutException occurs it is caught,
/// default(T) is emitted, and the original observable is merged in again under the same
/// timeout.
/// </summary>
/// <param name="orig">The sequence to guard.</param>
/// <param name="timeoutPeriod">Timeout passed to <c>Timeout</c> on every (re)subscription.</param>
IObservable<T> dd<T>(IObservable<T> orig, TimeSpan timeoutPeriod)
{
    var guarded = orig.Timeout(timeoutPeriod);
    return guarded.Catch<T, TimeoutException>(timedOut =>
    {
        var marker = Observable.Return(default(T));
        var retry = dd(orig, timeoutPeriod);
        return Observable.Merge(marker, retry);
    });
}
/// <summary>
/// Reads <paramref name="streamId"/> and yields the deserialized payload of every event
/// whose event id equals <paramref name="evtId"/>.
/// </summary>
/// <param name="evtId">Event id in any format accepted by <c>Guid.Parse</c>.</param>
IObservable<T> FindEventId<T>(UserCredentials cred, String hostname, IObserver<Object> logger, String evtId, string streamId)
{
    Guid wanted = Guid.Parse(evtId);
    var decoded = ReadAsObservable<T>(streamId, cred, logger, hostname);
    return from pair in decoded
           where pair.Item2.Event.EventId == wanted
           select pair.Item1;
}
/// <summary>
/// Reads $all until it has caught up and reports, for each event that is neither a link nor
/// a "$"-prefixed type, the event type together with the byte lengths of its data and
/// metadata.
/// </summary>
IObservable<EventDataLength> ReportOnAllEventSizes(UserCredentials cred, String hostname, IObserver<Object> logger)
{
    return ReadAsObservable(cred, logger, hostname, true)
        // Event type names are identifiers, so the "$" prefix check is ordinal rather than
        // dependent on the current culture.
        .Where(r => r.Link == null
                    && !r.Event.EventType.StartsWith("$", StringComparison.Ordinal))
        .Select(r => new EventDataLength
        {
            EventType = r.Event.EventType,
            EventLength = r.Event.Data.Length,
            EventMetaLength = r.Event.Metadata.Length
        });
}
/// <summary>Type name and payload sizes recorded for a single event.</summary>
public class EventDataLength
{
    /// <summary>The event's type name.</summary>
    public String EventType { get; set; }

    /// <summary>Length of the event data in bytes.</summary>
    public int EventLength { get; set; }

    /// <summary>Length of the event metadata in bytes.</summary>
    public int EventMetaLength { get; set; }

    /// <summary>Renders the record as one line: the single-quoted type, then both lengths.</summary>
    public String ToCSV() => $"'{EventType}',{EventLength},{EventMetaLength}";
}
/// <summary>
/// Checks whether events found in $all failed to make it into the matching $et-{EventType}
/// stream: for every event type the count seen in $all is compared with the count read
/// from the $et stream.
/// </summary>
/// <returns>
/// One row (Streamid, DifferenceBetweenAllAndEt, AllStreamCount, EtCount) per event type
/// whose two counts differ. Blocks until $all has been read completely.
/// </returns>
IEnumerable<Object> ReportOnDollarEtVsAll(UserCredentials cred, String hostname, IObserver<Object> logger)
{
    return ReadAsObservable(cred, logger, hostname, true)
        // Event type names are identifiers: compare the "$" prefix ordinally.
        .Where(r => r.Link == null && !r.Event.EventType.StartsWith("$", StringComparison.Ordinal))
        .GroupBy(r => r.Event.EventType)/*
        // write out content to a file
        .Do(g => {
            var fname = String.Format(@"c:\temp\xx_{0}.json", g.Key);
            File.Delete(fname);
            var nullObs = Observer.Create<IList<ResolvedEvent>>(ef => File.AppendAllLines(fname, ef.Select(o => Encoding.UTF8.GetString(o.Event.Data))));
            g
            //.SubscribeOn(Scheduler.Default)
            .Buffer(TimeSpan.FromSeconds(5),5000)
            .Subscribe(nullObs);
        })*/
        .Select(g => new
        {
            key = g.Key,
            Count = g.LongCount(),
            // Count the matching $et stream on its own connection, started as a task.
            fromEt = System.Threading.Tasks.Task.Run(async () => await ReadAsObservable($"$et-{g.Key}", cred, logger, hostname, true).LongCount())
        })
        .Select(async l =>
        {
            var allStreamCount = await l.Count;
            var fromEtCount = await l.fromEt;
            return new
            {
                Streamid = l.key,
                DifferenceBetweenAllAndEt = allStreamCount - fromEtCount,
                AllStreamCount = allStreamCount,
                EtCount = fromEtCount
            };
        })
        .ToEnumerable().ToList()
        // Unwrap the finished comparisons so callers receive the rows themselves rather
        // than Task wrappers (the filter already had to block on Result anyway).
        .Select(pending => pending.Result)
        .Where(row => row.DifferenceBetweenAllAndEt != 0);
}
/// <summary>
/// Looks up event number <paramref name="eventNum"/> in <paramref name="sourceStream"/> and
/// reports whether <paramref name="destStream"/> contains an event with the same stream id
/// and event number. Blocks while both streams are read.
/// </summary>
bool DoesSourceEventExistIn(UserCredentials cred, String hostname, IObserver<Object> logger, String sourceStream, int eventNum, String destStream)
{
    var sourceEvents = ReadAsObservable(sourceStream, cred, logger, hostname, true);
    var initialEvent = sourceEvents
        .Where(candidate => candidate.OriginalEvent.EventNumber == eventNum)
        .Dump("initial")
        .Wait();

    var destEvents = ReadAsObservable(destStream, cred, logger, hostname, true); //.Take(10).Dump("destStream")
    var found = destEvents
        .Where(candidate => candidate.Event.EventStreamId == initialEvent.Event.EventStreamId)
        .Where(candidate => candidate.Event.EventNumber == initialEvent.Event.EventNumber)
        .Any();
    return found.Wait();
}
/// <summary>
/// Creates an EventStore connection to port 1113 on the first address the host name
/// resolves to, and blocks until the connect call has finished.
/// </summary>
IEventStoreConnection getConn(String hostname)
{
    var builder = ConnectionSettings.Create();
    // Options that were toggled while investigating, kept for reference:
    //builder.SetHeartbeatTimeout(TimeSpan.FromMilliseconds(2500));
    //builder.SetReconnectionDelayTo(TimeSpan.FromSeconds(5));
    //builder.KeepRetrying();
    //builder.EnableVerboseLogging();
    //builder.UseConsoleLogger();
    //builder.UseFileLogger(@"c:\temp\es_conns.log");

    IPAddress address = Dns.GetHostAddresses(hostname).First();
    //address = IPAddress.Parse( "127.0.0.1");
    var tcpEndPoint = new IPEndPoint(address, 1113);

    var connection = EventStoreConnection.Create(builder.Build(), tcpEndPoint);
    connection.ConnectAsync().Wait();
    return connection;
}
/// <summary>Event handler that dumps the reconnect arguments under the "Reconnecting" heading.</summary>
void ReconnectingEventHandler(object sender, ClientReconnectingEventArgs args) => args.Dump("Reconnecting");
/// <summary>
/// Reads from the $all stream, i.e. across all streams, as an observable sequence.
/// Each subscription opens its own connection through <c>getConn</c>, wrapped in
/// <c>Observable.Using</c>. The sequence completes when the catch-up subscription goes live
/// (only if <paramref name="stopOnCatchup"/> is true) or when the subscription is dropped.
/// Connection reconnect notifications are dumped while the subscription is active.
/// </summary>
/// <param name="memberName">Filled in by the compiler with the calling member; used in the log.</param>
IObservable<ResolvedEvent> ReadAsObservable(UserCredentials cred, IObserver<Object> logger, String hostname = "Dmitriydev"
    , bool stopOnCatchup = true
    , [System.Runtime.CompilerServices.CallerMemberName] string memberName = ""
    , [System.Runtime.CompilerServices.CallerLineNumber] int sourceLineNumber = 0)
{
    return Observable.Using(
        () => getConn(hostname),
        conn => Observable.Create<ResolvedEvent>(o =>
        {
            // Expose the connection's Reconnecting event as an observable of its arguments.
            var reconnects = Observable.FromEvent<EventHandler<ClientReconnectingEventArgs>, ClientReconnectingEventArgs>(
                forward => (sender, args) => forward(args),
                attach => conn.Reconnecting += attach,
                detach =>
                {
                    logger.OnNext("losing reconn Delegeate for $all");
                    conn.Reconnecting -= detach;
                });
            var reconnectDumper = Observer.Create<Timestamped<ClientReconnectingEventArgs>>(stamp => stamp.Dump("Reconnect for $all"));
            var reconnectSubscription = reconnects.Timestamp().Subscribe(reconnectDumper);

            logger.OnNext($"Start subscription to $All, from {memberName}");
            conn.SubscribeToAllFrom(null, true
                , (subscription, resolved) => o.OnNext(resolved)
                , subscription =>
                {
                    logger.OnNext("$All caughtup and now live");
                    if (!stopOnCatchup)
                    {
                        logger.OnNext("still subscribed to $All, because we were told not to end");
                        return;
                    }
                    logger.OnNext("Calling OnCompleted while listening to $All");
                    o.OnCompleted();
                    reconnectSubscription.Dispose();
                }
                , (subscription, reason, error) =>
                {
                    logger.OnNext(String.Format("EventStore Subscription Dropped: subscription to {1} because {0}", reason, "$all"));
                    if (error != null)
                    {
                        logger.OnNext("DropException");
                        logger.OnNext(error);
                    }
                    o.OnCompleted();
                    reconnectSubscription.Dispose();
                }
                , cred);

            return Disposable.Create(() =>
            {
                logger.OnNext("Observer has unsubscribed. Disposing resource of eventstore Conn. for $all");
                reconnectSubscription.Dispose();
            });
        }));
}
/// <summary>
/// Reads the named stream or projection as an observable sequence with links resolved.
/// Each subscription opens its own connection through <c>getConn</c>, wrapped in
/// <c>Observable.Using</c>. The sequence completes when the catch-up subscription goes live
/// (only if <paramref name="stopOnCatchup"/> is true) or when the subscription is dropped.
/// Timestamped reconnect notifications are relayed through a subject that is dumped with
/// <c>DumpLatest</c> and completed when the observer unsubscribes.
/// </summary>
IObservable<ResolvedEvent> ReadAsObservable(String projectionName, UserCredentials cred, IObserver<Object> logger, String hostname = "Dmitriydev", bool stopOnCatchup = true)
{
    return Observable.Using(
        () => getConn(hostname),
        conn => Observable.Create<ResolvedEvent>(o =>
        {
            // Expose the connection's Reconnecting event as a timestamped observable,
            // logging whenever the underlying delegate is attached or detached.
            var reconnects = Observable.FromEvent<EventHandler<ClientReconnectingEventArgs>, ClientReconnectingEventArgs>(
                    forward => (sender, args) => forward(args),
                    attach =>
                    {
                        logger.OnNext($"gaining reconn Delegeate for {projectionName}");
                        conn.Reconnecting += attach;
                    },
                    detach =>
                    {
                        logger.OnNext($"losing reconn Delegeate for {projectionName}");
                        conn.Reconnecting -= detach;
                    })
                .Timestamp();

            var relay = new Subject<Timestamped<ClientReconnectingEventArgs>>();
            var relaySubscription = reconnects.Subscribe(relay.Checked());
            // For unknown reasons this subscription seems to stay alive, at least judging by
            // LINQPad's WPF control, which never shows it as completed. The subject and the
            // explicit completion below were an attempt to end the subscription properly.
            relay.DumpLatest(renderWithWpf: true, description: $"Reconn for proj {projectionName}");

            logger.OnNext($"Start subscription to {projectionName}");
            conn.SubscribeToStreamFrom(projectionName, null, true
                , (subscription, resolved) => o.OnNext(resolved)
                , subscription =>
                {
                    logger.OnNext($"Projection {projectionName} caughtup and now live");
                    if (!stopOnCatchup)
                    {
                        logger.OnNext($"still subscribed {projectionName}, because we were told not to end");
                        return;
                    }
                    logger.OnNext($"Calling OnCompleted while listening to {projectionName}");
                    o.OnCompleted();
                }
                , (subscription, reason, error) =>
                {
                    logger.OnNext(String.Format("EventStore Subscription Dropped: subscription to {1} because {0}", reason, projectionName));
                    if (error != null)
                    {
                        logger.OnNext($"DropException on {projectionName} ");
                        logger.OnNext(error);
                    }
                    o.OnCompleted();
                }
                , cred);

            return Disposable.Create(() =>
            {
                logger.OnNext($"Observer has unsubscribed. Disposing resource of eventstore Conn for Stream {projectionName}");
                relaySubscription.Dispose();
                relay.OnCompleted();
            });
        }));
}
/// <summary>
/// Variant of the stream reader that was meant to become asynchronous; that has not been
/// done, so it reads the named stream or projection the same synchronous way.
/// Each subscription opens its own connection through <c>getConn</c>, wrapped in
/// <c>Observable.Using</c>. The sequence completes when the catch-up subscription goes live
/// (only if <paramref name="stopOnCatchup"/> is true) or when the subscription is dropped.
/// </summary>
IObservable<ResolvedEvent> ReadAsObservableAsync(String projectionName, UserCredentials cred, IObserver<Object> logger, String hostname = "Dmitriydev", bool stopOnCatchup = true)
{
    return Observable.Using(
        () => getConn(hostname),
        conn => Observable.Create<ResolvedEvent>(o =>
        {
            // Expose the connection's Reconnecting event as an observable of its arguments.
            var reconnects = Observable.FromEvent<EventHandler<ClientReconnectingEventArgs>, ClientReconnectingEventArgs>(
                forward => (sender, args) => forward(args),
                attach => conn.Reconnecting += attach,
                detach => conn.Reconnecting -= detach);
            //conn.Reconnecting += ReconnectingEventHandler;
            reconnects.Timestamp().DumpLatest(renderWithWpf: true, description: $"Reconn for proj {projectionName}");

            conn.SubscribeToStreamFrom(projectionName, null, true
                , (subscription, resolved) => o.OnNext(resolved)
                , subscription =>
                {
                    logger.OnNext($"Projection {projectionName} caughtup and now live");
                    if (!stopOnCatchup)
                    {
                        logger.OnNext($"still subscribed {projectionName}, because we were told not to end");
                        return;
                    }
                    logger.OnNext($"Calling OnCompleted while listening to {projectionName}");
                    o.OnCompleted();
                }
                , (subscription, reason, error) =>
                {
                    logger.OnNext(String.Format("EventStore Subscription Dropped: subscription to {1} because {0}", reason, projectionName));
                    if (error != null)
                    {
                        logger.OnNext("DropException");
                        logger.OnNext(error);
                    }
                    o.OnCompleted();
                }
                , cred);

            // Unsubscribing only logs here.
            return Disposable.Create(() =>
                logger.OnNext($"Observer has unsubscribed. Disposing resource of eventstore Conn for Stream {projectionName}"));
        }));
}
/// <summary>
/// Reads the named stream or projection and pairs every resolved event with its data
/// deserialized as <typeparamref name="T"/>.
/// Each subscription opens its own connection through <c>getConn</c>, wrapped in
/// <c>Observable.Using</c>. The sequence completes when the catch-up subscription goes live
/// (only if <paramref name="stopOnCatchup"/> is true) or when the subscription is dropped.
/// </summary>
IObservable<Tuple<T, ResolvedEvent>> ReadAsObservable<T>(String projectionName, UserCredentials cred, IObserver<Object> logger, String hostname = "Dmitriydev", bool stopOnCatchup = true)
{
    // One container (and therefore one serializer) per call, shared by all subscriptions.
    var decoder = new MyContainer<T>();
    return Observable.Using(
        () => getConn(hostname),
        conn => Observable.Create<Tuple<T, ResolvedEvent>>(o =>
        {
            conn.SubscribeToStreamFrom(projectionName, null, true
                , (subscription, resolved) => decoder.AddEvent(projectionName, resolved, o)
                , subscription =>
                {
                    logger.OnNext($"Projection {projectionName} caughtup and now live");
                    if (!stopOnCatchup)
                    {
                        logger.OnNext("still subscribed, because we were told not to end");
                        return;
                    }
                    o.OnCompleted();
                }
                , (subscription, reason, error) =>
                {
                    logger.OnNext(String.Format("EventStore Subscription Dropped: subscription to {1} because {0}", reason, projectionName));
                    if (error != null)
                    {
                        logger.OnNext("DropException");
                        logger.OnNext(error);
                    }
                    o.OnCompleted();
                }
                , cred);

            // Unsubscribing only logs here.
            return Disposable.Create(() =>
                logger.OnNext($"Observer has unsubscribed. Disposing resource of eventstore Conn for Stream {projectionName}"));
        }));
}
/// <summary>
/// Serializes an object to indented JSON with enums written as strings.
/// </summary>
// NOTE(review): despite the name, TypeNameHandling.Auto is used rather than None - confirm
// which of the two is intended.
String SerializeWithNoType(Object o)
{
    var settings = new Newtonsoft.Json.JsonSerializerSettings
    {
        TypeNameHandling = Newtonsoft.Json.TypeNameHandling.Auto,
        Converters = new List<Newtonsoft.Json.JsonConverter> { new Newtonsoft.Json.Converters.StringEnumConverter() }
    };
    return Newtonsoft.Json.JsonConvert.SerializeObject(o, Newtonsoft.Json.Formatting.Indented, settings);
}

/// <summary>Shared serializer instance built with the parameterless constructor.</summary>
public static readonly JsonNetSerializer DefaultSerializer = new JsonNetSerializer();
// Define other methods and classes here | |
/// <summary>
/// Deserializes event data as <typeparamref name="T"/> and forwards it, paired with the
/// original event, to an observer.
/// </summary>
public class MyContainer<T>
{
    protected JsonNetSerializer serializer = new JsonNetSerializer();

    /// <summary>
    /// Deserializes the event's data and pushes (payload, event) to <paramref name="o"/>.
    /// </summary>
    /// <param name="projName">Not used by this method.</param>
    public void AddEvent(String projName, ResolvedEvent e, IObserver<Tuple<T, ResolvedEvent>> o)
    {
        T payload = serializer.Deserialize<T>(e.Event.Data);
        o.OnNext(new Tuple<T, ResolvedEvent>(payload, e));
    }
}
/// <summary>
/// Log sink that accepts arbitrary objects as an observer and republishes each one, tagged
/// with a <see cref="LogInfo"/>, through a replay subject.
/// </summary>
public class Logger : IObservable<Tuple<LogInfo,Object>>, IObserver<Object>
{
    ISubject<Tuple<LogInfo,Object>> sub;
    IScheduler schedule;

    /// <param name="sch">Scheduler handed to the underlying replay subject.</param>
    public Logger(IScheduler sch)
    {
        schedule = sch;
        sub = new ReplaySubject<Tuple<LogInfo,Object>>(schedule);
    }

    /// <summary>Publishes a log entry.</summary>
    public void OnNext(Object o) => Publish(o);

    /// <summary>Publishes the exception as an ordinary log entry; the log is not terminated.</summary>
    public void OnError(Exception e) => Publish(e);

    /// <summary>Completes the log.</summary>
    public void OnCompleted() => sub.OnCompleted();

    /// <summary>Subscribes an observer to the log entries.</summary>
    public IDisposable Subscribe(IObserver<Tuple<LogInfo,Object>> o) => sub.Subscribe(o);

    // Stamps the entry with a fresh LogInfo and pushes it to the subject.
    private void Publish(Object entry)
    {
        var stamped = new Tuple<LogInfo,Object>(new LogInfo(), entry);
        sub.OnNext(stamped);
    }
}
/// <summary>Captures the thread and local time at which a log entry was created.</summary>
public class LogInfo
{
    /// <summary>Managed thread id of the creating thread, as text.</summary>
    public String ThreadId { get; set; }

    /// <summary>Local time at which the instance was constructed.</summary>
    public DateTime InTime { get; set; }

    public LogInfo()
    {
        ThreadId = Environment.CurrentManagedThreadId.ToString();
        InTime = DateTime.Now;
    }
}
/// <summary>LINQPad helpers for <c>ResolvedEvent</c>.</summary>
public static class ResolvedEventExtensions
{
    /// <summary>
    /// Builds a clickable link to the event's page in the web UI served on port 2113 of
    /// <paramref name="hostname"/>, addressed by the event's stream id and number.
    /// </summary>
    public static Hyperlinq ToLink(this ResolvedEvent i, String hostname)
    {
        var recorded = i.Event;
        var url = String.Format("http://{0}:2113/web/index.html#/streams/{1}/{2}", hostname, recorded.EventStreamId, recorded.EventNumber);
        return new Hyperlinq(url);
    }
}
/// <summary>
/// Used both for serializing objects to be stored in the event store and for
/// deserializing them again. The serialized objects are events, which happen
/// to be plain classes.
/// </summary>
public class JsonNetSerializer
{
    private readonly JsonSerializer serializer;
    private readonly JsonSerializer deserializer;
    private Dictionary<string, Type> eventTypeToType;
    private Dictionary<Type, String> clrTypeToEventStoreType;

    /// <summary>
    /// Creates a serializer that maps event store type names to CLR types.
    /// </summary>
    /// <param name="eventTypeToTypeMappings">Map from an event store type name to a CLR type, used when deserializing. May be null.</param>
    public JsonNetSerializer(IEnumerable<KeyValuePair<string, Type>> eventTypeToTypeMappings)
    {
        this.serializer = new JsonSerializer() { TypeNameHandling = TypeNameHandling.Objects };
        this.deserializer = new JsonSerializer();
        this.serializer.Converters.Add(new StringEnumConverter());
        InitializeTypeMappings(eventTypeToTypeMappings);
    }

    /// <summary>
    /// Creates a serializer without explicit mappings: the event type defaults to the class
    /// name of the serialized event, and deserialization relies on the type information
    /// embedded in the serialized data.
    /// </summary>
    public JsonNetSerializer()
    {
        this.serializer = new JsonSerializer() { TypeNameHandling = TypeNameHandling.Objects };
        // NOTE(review): TypeNameHandling.Objects on the deserializer lets the payload name
        // the CLR type to instantiate; only use this constructor for trusted data.
        this.deserializer = new JsonSerializer() { TypeNameHandling = TypeNameHandling.Objects };
        this.serializer.Converters.Add(new StringEnumConverter());
        InitializeTypeMappings(null);
    }

    // Builds the name -> type map and its inverse; a null argument yields empty maps.
    private void InitializeTypeMappings(IEnumerable<KeyValuePair<string, Type>> eventTypeToTypeMappings)
    {
        var mappings = eventTypeToTypeMappings ?? new Dictionary<string, Type>();
        this.eventTypeToType = mappings.ToDictionary(x => x.Key, x => x.Value);
        this.clrTypeToEventStoreType = eventTypeToType.ToDictionary(x => x.Value, x => x.Key);
    }

    // Returns the event store type name for an object: the mapped name when mappings were
    // provided (throwing for an unmapped type), otherwise the CLR type's short name.
    private string MapToEventType(object e)
    {
        if (clrTypeToEventStoreType != null && clrTypeToEventStoreType.Count > 0)
        {
            String eventType;
            if (!clrTypeToEventStoreType.TryGetValue(e.GetType(), out eventType))
            {
                throw new ApplicationException(String.Format("Serializer is supposed to know about type {0}, but does not.", e.GetType().FullName));
            }
            return eventType;
        }
        return e.GetType().Name;
    }

    // Returns the CLR type mapped to an event store type name, or null when no mappings
    // were provided or the name is unknown.
    private Type MapFromEventType(string eventType)
    {
        if (eventTypeToType == null || eventTypeToType.Count == 0) return null;

        Type mapped;
        eventTypeToType.TryGetValue(eventType, out mapped);
        return mapped;
    }

    /// <summary>Serializes an object to JSON and returns the written bytes.</summary>
    public byte[] SerializeToBytes(object e)
    {
        using (var ms = new MemoryStream())
        {
            // The writer must be disposed (flushed) before the buffer is copied out.
            using (var w = new StreamWriter(ms))
            {
                this.serializer.Serialize(w, e);
            }
            return ms.ToArray();
        }
    }

    /// <summary>
    /// A convenience specialization of Deserialize(String, byte[]); perhaps it should be
    /// an extension method, but it satisfies the interface.
    /// </summary>
    /// <param name="resolvedEvent">Event whose type name and data are deserialized.</param>
    /// <returns>A lazily evaluated sequence containing the single deserialized object.</returns>
    public IEnumerable<object> Deserialize(ResolvedEvent resolvedEvent)
    {
        return Deserialize(resolvedEvent.Event.EventType, resolvedEvent.Event.Data);
    }

    /// <summary>
    /// Deserializes data as the CLR type mapped to <paramref name="typename"/>, or without
    /// a target type when no mapping exists. This is an iterator, so nothing runs until
    /// the result is enumerated.
    /// </summary>
    public IEnumerable<object> Deserialize(String typename, byte[] data)
    {
        var type = MapFromEventType(typename);
        if (type != null)
            yield return Deserialize(type, data);
        else
            yield return Deserialize(data);
    }

    // Deserializes data as the given, non-null type.
    private object Deserialize(Type type, byte[] data)
    {
        if (type == null)
        {
            throw new ArgumentNullException(nameof(type), "passed in null for the type, which should never be.");
        }

        using (var sr = new StreamReader(new MemoryStream(data)))
        {
            return this.deserializer.Deserialize(sr, type);
        }
    }

    // Deserializes data without a target type.
    private object Deserialize(byte[] data)
    {
        using (var sr = new StreamReader(new MemoryStream(data)))
        using (var reader = new JsonTextReader(sr))
        {
            return this.deserializer.Deserialize(reader);
        }
    }

    /// <summary>Deserializes JSON bytes as <typeparamref name="T"/>.</summary>
    public T Deserialize<T>(byte[] jsonbytes)
    {
        using (var sr = new StreamReader(new MemoryStream(jsonbytes)))
        {
            return (T)deserializer.Deserialize(sr, typeof(T));
        }
    }

    /// <summary>Deserializes a JSON string as <typeparamref name="T"/>.</summary>
    public T Deserialize<T>(String jsonString)
    {
        return Deserialize<T>(Encoding.UTF8.GetBytes(jsonString));
    }

    /// <summary>
    /// Helper for building a name-to-type map for the constructor, keyed by each type's
    /// short name.
    /// </summary>
    /// <param name="types">Types to include; their short names must be unique.</param>
    /// <returns>Dictionary from <c>Type.Name</c> to the type.</returns>
    public static IDictionary<String, Type> DefaultNameToType(IEnumerable<Type> types)
    {
        return types.ToDictionary(t => t.Name, t => t);
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment