Last active
May 11, 2019 20:41
-
-
Save slavanap/3b9c436c2defae8b1d305ad30fdfe889 to your computer and use it in GitHub Desktop.
Populate cache from single thread.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
class UsageSample { | |
public DistributedCacheUpdater _cache; | |
async Task<JObject> GetResponse(HttpRequestMessage httpRequest, DistributedCacheEntryOptions cacheOptions, CancellationToken ct) { | |
using (var proxy = await DistributedCacheUpdater.RequestAsync(_cache, httpRequest.RequestUri.ToString(), ct)) { | |
string responseData; | |
if (proxy.Value == null) { | |
_metrics.Measure.Meter.Mark(_cacheHits, "nohit"); | |
using (var httpResponse = await _client.SendAsync(httpRequest, HttpCompletionOption.ResponseContentRead, ct)) { | |
httpResponse.EnsureSuccessStatusCode(); | |
responseData = await httpResponse.Content.ReadAsStringAsync(); | |
} | |
await proxy.Update(Encoding.UTF8.GetBytes(responseData), cacheOptions, ct); | |
} | |
else { | |
_metrics.Measure.Meter.Mark(_cacheHits, "hit"); | |
responseData = Encoding.UTF8.GetString(proxy.Value); | |
} | |
return JObject.Parse(responseData); | |
} | |
} | |
// ... | |
} | |
*/ | |
using Microsoft.Extensions.Caching.Distributed; | |
using System; | |
using System.Collections.Concurrent; | |
using System.Threading; | |
using System.Threading.Tasks; | |
// MIT license. Vyacheslav Napadovsky | |
namespace CoreApp.Extensions { | |
public class DistributedCacheUpdater { | |
public class ValueProxy: IDisposable { | |
private readonly DistributedCacheUpdater _helper; | |
private TaskCompletionSource<byte[]> _notifyCreated; | |
public readonly string Key; | |
public byte[] Value { get; private set; } | |
public ValueProxy(string key) { | |
_helper = null; | |
_notifyCreated = null; | |
Key = key; | |
Value = null; | |
} | |
internal ValueProxy(DistributedCacheUpdater helper, string key, byte[] value) { | |
_helper = helper; | |
_notifyCreated = null; | |
Key = key; | |
Value = value; | |
} | |
internal ValueProxy(DistributedCacheUpdater helper, string key, TaskCompletionSource<byte[]> notifyCreated) { | |
_helper = helper; | |
_notifyCreated = notifyCreated; | |
Key = key; | |
Value = null; | |
} | |
private void Notify(byte[] newValue) { | |
if (_notifyCreated != null) { | |
TaskCompletionSource<byte[]> notifyCreated; | |
if (!_helper._pendingUpdates.TryRemove(Key, out notifyCreated)) | |
throw new InvalidOperationException("Key should exist in the dictionary"); | |
notifyCreated.SetResult(newValue); | |
if (notifyCreated != _notifyCreated) | |
throw new InvalidOperationException("Expected correct notifyCreated value"); | |
_notifyCreated = null; | |
} | |
} | |
public async Task Update(byte[] value, DistributedCacheEntryOptions options, CancellationToken ct) { | |
if (_helper != null) { | |
await _helper._cache.SetAsync(Key, value, options, ct); | |
Value = value; | |
Notify(value); | |
} | |
} | |
public void Dispose() { | |
if (_helper != null) | |
Notify(null); | |
} | |
} | |
readonly IDistributedCache _cache; | |
readonly ConcurrentDictionary<string, TaskCompletionSource<byte[]>> _pendingUpdates = | |
new ConcurrentDictionary<string, TaskCompletionSource<byte[]>>(); | |
public DistributedCacheUpdater(IDistributedCache cache) { | |
_cache = cache; | |
} | |
public async Task<ValueProxy> RequestAsync(string key, CancellationToken ct) { | |
while (true) { | |
var result = await _cache.GetAsync(key, ct); | |
if (result != null) | |
return new ValueProxy(this, key, result); | |
var newTaskCompletionSource = new TaskCompletionSource<byte[]>(); | |
var notifyCreated = _pendingUpdates.GetOrAdd(key, newTaskCompletionSource); | |
if (notifyCreated == newTaskCompletionSource) // == if added | |
return new ValueProxy(this, key, notifyCreated); | |
result = await notifyCreated.Task.ConfigureAwait(false); | |
if (result != null) | |
return new ValueProxy(this, key, result); | |
} | |
} | |
public static Task<ValueProxy> RequestAsync(DistributedCacheUpdater updater, string key, CancellationToken ct) { | |
if (updater != null) | |
return updater.RequestAsync(key, ct); | |
return Task.FromResult(new ValueProxy(key)); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment