Created
January 7, 2019 13:35
-
-
Save rikbosch/6d9c894edf2a45018c7160d861c34e2f to your computer and use it in GitHub Desktop.
StreamStoreWithPolicyWrapper.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// An <see cref="IStreamStore"/> decorator that forwards every call to an inner store and runs
/// the task-returning operations through an <see cref="IAsyncPolicy"/>.
/// </summary>
/// <remarks>
/// <para>
/// The policy is looked up in the supplied policy registry under the configured
/// <c>PolicyKey</c>. When no key is configured, or no policy is registered under that key,
/// a no-op policy is used, so the wrapper behaves like a plain pass-through.
/// </para>
/// <para>
/// Each policy execution receives a <see cref="Context"/> created from the string
/// <c>"{inner type name}.{method name}"</c>, which lets policies tell operations apart.
/// </para>
/// <para>
/// Subscriptions (<see cref="SubscribeToStream"/>, <see cref="SubscribeToAll"/>) and the
/// <see cref="OnDispose"/> event are forwarded directly and are not executed through the policy.
/// </para>
/// </remarks>
public class StreamStoreWithPolicyWrapper : IStreamStore
{
    private readonly IStreamStore _inner;
    private readonly IAsyncPolicy _policy;
    private readonly string _contextPrefix;

    /// <summary>
    /// Creates a wrapper around <paramref name="inner"/>.
    /// </summary>
    /// <param name="config">Options whose <c>PolicyKey</c> names the policy to resolve.</param>
    /// <param name="inner">The store that actually performs every operation.</param>
    /// <param name="policyRegistry">Registry the policy is resolved from.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="config"/>, <paramref name="inner"/> or <paramref name="policyRegistry"/> is null.
    /// </exception>
    public StreamStoreWithPolicyWrapper(
        IOptions<StreamStoreWithPolicyWrapperSettings> config,
        IStreamStore inner,
        IReadOnlyPolicyRegistry<string> policyRegistry)
    {
        if (config == null)
        {
            throw new ArgumentNullException(nameof(config));
        }

        if (policyRegistry == null)
        {
            throw new ArgumentNullException(nameof(policyRegistry));
        }

        _inner = inner ?? throw new ArgumentNullException(nameof(inner));
        _contextPrefix = inner.GetType().Name;

        // A missing key cannot identify a policy; skip the lookup instead of handing null
        // to the registry, and use the same no-op fallback as for an unregistered key.
        var policyKey = config.Value?.PolicyKey;
        if (policyKey != null && policyRegistry.TryGet<IAsyncPolicy>(policyKey, out var registeredPolicy))
        {
            _policy = registeredPolicy;
        }
        else
        {
            _policy = Policy.NoOpAsync();
        }
    }

    /// <inheritdoc />
    /// <remarks>Forwards add/remove straight to the inner store's event.</remarks>
    public event Action OnDispose
    {
        add => _inner.OnDispose += value;
        remove => _inner.OnDispose -= value;
    }

    /// <inheritdoc />
    /// <remarks>Delegates to the inner store, executed through the policy.</remarks>
    public Task<ReadAllPage> ReadAllForwards(
        long fromPositionInclusive,
        int maxCount,
        bool prefetchJsonData = true,
        CancellationToken cancellationToken = default(CancellationToken))
    {
        return _policy.ExecuteAsync(
            (context, ct) => _inner.ReadAllForwards(fromPositionInclusive, maxCount, prefetchJsonData, ct),
            CreateContext(nameof(ReadAllForwards)),
            cancellationToken);
    }

    /// <inheritdoc />
    /// <remarks>Delegates to the inner store, executed through the policy.</remarks>
    public Task<ReadAllPage> ReadAllBackwards(
        long fromPositionInclusive,
        int maxCount,
        bool prefetchJsonData = true,
        CancellationToken cancellationToken = default(CancellationToken))
    {
        return _policy.ExecuteAsync(
            (context, ct) => _inner.ReadAllBackwards(fromPositionInclusive, maxCount, prefetchJsonData, ct),
            CreateContext(nameof(ReadAllBackwards)),
            cancellationToken);
    }

    /// <inheritdoc />
    /// <remarks>Delegates to the inner store, executed through the policy.</remarks>
    public Task<ReadStreamPage> ReadStreamForwards(
        StreamId streamId,
        int fromVersionInclusive,
        int maxCount,
        bool prefetchJsonData = true,
        CancellationToken cancellationToken = default(CancellationToken))
    {
        return _policy.ExecuteAsync(
            (context, ct) => _inner.ReadStreamForwards(streamId, fromVersionInclusive, maxCount, prefetchJsonData, ct),
            CreateContext(nameof(ReadStreamForwards)),
            cancellationToken);
    }

    /// <inheritdoc />
    /// <remarks>Delegates to the inner store, executed through the policy.</remarks>
    public Task<ReadStreamPage> ReadStreamBackwards(
        StreamId streamId,
        int fromVersionInclusive,
        int maxCount,
        bool prefetchJsonData = true,
        CancellationToken cancellationToken = default(CancellationToken))
    {
        return _policy.ExecuteAsync(
            (context, ct) => _inner.ReadStreamBackwards(streamId, fromVersionInclusive, maxCount, prefetchJsonData, ct),
            CreateContext(nameof(ReadStreamBackwards)),
            cancellationToken);
    }

    /// <inheritdoc />
    /// <remarks>Forwarded directly to the inner store; the policy is not applied.</remarks>
    public IStreamSubscription SubscribeToStream(
        StreamId streamId,
        int? continueAfterVersion,
        StreamMessageReceived streamMessageReceived,
        SubscriptionDropped subscriptionDropped = null,
        HasCaughtUp hasCaughtUp = null,
        bool prefetchJsonData = true,
        string name = null)
    {
        return _inner.SubscribeToStream(
            streamId,
            continueAfterVersion,
            streamMessageReceived,
            subscriptionDropped,
            hasCaughtUp,
            prefetchJsonData,
            name);
    }

    /// <inheritdoc />
    /// <remarks>Forwarded directly to the inner store; the policy is not applied.</remarks>
    public IAllStreamSubscription SubscribeToAll(
        long? continueAfterPosition,
        AllStreamMessageReceived streamMessageReceived,
        AllSubscriptionDropped subscriptionDropped = null,
        HasCaughtUp hasCaughtUp = null,
        bool prefetchJsonData = true,
        string name = null)
    {
        return _inner.SubscribeToAll(
            continueAfterPosition,
            streamMessageReceived,
            subscriptionDropped,
            hasCaughtUp,
            prefetchJsonData,
            name);
    }

    /// <inheritdoc />
    /// <remarks>Delegates to the inner store, executed through the policy.</remarks>
    public Task<long> ReadHeadPosition(CancellationToken cancellationToken = default(CancellationToken))
    {
        return _policy.ExecuteAsync(
            (context, ct) => _inner.ReadHeadPosition(ct),
            CreateContext(nameof(ReadHeadPosition)),
            cancellationToken);
    }

    /// <inheritdoc />
    /// <remarks>Delegates to the inner store, executed through the policy.</remarks>
    public Task<StreamMetadataResult> GetStreamMetadata(
        string streamId,
        CancellationToken cancellationToken = default(CancellationToken))
    {
        return _policy.ExecuteAsync(
            (context, ct) => _inner.GetStreamMetadata(streamId, ct),
            CreateContext(nameof(GetStreamMetadata)),
            cancellationToken);
    }

    /// <inheritdoc />
    /// <remarks>Delegates to the inner store, executed through the policy.</remarks>
    public Task<AppendResult> AppendToStream(
        StreamId streamId,
        int expectedVersion,
        NewStreamMessage[] messages,
        CancellationToken cancellationToken = default(CancellationToken))
    {
        return _policy.ExecuteAsync(
            (context, ct) => _inner.AppendToStream(streamId, expectedVersion, messages, ct),
            CreateContext(nameof(AppendToStream)),
            cancellationToken);
    }

    /// <inheritdoc />
    /// <remarks>Delegates to the inner store, executed through the policy.</remarks>
    public Task DeleteStream(
        StreamId streamId,
        int expectedVersion = -2,
        CancellationToken cancellationToken = default(CancellationToken))
    {
        return _policy.ExecuteAsync(
            (context, ct) => _inner.DeleteStream(streamId, expectedVersion, ct),
            CreateContext(nameof(DeleteStream)),
            cancellationToken);
    }

    /// <inheritdoc />
    /// <remarks>Delegates to the inner store, executed through the policy.</remarks>
    public Task DeleteMessage(
        StreamId streamId,
        Guid messageId,
        CancellationToken cancellationToken = default(CancellationToken))
    {
        return _policy.ExecuteAsync(
            (context, ct) => _inner.DeleteMessage(streamId, messageId, ct),
            CreateContext(nameof(DeleteMessage)),
            cancellationToken);
    }

    /// <inheritdoc />
    /// <remarks>Delegates to the inner store, executed through the policy.</remarks>
    public Task SetStreamMetadata(
        StreamId streamId,
        int expectedStreamMetadataVersion = -2,
        int? maxAge = null,
        int? maxCount = null,
        string metadataJson = null,
        CancellationToken cancellationToken = default(CancellationToken))
    {
        return _policy.ExecuteAsync(
            (context, ct) => _inner.SetStreamMetadata(streamId, expectedStreamMetadataVersion, maxAge, maxCount, metadataJson, ct),
            CreateContext(nameof(SetStreamMetadata)),
            cancellationToken);
    }

    /// <summary>
    /// Does nothing: the wrapper holds no resources of its own and does not dispose the inner store.
    /// </summary>
    /// <remarks>
    /// NOTE(review): presumably the inner store's lifetime is managed by whoever created it
    /// (e.g. a DI container) - confirm before relying on this wrapper to release it.
    /// </remarks>
    public void Dispose()
    {
    }

    /// <summary>
    /// Builds the per-call policy context, "{inner type name}.{operation name}".
    /// </summary>
    private Context CreateContext(string operationName)
    {
        return new Context($"{_contextPrefix}.{operationName}");
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment