Created
October 9, 2019 11:32
-
-
Save ptupitsyn/61ec797fb3f44cafc25c22332c61c19c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Concurrent; | |
using System.Collections.Generic; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using Apache.Ignite.Core.Cache; | |
using Apache.Ignite.Core.Cache.Event; | |
using Apache.Ignite.Core.Cache.Query.Continuous; | |
namespace IgniteAsyncStreams | |
{ | |
public static class IgniteAsyncStreamExtensions | |
{ | |
public static IAsyncEnumerable<ICacheEntry<TK, TV>> QueryContinuousAsync<TK, TV>( | |
this ICache<TK, TV> cache) | |
{ | |
return new AsyncContinuousQueryEnumerable<TK, TV>(cache); | |
} | |
public static async IAsyncEnumerable<ICacheEntry<TK, TV>> QueryContinuousAsync2<TK, TV>( | |
this ICache<TK, TV> cache) | |
{ | |
var queryListener = new AsyncContinuousQueryListener<TK, TV>(); | |
var continuousQuery = new ContinuousQuery<TK, TV>(queryListener); | |
using (cache.QueryContinuous(continuousQuery)) | |
{ | |
while (true) | |
{ | |
while (queryListener.Events.TryDequeue(out var entryEvent)) | |
{ | |
yield return entryEvent; | |
} | |
await queryListener.HasData.WaitAsync(); | |
} | |
} | |
} | |
private class AsyncContinuousQueryEnumerable<TK, TV> : IAsyncEnumerable<ICacheEntry<TK, TV>> | |
{ | |
private readonly ICache<TK, TV> _cache; | |
public AsyncContinuousQueryEnumerable(ICache<TK, TV> cache) | |
{ | |
_cache = cache; | |
} | |
public IAsyncEnumerator<ICacheEntry<TK, TV>> GetAsyncEnumerator( | |
CancellationToken cancellationToken = new CancellationToken()) | |
{ | |
var queryListener = new AsyncContinuousQueryListener<TK, TV>(); | |
var continuousQuery = new ContinuousQuery<TK, TV>(queryListener); | |
var queryHandle = _cache.QueryContinuous(continuousQuery); | |
return new AsyncContinuousQueryEnumerator<TK, TV>(queryListener, queryHandle); | |
} | |
} | |
private class AsyncContinuousQueryEnumerator<TK, TV> : IAsyncEnumerator<ICacheEntryEvent<TK, TV>> | |
{ | |
private readonly AsyncContinuousQueryListener<TK, TV> _queryListener; | |
private readonly IContinuousQueryHandle _queryHandle; | |
public AsyncContinuousQueryEnumerator(AsyncContinuousQueryListener<TK, TV> queryListener, | |
IContinuousQueryHandle queryHandle) | |
{ | |
_queryListener = queryListener; | |
_queryHandle = queryHandle; | |
} | |
public ValueTask DisposeAsync() | |
{ | |
_queryHandle.Dispose(); | |
return default; | |
} | |
public async ValueTask<bool> MoveNextAsync() | |
{ | |
ICacheEntryEvent<TK, TV>? entryEvent; | |
while (!_queryListener.Events.TryDequeue(out entryEvent)) | |
{ | |
await _queryListener.HasData.WaitAsync(); | |
} | |
Current = entryEvent; | |
return true; | |
} | |
#nullable disable | |
public ICacheEntryEvent<TK, TV> Current { get; private set; } | |
#nullable restore | |
} | |
private class AsyncContinuousQueryListener<TK, TV> : ICacheEntryEventListener<TK, TV> | |
{ | |
public readonly SemaphoreSlim HasData = new SemaphoreSlim(0, 1); | |
public readonly ConcurrentQueue<ICacheEntryEvent<TK, TV>> Events | |
= new ConcurrentQueue<ICacheEntryEvent<TK, TV>>(); | |
public void OnEvent(IEnumerable<ICacheEntryEvent<TK, TV>> events) | |
{ | |
foreach (var entryEvent in events) | |
{ | |
Console.WriteLine("Received entry: " + entryEvent.Value); | |
Events.Enqueue(entryEvent); | |
} | |
HasData.Release(); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment