Last active
December 14, 2015 22:49
-
-
Save damianh/5161024 to your computer and use it in GitHub Desktop.
Alternative InMemory Eventstore persistence engine that does not contain statics and better performing. For the log statements, supply your own resources or remove them.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /* | |
| Usage: | |
| _storeEvents = Wireup | |
| .Init() | |
| .UsingFastInMemoryPersistence() | |
| .InitializeStorageEngine() | |
| .etc... | |
| */ | |
| // ReSharper disable CheckNamespace | |
| namespace EventStore.Persistence | |
| // ReSharper restore CheckNamespace | |
| { | |
| using System; | |
| using System.Collections.Generic; | |
| using System.Diagnostics.CodeAnalysis; | |
| using System.Linq; | |
| using global::EventStore.Logging; | |
| [SuppressMessage("Microsoft.Naming", "CA1704:IdentifiersShouldBeSpelledCorrectly", MessageId = "Wireup")] | |
| public static class PersistenceWireupExtensions | |
| { | |
| [SuppressMessage("Microsoft.Naming", "CA1704:IdentifiersShouldBeSpelledCorrectly", MessageId = "wireup")] | |
| [SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")] | |
| public static PersistenceWireup UsingFastInMemoryPersistence(this Wireup wireup) | |
| { | |
| wireup.With<IPersistStreams>(new FastInMemoryPersistenceEngine()); | |
| return new PersistenceWireup(wireup); | |
| } | |
| } | |
| public class FastInMemoryPersistenceEngine : IPersistStreams | |
| { | |
| private static readonly ILog Logger = LogFactory.BuildLogger(typeof(FastInMemoryPersistenceEngine)); | |
| private readonly CommitCollection _commits = new CommitCollection(); | |
| private readonly StreamHeadCollection _heads = new StreamHeadCollection(); | |
| private readonly ICollection<Snapshot> _snapshots = new LinkedList<Snapshot>(); | |
| private bool _disposed; | |
| public virtual bool AddSnapshot(Snapshot snapshot) | |
| { | |
| ThrowWhenDisposed(); | |
| Logger.Debug(Resources.AddingSnapshot, snapshot.StreamId, snapshot.StreamRevision); | |
| lock (_commits) | |
| { | |
| StreamHead currentHead = _heads.Get(snapshot.StreamId); | |
| if (currentHead == null) | |
| { | |
| return false; | |
| } | |
| _snapshots.Add(snapshot); | |
| _heads.Remove(currentHead); | |
| _heads.Add(new StreamHead(currentHead.StreamId, currentHead.HeadRevision, snapshot.StreamRevision)); | |
| } | |
| return true; | |
| } | |
| public virtual void Commit(Commit attempt) | |
| { | |
| ThrowWhenDisposed(); | |
| Logger.Debug(Resources.AttemptingToCommit, attempt.CommitId, attempt.StreamId, attempt.CommitSequence); | |
| lock (_commits) | |
| { | |
| if (_commits.ContainsCommit(attempt.CommitId)) | |
| { | |
| throw new DuplicateCommitException(); | |
| } | |
| if (_commits.ContainsRevision(attempt.StreamId, attempt.StreamRevision)) | |
| { | |
| throw new ConcurrencyException(); | |
| } | |
| _commits.Add(attempt); | |
| StreamHead head = _heads.Get(attempt.StreamId); | |
| if (head != null) | |
| { | |
| _heads.Remove(head); | |
| } | |
| Logger.Debug(Resources.UpdatingStreamHead, attempt.StreamId); | |
| int snapshotRevision = head == null ? 0 : head.SnapshotRevision; | |
| _heads.Add(new StreamHead(attempt.StreamId, attempt.StreamRevision, snapshotRevision)); | |
| } | |
| } | |
| public virtual IEnumerable<Commit> GetFrom(Guid streamId, int minRevision, int maxRevision) | |
| { | |
| ThrowWhenDisposed(); | |
| Logger.Debug(Resources.GettingAllCommitsFromRevision, streamId, minRevision, maxRevision); | |
| lock (_commits) | |
| { | |
| return _commits.GetFrom(streamId, minRevision, maxRevision); | |
| } | |
| } | |
| public virtual IEnumerable<Commit> GetFrom(DateTime start) | |
| { | |
| ThrowWhenDisposed(); | |
| Logger.Debug(Resources.GettingAllCommitsFromTime, start); | |
| return _commits.GetFrom(start); | |
| } | |
| public virtual Snapshot GetSnapshot(Guid streamId, int maxRevision) | |
| { | |
| ThrowWhenDisposed(); | |
| Logger.Debug(Resources.GettingSnapshotForStream, streamId, maxRevision); | |
| lock (_commits) | |
| { | |
| return _snapshots | |
| .Where(x => x.StreamId == streamId && x.StreamRevision <= maxRevision) | |
| .OrderByDescending(x => x.StreamRevision) | |
| .FirstOrDefault(); | |
| } | |
| } | |
| public virtual IEnumerable<StreamHead> GetStreamsToSnapshot(int maxThreshold) | |
| { | |
| ThrowWhenDisposed(); | |
| Logger.Debug(Resources.GettingStreamsToSnapshot, maxThreshold); | |
| lock (_commits) | |
| { | |
| return _heads.GetStreamsToSnapshot(maxThreshold); | |
| } | |
| } | |
| public virtual IEnumerable<Commit> GetUndispatchedCommits() | |
| { | |
| lock (_commits) | |
| { | |
| ThrowWhenDisposed(); | |
| Logger.Debug(Resources.RetrievingUndispatchedCommits, _commits.Count); | |
| return _commits.GetUndispatchedCommits(); | |
| } | |
| } | |
| public virtual void MarkCommitAsDispatched(Commit commit) | |
| { | |
| ThrowWhenDisposed(); | |
| Logger.Debug(Resources.MarkingAsDispatched, commit.CommitId); | |
| lock (_commits) | |
| { | |
| _commits.RemoveUndispatched(commit); | |
| } | |
| } | |
| public virtual void Purge() | |
| { | |
| ThrowWhenDisposed(); | |
| Logger.Warn(Resources.PurgingStore); | |
| lock (_commits) | |
| { | |
| _commits.Clear(); | |
| _snapshots.Clear(); | |
| _heads.Clear(); | |
| } | |
| } | |
| public void Dispose() | |
| { | |
| Dispose(true); | |
| GC.SuppressFinalize(this); | |
| } | |
| public void Initialize() | |
| { | |
| Logger.Info(Resources.InitializingEngine); | |
| } | |
| protected virtual void Dispose(bool disposing) | |
| { | |
| _disposed = true; | |
| Logger.Info(Resources.DisposingEngine); | |
| } | |
| private void ThrowWhenDisposed() | |
| { | |
| if (!_disposed) | |
| { | |
| return; | |
| } | |
| Logger.Warn(Resources.AlreadyDisposed); | |
| throw new ObjectDisposedException(Resources.AlreadyDisposed); | |
| } | |
| private class CommitCollection | |
| { | |
| private readonly List<Commit> _commits = new List<Commit>(); | |
| private readonly Dictionary<Guid, Commit> _commitsById = new Dictionary<Guid, Commit>(); | |
| private readonly Dictionary<Guid, Dictionary<int, Commit>> _commitsByStreamByRevision = new Dictionary<Guid, Dictionary<int, Commit>>(); | |
| private readonly Dictionary<Guid, DateTime> _stamps = new Dictionary<Guid, DateTime>(); | |
| private readonly ICollection<Commit> _undispatched = new LinkedList<Commit>(); | |
| public int Count | |
| { | |
| get { return _commits.Count; } | |
| } | |
| public void Add(Commit attempt) | |
| { | |
| _stamps[attempt.CommitId] = attempt.CommitStamp; | |
| _commits.Add(attempt); | |
| _commitsById.Add(attempt.CommitId, attempt); | |
| if (!_commitsByStreamByRevision.ContainsKey(attempt.StreamId)) | |
| { | |
| _commitsByStreamByRevision.Add(attempt.StreamId, new Dictionary<int, Commit>()); | |
| } | |
| _commitsByStreamByRevision[attempt.StreamId].Add(attempt.StreamRevision, attempt); | |
| _undispatched.Add(attempt); | |
| } | |
| public IEnumerable<Commit> GetFrom(Guid streamId, int minRevision, int maxRevision) | |
| { | |
| return | |
| _commits.Where(x => x.StreamId == streamId && x.StreamRevision >= minRevision && (x.StreamRevision - x.Events.Count + 1) <= maxRevision) | |
| .ToArray(); | |
| } | |
| public IEnumerable<Commit> GetFrom(DateTime start) | |
| { | |
| Guid commitId = _stamps.Where(x => x.Value >= start).Select(x => x.Key).FirstOrDefault(); | |
| if (commitId == Guid.Empty) | |
| { | |
| return new Commit[] { }; | |
| } | |
| Commit startingCommit = _commitsById[commitId]; | |
| return _commits.Skip(_commits.IndexOf(startingCommit)); | |
| } | |
| public IEnumerable<Commit> GetUndispatchedCommits() | |
| { | |
| return _commits.Where(c => _undispatched.Contains(c)).ToArray(); | |
| } | |
| public void RemoveUndispatched(Commit commit) | |
| { | |
| _undispatched.Remove(commit); | |
| } | |
| internal void Clear() | |
| { | |
| _commits.Clear(); | |
| _commitsById.Clear(); | |
| _commitsByStreamByRevision.Clear(); | |
| } | |
| internal bool ContainsCommit(Guid commitId) | |
| { | |
| return _commitsById.ContainsKey(commitId); | |
| } | |
| internal bool ContainsRevision(Guid streamId, int streamRevision) | |
| { | |
| return _commitsByStreamByRevision.ContainsKey(streamId) && _commitsByStreamByRevision[streamId].ContainsKey(streamRevision); | |
| } | |
| } | |
| private class StreamHeadCollection | |
| { | |
| private readonly List<StreamHead> _heads = new List<StreamHead>(); | |
| private readonly Dictionary<Guid, StreamHead> _headsByStreamId = new Dictionary<Guid, StreamHead>(); | |
| public IEnumerable<StreamHead> GetStreamsToSnapshot(int maxThreshold) | |
| { | |
| return _heads.Where(x => x.HeadRevision >= x.SnapshotRevision + maxThreshold) | |
| .Select(stream => new StreamHead(stream.StreamId, stream.HeadRevision, stream.SnapshotRevision)); | |
| } | |
| internal void Add(StreamHead streamHead) | |
| { | |
| _headsByStreamId.Add(streamHead.StreamId, streamHead); | |
| _heads.Add(streamHead); | |
| } | |
| internal void Clear() | |
| { | |
| _heads.Clear(); | |
| _headsByStreamId.Clear(); | |
| } | |
| internal StreamHead Get(Guid streamId) | |
| { | |
| return _headsByStreamId.ContainsKey(streamId) ? _headsByStreamId[streamId] : null; | |
| } | |
| internal void Remove(StreamHead streamHead) | |
| { | |
| _heads.Remove(streamHead); | |
| _headsByStreamId.Remove(streamHead.StreamId); | |
| } | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment