Skip to content

Instantly share code, notes, and snippets.

Created December 27, 2012 15:33
Show Gist options
  • Save fitzchak/4389082 to your computer and use it in GitHub Desktop.
Save fitzchak/4389082 to your computer and use it in GitHub Desktop.
// -----------------------------------------------------------------------
// <copyright file="PrefetchingBehavior.cs" company="Hibernating Rhinos LTD">
// Copyright (c) Hibernating Rhinos LTD. All rights reserved.
// </copyright>
// -----------------------------------------------------------------------
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Raven.Abstractions;
using Raven.Abstractions.Data;
using Raven.Abstractions.Logging;
using Raven.Abstractions.Util;
using Raven.Database.Config;
using Raven.Database.Impl;
namespace Raven.Database.Indexing
public class PrefetchingBehavior : IDisposable
private static readonly ILog log = LogManager.GetCurrentClassLogger();
private readonly BaseBatchSizeAutoTuner autoTuner;
private readonly WorkContext context;
private readonly ConcurrentDictionary<string, HashSet<Guid>> documentsToRemove =
new ConcurrentDictionary<string, HashSet<Guid>>(StringComparer.InvariantCultureIgnoreCase);
private readonly ConcurrentDictionary<Guid, FutureIndexBatch> futureIndexBatches =
new ConcurrentDictionary<Guid, FutureIndexBatch>();
private readonly ConcurrentDictionary<Guid, InMemoryIndexBatch> inMemoryIndexBatches =
new ConcurrentDictionary<Guid, InMemoryIndexBatch>();
private int currentIndexingAge;
public PrefetchingBehavior(WorkContext context, BaseBatchSizeAutoTuner autoTuner)
this.context = context;
this.autoTuner = autoTuner;
#region IDisposable Members
public void Dispose()
public List<JsonDocument> GetDocumentsBatchFrom(Guid etag)
var results =
GetInMemoryJsonDocuments(etag) ??
GetFutureJsonDocuments(etag) ??
var jsonDocuments = MergeWithOtherMemoryOrFutureResults(results);
return jsonDocuments;
private List<JsonDocument> GetInMemoryJsonDocuments(Guid etag)
return null;
if (context.Configuration.DisableDocumentPreFetchingForIndexing)
return null;
var nextDocEtag = GetNextDocEtag(etag);
InMemoryIndexBatch nextBatch;
if (inMemoryIndexBatches.TryRemove(nextDocEtag, out nextBatch) == false)
return null;
lock (nextBatch)
// make sure that if there are any concurrent updates, it will wait for them
return nextBatch.Documents;
private List<JsonDocument> GetFutureJsonDocuments(Guid etag)
return null;
if (context.Configuration.DisableDocumentPreFetchingForIndexing)
return null;
Guid nextDocEtag = GetNextDocEtag(etag);
FutureIndexBatch nextBatch;
if (futureIndexBatches.TryRemove(nextDocEtag, out nextBatch) == false)
return null;
if (Task.CurrentId == nextBatch.Task.Id)
return null;
return nextBatch.Task.Result;
catch (Exception e)
log.WarnException("Error when getting next batch value asynchronously, will try in sync manner", e);
return null;
private List<JsonDocument> GetJsonDocsFromDisk(Guid etag)
Guid? untilEtag = GetNextEtagInMemory(etag);
List<JsonDocument> jsonDocs = null;
context.TransactionaStorage.Batch(actions =>
jsonDocs = actions.Documents
.Where(x => x != null)
.Select(doc =>
return doc;
if(untilEtag == null)
return jsonDocs;
private Guid? GetNextEtagInMemory(Guid etag)
var current = new ComparableByteArray(etag);
var guid = inMemoryIndexBatches.Keys.Concat(futureIndexBatches.Keys)
.Where(x => current.CompareTo(x) < 0)
.OrderBy(x => x, ByteArrayComparer.Instance)
if (guid == Guid.Empty)
return null;
return guid;
private List<JsonDocument> MergeWithOtherMemoryOrFutureResults(List<JsonDocument> results)
if (results == null || results.Count == 0)
return results;
var nextDocEtag = GetNextDocEtag(GetNextHighestEtag(results));
while (true)
InMemoryIndexBatch value;
if(inMemoryIndexBatches.TryRemove(nextDocEtag, out value))
// make sure that there an no current modifications
nextDocEtag = GetNextDocEtag(value.Documents.Last().Etag.Value);
FutureIndexBatch nextBatch;
if (futureIndexBatches.TryRemove(nextDocEtag, out nextBatch))
lock (nextBatch)
// make sure that there an no current modifications
if (nextBatch.Task.IsCompleted == false)
lock (nextBatch)
// make sure that there an no current modifications
nextDocEtag = GetNextDocEtag(nextBatch.Task.Result.Last().Etag.Value);
// a single doc may appear multiple times, if it was updated
// while we were fetching things, so we have several versions
// of the same doc loaded, this will make sure that we will only
// take one of them.
return results
.GroupBy(x => x.Key)
.Select(g => g.OrderBy(x => x.Etag, ByteArrayComparer.Instance).Last())
private void MaybeAddFutureBatch(List<JsonDocument> past)
if (context.Configuration.DisableDocumentPreFetchingForIndexing || context.RunIndexing == false)
if (context.Configuration.MaxNumberOfParallelIndexTasks == 1)
if (past.Count == 0)
if (futureIndexBatches.Count > 5) // we limit the number of future calls we do
int alreadyLoaded = futureIndexBatches.Values.Sum(x =>
if (x.Task.IsCompleted)
return x.Task.Result.Count;
return 0;
if (alreadyLoaded > autoTuner.NumberOfItemsToIndexInSingleBatch)
// ensure we don't do TOO much future caching
if (MemoryStatistics.AvailableMemory <
// we loaded the maximum amount, there are probably more items to read now.
Guid highestLoadedEtag = GetNextHighestEtag(past);
Guid nextEtag = GetNextDocEtag(highestLoadedEtag);
if (nextEtag == highestLoadedEtag)
return; // there is nothing newer to do
if (futureIndexBatches.ContainsKey(nextEtag)) // already loading this
var futureBatchStat = new FutureBatchStats
Timestamp = SystemTime.UtcNow,
Stopwatch sp = Stopwatch.StartNew();
futureIndexBatches.TryAdd(nextEtag, new FutureIndexBatch
StartingEtag = nextEtag,
Age = Interlocked.Increment(ref currentIndexingAge),
Task = Task.Factory.StartNew(() =>
List<JsonDocument> jsonDocuments = null;
int localWork = 0;
while (context.RunIndexing)
jsonDocuments = GetJsonDocsFromDisk(nextEtag);
if (jsonDocuments.Count > 0)
context.WaitForWork(TimeSpan.FromMinutes(10), ref localWork, "PreFetching");
futureBatchStat.Duration = sp.Elapsed;
futureBatchStat.Size = jsonDocuments == null ? 0 : jsonDocuments.Count;
if (jsonDocuments != null)
return jsonDocuments;
private Guid GetNextDocEtag(Guid highestEtag)
var nextDocEtag = highestEtag;
var oneUpEtag = Etag.Increment(nextDocEtag, 1);
// no need to go to disk to find the next etag if we already have it in memory
if (inMemoryIndexBatches.ContainsKey(oneUpEtag) ||
return oneUpEtag;
accessor => { nextDocEtag = accessor.Documents.GetBestNextDocumentEtag(highestEtag); });
return nextDocEtag;
private static Guid GetNextHighestEtag(List<JsonDocument> past)
JsonDocument jsonDocument = GetHighestEtag(past);
if (jsonDocument == null)
return Guid.Empty;
return jsonDocument.Etag ?? Guid.Empty;
public static JsonDocument GetHighestEtag(List<JsonDocument> past)
var highest = new ComparableByteArray(Guid.Empty);
JsonDocument highestDoc = null;
for (int i = past.Count - 1; i >= 0; i--)
Guid etag = past[i].Etag.Value;
if (highest.CompareTo(etag) > 0)
highest = new ComparableByteArray(etag);
highestDoc = past[i];
return highestDoc;
private static Task ObserveDiscardedTask(FutureIndexBatch source)
return source.Task.ContinueWith(task =>
if (task.Exception != null)
log.WarnException("Error happened on discarded future work batch", task.Exception);
log.Warn("WASTE: Discarding future work item without using it, to reduce memory usage");
public void BatchProcessingComplete()
int indexingAge = Interlocked.Increment(ref currentIndexingAge);
// make sure that we don't have too much "future cache" items
const int numberOfIndexingGenerationsAllowed = 64;
foreach (FutureIndexBatch source in futureIndexBatches.Values.Where(x => (indexingAge - x.Age) > numberOfIndexingGenerationsAllowed).ToList())
FutureIndexBatch _;
futureIndexBatches.TryRemove(source.StartingEtag, out _);
// make sure that we don't have too much "in memory cache" items
foreach (InMemoryIndexBatch source in inMemoryIndexBatches.Values.Where(x => (indexingAge - x.Age) > numberOfIndexingGenerationsAllowed).ToList())
InMemoryIndexBatch _;
inMemoryIndexBatches.TryRemove(source.StartingEtag, out _);
public void GetFutureStats(int currentBatchSize, out int futureLen, out int futureSize)
futureLen = futureIndexBatches.Values.Sum(x =>
if (x.Task.IsCompleted)
return x.Task.Result.Count;
return currentBatchSize / 15;
futureSize = futureIndexBatches.Values.Sum(x =>
if (x.Task.IsCompleted)
List<JsonDocument> jsonResults = x.Task.Result;
return jsonResults.Sum(s => s.SerializedSizeOnDisk);
return currentBatchSize * 256;
public void AfterCommit(JsonDocument[] docs)
if (context.Configuration.DisableDocumentPreFetchingForIndexing || docs.Length == 0)
int countOfLoadedItems = inMemoryIndexBatches.Values.Sum(x => x.Documents.Count);
if (countOfLoadedItems > // don't use too much
foreach (JsonDocument doc in docs)
if (docs.Length == 1)
AddInMemoryBatch(docs, 0, 1);
int start = 0;
foreach (var end in SplitNonConsecutiveDocs(docs))
AddInMemoryBatch(docs, start, end);
start = end;
private void AddInMemoryBatch(JsonDocument[] docs, int start, int end)
var startingEtag = GetLowestEtag(docs, start, end);
var batchToAdd = inMemoryIndexBatches.Values.FirstOrDefault(x => x.EndEtag == startingEtag);
var actualDocsToTake = docs.Skip(start).Take(end - start);
if (batchToAdd != null)
lock (batchToAdd)
batchToAdd.EndEtag = Etag.Increment(docs[end - 1].Etag.Value, 1);
var inMemoryIndexBatch = new InMemoryIndexBatch
Age = Thread.VolatileRead(ref currentIndexingAge),
Documents = new List<JsonDocument>(actualDocsToTake),
StartingEtag = startingEtag
inMemoryIndexBatch.EndEtag = Etag.Increment(inMemoryIndexBatch.Documents.Last().Etag.Value, 1);
inMemoryIndexBatches.TryAdd(startingEtag, inMemoryIndexBatch);
private IEnumerable<int> SplitNonConsecutiveDocs(JsonDocument[] docs)
var etag = docs[0].Etag.Value;
var doc = docs[0];
for (int i = 1; i < docs.Length; i++)
if (Etag.GetDiffrence(doc.Etag.Value, etag) != 1)
yield return i;
yield return docs.Length;
private static Guid GetLowestEtag(JsonDocument[] past, int start, int end)
var lowest = new ComparableByteArray(past[start].Etag.Value);
for (int i = start + 1; i < end; i++)
Guid etag = past[i].Etag.Value;
if (lowest.CompareTo(etag) < 0)
lowest = new ComparableByteArray(etag);
return lowest.ToGuid();
public void CleanupDocumentsToRemove(Guid lastIndexedEtag)
var highest = new ComparableByteArray(lastIndexedEtag);
foreach (var docToRemove in documentsToRemove)
if (docToRemove.Value.All(etag => highest.CompareTo(etag) > 0) == false)
HashSet<Guid> _;
documentsToRemove.TryRemove(docToRemove.Key, out _);
public bool FilterDocuments(JsonDocument document)
HashSet<Guid> etags;
return documentsToRemove.TryGetValue(document.Key, out etags) && etags.Contains(document.Etag.Value);
public void AfterDelete(string key, Guid lastDocumentEtag)
documentsToRemove.AddOrUpdate(key, s => new HashSet<Guid> { lastDocumentEtag },
(s, set) => new HashSet<Guid>(set) { lastDocumentEtag });
public bool ShouldSkipDeleteFromIndex(JsonDocument item)
if (item.SkipDeleteFromIndex == false)
return false;
return documentsToRemove.ContainsKey(item.Key) == false;
#region Nested type: FutureIndexBatch
private class FutureIndexBatch
public int Age;
public Guid StartingEtag;
public Task<List<JsonDocument>> Task;
#region Nested type: InMemoryIndexBatch
private class InMemoryIndexBatch
public int Age;
public List<JsonDocument> Documents;
public Guid StartingEtag;
public Guid EndEtag;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment