Skip to content

Instantly share code, notes, and snippets.

@BashkaMen
Created September 13, 2021 11:08
Show Gist options
  • Save BashkaMen/0f8a819e8d558df65da1785ed7637d31 to your computer and use it in GitHub Desktop.
Save BashkaMen/0f8a819e8d558df65da1785ed7637d31 to your computer and use it in GitHub Desktop.
/// <summary>
/// Extension helpers that expose a Redis stream as an Rx observable sequence.
/// </summary>
public static class Ext
{
    // Pause between polls when a read returned nothing (idle stream or failed read),
    // so the loop does not spin at full speed against the server.
    private static readonly TimeSpan IdleDelay = TimeSpan.FromMilliseconds(100);

    /// <summary>
    /// Polls the stream <paramref name="topic"/> from position "0-0" onwards and pushes every
    /// field value of every entry read to the subscriber as a string.
    /// </summary>
    /// <param name="db">Database used to issue the stream reads.</param>
    /// <param name="topic">Key of the stream to read.</param>
    /// <param name="prefetchCount">Maximum number of entries requested per read.</param>
    /// <param name="cancellationToken">
    /// Optional token that stops the polling loop; the sequence then completes normally.
    /// </param>
    /// <returns>
    /// A cold observable: each subscription starts its own polling loop from the beginning of
    /// the stream. The loop stops when the subscription is disposed or the token is cancelled.
    /// </returns>
    public static IObservable<string> AnonymousConsume(this IDatabase db, string topic, int prefetchCount = 100, CancellationToken? cancellationToken = null)
    {
        // Reads one batch after `position`. Never throws: on failure (or an empty batch) it
        // returns no items and the unchanged position so the caller retries from the same place.
        async Task<(string[] items, string lastPos)> SafeRead(string position)
        {
            try
            {
                var msgs = await db.StreamReadAsync(topic, position, prefetchCount);
                if (msgs == null || msgs.Length == 0)
                {
                    // Keep the cursor: taking the id of a default entry here would
                    // overwrite the last known position with an empty value.
                    return (Array.Empty<string>(), position);
                }

                var items = msgs.SelectMany(s => s.Values)
                    .Select(s => s.Value.ToString())
                    .ToArray();
                string lastPos = msgs[msgs.Length - 1].Id;
                return (items, lastPos);
            }
            catch (Exception)
            {
                // log here
                return (Array.Empty<string>(), position);
            }
        }

        // The CancellationToken overload of Observable.Create signals `unsubscribed` when the
        // subscription is disposed, which lets the loop stop instead of polling forever.
        return Observable.Create(async (IObserver<string> observer, CancellationToken unsubscribed) =>
        {
            var external = cancellationToken ?? CancellationToken.None;
            using (var linked = CancellationTokenSource.CreateLinkedTokenSource(external, unsubscribed))
            {
                var token = linked.Token;
                var pos = "0-0";
                while (!token.IsCancellationRequested)
                {
                    var (items, newPos) = await SafeRead(pos);
                    foreach (var item in items)
                    {
                        observer.OnNext(item);
                    }
                    pos = newPos;

                    if (items.Length == 0)
                    {
                        try
                        {
                            await Task.Delay(IdleDelay, token);
                        }
                        catch (OperationCanceledException)
                        {
                            // Cancellation while idle ends the loop; the sequence
                            // completes normally rather than surfacing an error.
                            break;
                        }
                    }
                }
            }
        });
    }

    /// <summary>
    /// Usage sample: consumes "topic-name", deserializes each message from JSON and keeps the
    /// latest Blocks value per GameId in a local dictionary.
    /// </summary>
    /// <param name="db">Database whose stream is consumed.</param>
    /// <remarks>
    /// NOTE(review): the subscription returned by Subscribe is discarded, so this sample has
    /// no way to stop consuming; real callers should keep the IDisposable and dispose it.
    /// </remarks>
    public static void Example(this IDatabase db)
    {
        var state = new Dictionary<string, dynamic[]>();
        db.AnonymousConsume("topic-name")
            .Synchronize(new object())
            .Select(value => JsonConvert.DeserializeObject<dynamic>(value)) // deserialize to model not dynamic :)
            .Subscribe(item =>
            {
                var gameId = item.GameId;
                var blocks = item.Blocks;
                state[gameId] = blocks;
            });
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment