Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save oguzhaneren/05a15e960fff3dea794b62dc803ba464 to your computer and use it in GitHub Desktop.
Save oguzhaneren/05a15e960fff3dea794b62dc803ba464 to your computer and use it in GitHub Desktop.
Elasticsearch: search all documents with a parallel (sliced) scroll, exposed as an async-stream extension method (IAsyncEnumerable)
/// <summary>
/// Streams every document matched by <paramref name="query"/> using a sliced (parallel) scroll
/// via <c>ScrollAll</c>, exposing the hit sources as an <see cref="IAsyncEnumerable{T}"/>.
/// Every scroll id observed during enumeration is cleared (best effort) once enumeration
/// completes, fails, or is cancelled.
/// </summary>
/// <typeparam name="T">Document type the hits are deserialized into.</typeparam>
/// <param name="elasticClient">Client used both to run the scroll and to clear it afterwards.</param>
/// <param name="query">
/// Builds the search request for the scroll. Its <c>Size</c> is overwritten with
/// <paramref name="scrollSize"/>.
/// </param>
/// <param name="shardSize">
/// Number of slices passed to <c>ScrollAll</c>. Despite the name, this is the slice count,
/// not a shard count. Values below 1 are treated as 1.
/// </param>
/// <param name="routingPath">Optional routing field forwarded to <c>ScrollAllDescriptor.RoutingField</c>.</param>
/// <param name="cancellationToken">Stops the enumeration; also forwarded to <c>ScrollAll</c>.</param>
/// <param name="scrollTimeout">Keep-alive for the scroll contexts (e.g. <c>"2m"</c>).</param>
/// <param name="scrollSize">Page size applied to the search request.</param>
/// <returns>The <c>Source</c> of every hit, in the order the pages arrive from the slices.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="elasticClient"/> or <paramref name="query"/> is null. Because this is an async
/// iterator, this is raised on first enumeration, not at the call site.
/// </exception>
public static async IAsyncEnumerable<T> SearchAllWithParallelScrollAsStream<T>(this IElasticClient elasticClient,
    Func<SearchDescriptor<T>, ISearchRequest> query,
    int shardSize = 2,
    Expression<Func<T, object>> routingPath = null,
    [EnumeratorCancellation] CancellationToken cancellationToken = default,
    string scrollTimeout = "2m", int scrollSize = 1000) where T : class
{
    if (elasticClient == null) throw new ArgumentNullException(nameof(elasticClient));
    if (query == null) throw new ArgumentNullException(nameof(query));

    var sliceCount = Math.Max(shardSize, 1);

    // Pages repeat the same scroll id, so a set keeps one entry per distinct context.
    // The await-foreach body below runs sequentially, so a plain (non-concurrent) set is safe.
    var scrollIds = new HashSet<string>();

    var stream = elasticClient.ScrollAll<T>(scrollTimeout, sliceCount, s => s
                .RoutingField(routingPath)
                .Search(search =>
                {
                    var descriptor = query(search);
                    descriptor.Size = scrollSize;
                    return descriptor;
                }),
            cancellationToken)
        .ToAsyncEnumerable();

    try
    {
        await foreach (var scrollResponse in stream.WithCancellation(cancellationToken).ConfigureAwait(false))
        {
            var scrollId = scrollResponse.SearchResponse.ScrollId;
            if (!string.IsNullOrEmpty(scrollId))
            {
                scrollIds.Add(scrollId);
            }

            foreach (var hit in scrollResponse.SearchResponse.Hits)
            {
                yield return hit.Source;
            }
        }
    }
    finally
    {
        // Nothing to clear if no page was ever received; avoids an empty clear-scroll request.
        if (scrollIds.Count > 0)
        {
            var ids = new string[scrollIds.Count];
            scrollIds.CopyTo(ids);

            try
            {
                // Deliberately NOT cancellationToken: when enumeration was cancelled that token is
                // already signalled, and the clear request would be abandoned immediately. The scroll
                // contexts would then stay open on the cluster until scrollTimeout expires.
                await elasticClient
                    .ClearScrollAsync(x => x.ScrollId(ids), CancellationToken.None)
                    .ConfigureAwait(false);
            }
            catch
            {
                // Best-effort cleanup: the contexts expire on their own after scrollTimeout, and
                // an exception here must not mask the original outcome of the enumeration.
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment