Skip to content

Instantly share code, notes, and snippets.

@rikbosch
Created January 7, 2019 13:35
Show Gist options
  • Save rikbosch/6d9c894edf2a45018c7160d861c34e2f to your computer and use it in GitHub Desktop.
Save rikbosch/6d9c894edf2a45018c7160d861c34e2f to your computer and use it in GitHub Desktop.
StreamStoreWithPolicyWrapper.cs
/// <summary>
/// <see cref="IStreamStore"/> decorator that forwards every read/write call to an inner store
/// through a Polly <see cref="IAsyncPolicy"/>. Subscriptions and the <see cref="OnDispose"/>
/// event are passed straight to the inner store without going through the policy.
/// </summary>
public class StreamStoreWithPolicyWrapper : IStreamStore
{
    private readonly IStreamStore _inner;
    private readonly IAsyncPolicy _policy;
    private readonly string _contextPrefix;

    /// <summary>
    /// Creates the wrapper and resolves the policy to use from <paramref name="policyRegistry"/>.
    /// </summary>
    /// <param name="config">Options whose <c>PolicyKey</c> names the policy to look up.</param>
    /// <param name="inner">The store that actually performs the operations.</param>
    /// <param name="policyRegistry">Registry the policy is looked up in.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="config"/>, <paramref name="inner"/> or <paramref name="policyRegistry"/> is null.
    /// </exception>
    public StreamStoreWithPolicyWrapper(IOptions<StreamStoreWithPolicyWrapperSettings> config, IStreamStore inner, IReadOnlyPolicyRegistry<string> policyRegistry)
    {
        if (config == null)
        {
            throw new ArgumentNullException(nameof(config));
        }

        if (policyRegistry == null)
        {
            throw new ArgumentNullException(nameof(policyRegistry));
        }

        _inner = inner ?? throw new ArgumentNullException(nameof(inner));

        // The inner store's type name prefixes every Polly operation key, e.g. "<InnerType>.AppendToStream".
        _contextPrefix = inner.GetType().Name;

        // A missing key is treated the same as a key that is not registered: fall back to a no-op
        // policy instead of handing a null key to the registry lookup.
        var policyKey = config.Value?.PolicyKey;
        _policy = policyKey != null && policyRegistry.TryGet<IAsyncPolicy>(policyKey, out var policy)
            ? policy
            : Policy.NoOpAsync();
    }

    /// <summary>Calls the inner store's <c>ReadAllForwards</c> through the policy.</summary>
    public Task<ReadAllPage> ReadAllForwards(long fromPositionInclusive, int maxCount, bool prefetchJsonData = true,
        CancellationToken cancellationToken = new CancellationToken())
    {
        return _policy.ExecuteAsync(
            (context, ct) => _inner.ReadAllForwards(fromPositionInclusive, maxCount, prefetchJsonData, ct),
            CreateContext(nameof(ReadAllForwards)), cancellationToken);
    }

    /// <summary>Calls the inner store's <c>ReadAllBackwards</c> through the policy.</summary>
    public Task<ReadAllPage> ReadAllBackwards(long fromPositionInclusive, int maxCount, bool prefetchJsonData = true,
        CancellationToken cancellationToken = new CancellationToken())
    {
        return _policy.ExecuteAsync(
            (context, ct) => _inner.ReadAllBackwards(fromPositionInclusive, maxCount, prefetchJsonData, ct),
            CreateContext(nameof(ReadAllBackwards)), cancellationToken);
    }

    /// <summary>Calls the inner store's <c>ReadStreamForwards</c> through the policy.</summary>
    public Task<ReadStreamPage> ReadStreamForwards(StreamId streamId, int fromVersionInclusive, int maxCount, bool prefetchJsonData = true,
        CancellationToken cancellationToken = new CancellationToken())
    {
        return _policy.ExecuteAsync(
            (context, ct) => _inner.ReadStreamForwards(streamId, fromVersionInclusive, maxCount, prefetchJsonData, ct),
            CreateContext(nameof(ReadStreamForwards)), cancellationToken);
    }

    /// <summary>Calls the inner store's <c>ReadStreamBackwards</c> through the policy.</summary>
    public Task<ReadStreamPage> ReadStreamBackwards(StreamId streamId, int fromVersionInclusive, int maxCount, bool prefetchJsonData = true,
        CancellationToken cancellationToken = new CancellationToken())
    {
        return _policy.ExecuteAsync(
            (context, ct) => _inner.ReadStreamBackwards(streamId, fromVersionInclusive, maxCount, prefetchJsonData, ct),
            CreateContext(nameof(ReadStreamBackwards)), cancellationToken);
    }

    /// <summary>Creates a stream subscription directly on the inner store; the policy is not applied.</summary>
    public IStreamSubscription SubscribeToStream(StreamId streamId, int? continueAfterVersion,
        StreamMessageReceived streamMessageReceived, SubscriptionDropped subscriptionDropped = null,
        HasCaughtUp hasCaughtUp = null, bool prefetchJsonData = true, string name = null)
    {
        return _inner.SubscribeToStream(streamId, continueAfterVersion, streamMessageReceived, subscriptionDropped, hasCaughtUp, prefetchJsonData, name);
    }

    /// <summary>Creates an all-stream subscription directly on the inner store; the policy is not applied.</summary>
    public IAllStreamSubscription SubscribeToAll(long? continueAfterPosition, AllStreamMessageReceived streamMessageReceived,
        AllSubscriptionDropped subscriptionDropped = null, HasCaughtUp hasCaughtUp = null, bool prefetchJsonData = true,
        string name = null)
    {
        return _inner.SubscribeToAll(continueAfterPosition, streamMessageReceived, subscriptionDropped, hasCaughtUp, prefetchJsonData, name);
    }

    /// <summary>Calls the inner store's <c>ReadHeadPosition</c> through the policy.</summary>
    public Task<long> ReadHeadPosition(CancellationToken cancellationToken = new CancellationToken())
    {
        return _policy.ExecuteAsync(
            (context, ct) => _inner.ReadHeadPosition(ct),
            CreateContext(nameof(ReadHeadPosition)), cancellationToken);
    }

    /// <summary>Calls the inner store's <c>GetStreamMetadata</c> through the policy.</summary>
    public Task<StreamMetadataResult> GetStreamMetadata(string streamId, CancellationToken cancellationToken = new CancellationToken())
    {
        return _policy.ExecuteAsync(
            (context, ct) => _inner.GetStreamMetadata(streamId, ct),
            CreateContext(nameof(GetStreamMetadata)), cancellationToken);
    }

    /// <summary>
    /// Forwards subscription and unsubscription to the inner store's <c>OnDispose</c> event,
    /// so handlers run when the inner store raises it.
    /// </summary>
    public event Action OnDispose
    {
        add => _inner.OnDispose += value;
        remove => _inner.OnDispose -= value;
    }

    /// <summary>Calls the inner store's <c>AppendToStream</c> through the policy.</summary>
    public Task<AppendResult> AppendToStream(StreamId streamId, int expectedVersion, NewStreamMessage[] messages,
        CancellationToken cancellationToken = new CancellationToken())
    {
        return _policy.ExecuteAsync(
            (context, ct) => _inner.AppendToStream(streamId, expectedVersion, messages, ct),
            CreateContext(nameof(AppendToStream)), cancellationToken);
    }

    /// <summary>Calls the inner store's <c>DeleteStream</c> through the policy.</summary>
    public Task DeleteStream(StreamId streamId, int expectedVersion = -2,
        CancellationToken cancellationToken = new CancellationToken())
    {
        return _policy.ExecuteAsync(
            (context, ct) => _inner.DeleteStream(streamId, expectedVersion, ct),
            CreateContext(nameof(DeleteStream)), cancellationToken);
    }

    /// <summary>Calls the inner store's <c>DeleteMessage</c> through the policy.</summary>
    public Task DeleteMessage(StreamId streamId, Guid messageId, CancellationToken cancellationToken = new CancellationToken())
    {
        return _policy.ExecuteAsync(
            (context, ct) => _inner.DeleteMessage(streamId, messageId, ct),
            CreateContext(nameof(DeleteMessage)), cancellationToken);
    }

    /// <summary>Calls the inner store's <c>SetStreamMetadata</c> through the policy.</summary>
    public Task SetStreamMetadata(StreamId streamId, int expectedStreamMetadataVersion = -2, int? maxAge = null,
        int? maxCount = null, string metadataJson = null, CancellationToken cancellationToken = new CancellationToken())
    {
        return _policy.ExecuteAsync(
            (context, ct) => _inner.SetStreamMetadata(streamId, expectedStreamMetadataVersion, maxAge, maxCount, metadataJson, ct),
            CreateContext(nameof(SetStreamMetadata)), cancellationToken);
    }

    /// <summary>
    /// Does nothing; in particular the wrapped store is not disposed here.
    /// </summary>
    /// <remarks>
    /// NOTE(review): presumably the inner store's lifetime is owned by whoever created it
    /// (e.g. the DI container) — confirm before relying on this wrapper to release it.
    /// </remarks>
    public void Dispose()
    {
    }

    // Builds a fresh Polly context per call, keyed "<inner type name>.<operation>".
    private Context CreateContext(string operation)
    {
        return new Context($"{_contextPrefix}.{operation}");
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment