Created
December 27, 2012 15:33
-
-
Save fitzchak/4389082 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// ----------------------------------------------------------------------- | |
// <copyright file="PrefetchingBehavior.cs" company="Hibernating Rhinos LTD"> | |
// Copyright (c) Hibernating Rhinos LTD. All rights reserved. | |
// </copyright> | |
// ----------------------------------------------------------------------- | |
using System; | |
using System.Collections.Concurrent; | |
using System.Collections.Generic; | |
using System.Diagnostics; | |
using System.Linq; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using Raven.Abstractions; | |
using Raven.Abstractions.Data; | |
using Raven.Abstractions.Logging; | |
using Raven.Abstractions.Util; | |
using Raven.Database.Config; | |
using Raven.Database.Impl; | |
namespace Raven.Database.Indexing | |
{ | |
public class PrefetchingBehavior : IDisposable | |
{ | |
private static readonly ILog log = LogManager.GetCurrentClassLogger(); | |
private readonly BaseBatchSizeAutoTuner autoTuner; | |
private readonly WorkContext context; | |
private readonly ConcurrentDictionary<string, HashSet<Guid>> documentsToRemove = | |
new ConcurrentDictionary<string, HashSet<Guid>>(StringComparer.InvariantCultureIgnoreCase); | |
private readonly ConcurrentDictionary<Guid, FutureIndexBatch> futureIndexBatches = | |
new ConcurrentDictionary<Guid, FutureIndexBatch>(); | |
private readonly ConcurrentDictionary<Guid, InMemoryIndexBatch> inMemoryIndexBatches = | |
new ConcurrentDictionary<Guid, InMemoryIndexBatch>(); | |
private int currentIndexingAge; | |
public PrefetchingBehavior(WorkContext context, BaseBatchSizeAutoTuner autoTuner) | |
{ | |
this.context = context; | |
this.autoTuner = autoTuner; | |
} | |
#region IDisposable Members | |
public void Dispose() | |
{ | |
Task.WaitAll(futureIndexBatches.Values.Select(ObserveDiscardedTask).ToArray()); | |
futureIndexBatches.Clear(); | |
} | |
#endregion | |
public List<JsonDocument> GetDocumentsBatchFrom(Guid etag) | |
{ | |
var results = | |
GetInMemoryJsonDocuments(etag) ?? | |
GetFutureJsonDocuments(etag) ?? | |
GetJsonDocsFromDisk(etag); | |
var jsonDocuments = MergeWithOtherMemoryOrFutureResults(results); | |
return jsonDocuments; | |
} | |
private List<JsonDocument> GetInMemoryJsonDocuments(Guid etag) | |
{ | |
return null; | |
if (context.Configuration.DisableDocumentPreFetchingForIndexing) | |
return null; | |
var nextDocEtag = GetNextDocEtag(etag); | |
InMemoryIndexBatch nextBatch; | |
if (inMemoryIndexBatches.TryRemove(nextDocEtag, out nextBatch) == false) | |
return null; | |
lock (nextBatch) | |
{ | |
// make sure that if there are any concurrent updates, it will wait for them | |
} | |
return nextBatch.Documents; | |
} | |
private List<JsonDocument> GetFutureJsonDocuments(Guid etag) | |
{ | |
return null; | |
if (context.Configuration.DisableDocumentPreFetchingForIndexing) | |
return null; | |
try | |
{ | |
Guid nextDocEtag = GetNextDocEtag(etag); | |
FutureIndexBatch nextBatch; | |
if (futureIndexBatches.TryRemove(nextDocEtag, out nextBatch) == false) | |
return null; | |
if (Task.CurrentId == nextBatch.Task.Id) | |
return null; | |
return nextBatch.Task.Result; | |
} | |
catch (Exception e) | |
{ | |
log.WarnException("Error when getting next batch value asynchronously, will try in sync manner", e); | |
return null; | |
} | |
} | |
private List<JsonDocument> GetJsonDocsFromDisk(Guid etag) | |
{ | |
Guid? untilEtag = GetNextEtagInMemory(etag); | |
List<JsonDocument> jsonDocs = null; | |
context.TransactionaStorage.Batch(actions => | |
{ | |
jsonDocs = actions.Documents | |
.GetDocumentsAfter( | |
etag, | |
autoTuner.NumberOfItemsToIndexInSingleBatch, | |
autoTuner.MaximumSizeAllowedToFetchFromStorage, | |
untilEtag) | |
.Where(x => x != null) | |
.Select(doc => | |
{ | |
DocumentRetriever.EnsureIdInMetadata(doc); | |
return doc; | |
}) | |
.ToList(); | |
}); | |
if(untilEtag == null) | |
{ | |
MaybeAddFutureBatch(jsonDocs); | |
} | |
return jsonDocs; | |
} | |
private Guid? GetNextEtagInMemory(Guid etag) | |
{ | |
var current = new ComparableByteArray(etag); | |
var guid = inMemoryIndexBatches.Keys.Concat(futureIndexBatches.Keys) | |
.Where(x => current.CompareTo(x) < 0) | |
.OrderBy(x => x, ByteArrayComparer.Instance) | |
.FirstOrDefault(); | |
if (guid == Guid.Empty) | |
return null; | |
return guid; | |
} | |
private List<JsonDocument> MergeWithOtherMemoryOrFutureResults(List<JsonDocument> results) | |
{ | |
if (results == null || results.Count == 0) | |
return results; | |
var nextDocEtag = GetNextDocEtag(GetNextHighestEtag(results)); | |
while (true) | |
{ | |
InMemoryIndexBatch value; | |
if(inMemoryIndexBatches.TryRemove(nextDocEtag, out value)) | |
{ | |
lock(value) | |
{ | |
// make sure that there an no current modifications | |
} | |
results.AddRange(value.Documents); | |
nextDocEtag = GetNextDocEtag(value.Documents.Last().Etag.Value); | |
continue; | |
} | |
FutureIndexBatch nextBatch; | |
if (futureIndexBatches.TryRemove(nextDocEtag, out nextBatch)) | |
{ | |
lock (nextBatch) | |
{ | |
// make sure that there an no current modifications | |
} | |
if (nextBatch.Task.IsCompleted == false) | |
{ | |
nextBatch.Task.Wait(); | |
} | |
lock (nextBatch) | |
{ | |
// make sure that there an no current modifications | |
} | |
results.AddRange(nextBatch.Task.Result); | |
nextDocEtag = GetNextDocEtag(nextBatch.Task.Result.Last().Etag.Value); | |
continue; | |
} | |
break; | |
} | |
// a single doc may appear multiple times, if it was updated | |
// while we were fetching things, so we have several versions | |
// of the same doc loaded, this will make sure that we will only | |
// take one of them. | |
return results | |
.GroupBy(x => x.Key) | |
.Select(g => g.OrderBy(x => x.Etag, ByteArrayComparer.Instance).Last()) | |
.ToList(); | |
} | |
private void MaybeAddFutureBatch(List<JsonDocument> past) | |
{ | |
if (context.Configuration.DisableDocumentPreFetchingForIndexing || context.RunIndexing == false) | |
return; | |
if (context.Configuration.MaxNumberOfParallelIndexTasks == 1) | |
return; | |
if (past.Count == 0) | |
return; | |
if (futureIndexBatches.Count > 5) // we limit the number of future calls we do | |
{ | |
int alreadyLoaded = futureIndexBatches.Values.Sum(x => | |
{ | |
if (x.Task.IsCompleted) | |
return x.Task.Result.Count; | |
return 0; | |
}); | |
if (alreadyLoaded > autoTuner.NumberOfItemsToIndexInSingleBatch) | |
return; | |
} | |
// ensure we don't do TOO much future caching | |
if (MemoryStatistics.AvailableMemory < | |
context.Configuration.AvailableMemoryForRaisingIndexBatchSizeLimit) | |
return; | |
// we loaded the maximum amount, there are probably more items to read now. | |
Guid highestLoadedEtag = GetNextHighestEtag(past); | |
Guid nextEtag = GetNextDocEtag(highestLoadedEtag); | |
if (nextEtag == highestLoadedEtag) | |
return; // there is nothing newer to do | |
if (futureIndexBatches.ContainsKey(nextEtag)) // already loading this | |
return; | |
var futureBatchStat = new FutureBatchStats | |
{ | |
Timestamp = SystemTime.UtcNow, | |
}; | |
Stopwatch sp = Stopwatch.StartNew(); | |
context.AddFutureBatch(futureBatchStat); | |
futureIndexBatches.TryAdd(nextEtag, new FutureIndexBatch | |
{ | |
StartingEtag = nextEtag, | |
Age = Interlocked.Increment(ref currentIndexingAge), | |
Task = Task.Factory.StartNew(() => | |
{ | |
List<JsonDocument> jsonDocuments = null; | |
int localWork = 0; | |
while (context.RunIndexing) | |
{ | |
jsonDocuments = GetJsonDocsFromDisk(nextEtag); | |
if (jsonDocuments.Count > 0) | |
break; | |
futureBatchStat.Retries++; | |
context.WaitForWork(TimeSpan.FromMinutes(10), ref localWork, "PreFetching"); | |
} | |
futureBatchStat.Duration = sp.Elapsed; | |
futureBatchStat.Size = jsonDocuments == null ? 0 : jsonDocuments.Count; | |
if (jsonDocuments != null) | |
{ | |
MaybeAddFutureBatch(jsonDocuments); | |
} | |
return jsonDocuments; | |
}) | |
}); | |
} | |
private Guid GetNextDocEtag(Guid highestEtag) | |
{ | |
var nextDocEtag = highestEtag; | |
var oneUpEtag = Etag.Increment(nextDocEtag, 1); | |
// no need to go to disk to find the next etag if we already have it in memory | |
if (inMemoryIndexBatches.ContainsKey(oneUpEtag) || | |
futureIndexBatches.ContainsKey(oneUpEtag)) | |
return oneUpEtag; | |
context.TransactionaStorage.Batch( | |
accessor => { nextDocEtag = accessor.Documents.GetBestNextDocumentEtag(highestEtag); }); | |
return nextDocEtag; | |
} | |
private static Guid GetNextHighestEtag(List<JsonDocument> past) | |
{ | |
JsonDocument jsonDocument = GetHighestEtag(past); | |
if (jsonDocument == null) | |
return Guid.Empty; | |
return jsonDocument.Etag ?? Guid.Empty; | |
} | |
public static JsonDocument GetHighestEtag(List<JsonDocument> past) | |
{ | |
var highest = new ComparableByteArray(Guid.Empty); | |
JsonDocument highestDoc = null; | |
for (int i = past.Count - 1; i >= 0; i--) | |
{ | |
Guid etag = past[i].Etag.Value; | |
if (highest.CompareTo(etag) > 0) | |
{ | |
continue; | |
} | |
highest = new ComparableByteArray(etag); | |
highestDoc = past[i]; | |
} | |
return highestDoc; | |
} | |
private static Task ObserveDiscardedTask(FutureIndexBatch source) | |
{ | |
return source.Task.ContinueWith(task => | |
{ | |
if (task.Exception != null) | |
{ | |
log.WarnException("Error happened on discarded future work batch", task.Exception); | |
} | |
else | |
{ | |
log.Warn("WASTE: Discarding future work item without using it, to reduce memory usage"); | |
} | |
}); | |
} | |
public void BatchProcessingComplete() | |
{ | |
int indexingAge = Interlocked.Increment(ref currentIndexingAge); | |
// make sure that we don't have too much "future cache" items | |
const int numberOfIndexingGenerationsAllowed = 64; | |
foreach (FutureIndexBatch source in futureIndexBatches.Values.Where(x => (indexingAge - x.Age) > numberOfIndexingGenerationsAllowed).ToList()) | |
{ | |
ObserveDiscardedTask(source); | |
FutureIndexBatch _; | |
futureIndexBatches.TryRemove(source.StartingEtag, out _); | |
} | |
// make sure that we don't have too much "in memory cache" items | |
foreach (InMemoryIndexBatch source in inMemoryIndexBatches.Values.Where(x => (indexingAge - x.Age) > numberOfIndexingGenerationsAllowed).ToList()) | |
{ | |
InMemoryIndexBatch _; | |
inMemoryIndexBatches.TryRemove(source.StartingEtag, out _); | |
} | |
} | |
public void GetFutureStats(int currentBatchSize, out int futureLen, out int futureSize) | |
{ | |
futureLen = futureIndexBatches.Values.Sum(x => | |
{ | |
if (x.Task.IsCompleted) | |
{ | |
return x.Task.Result.Count; | |
} | |
return currentBatchSize / 15; | |
}); | |
futureSize = futureIndexBatches.Values.Sum(x => | |
{ | |
if (x.Task.IsCompleted) | |
{ | |
List<JsonDocument> jsonResults = x.Task.Result; | |
return jsonResults.Sum(s => s.SerializedSizeOnDisk); | |
} | |
return currentBatchSize * 256; | |
}); | |
} | |
public void AfterCommit(JsonDocument[] docs) | |
{ | |
return; | |
if (context.Configuration.DisableDocumentPreFetchingForIndexing || docs.Length == 0) | |
return; | |
int countOfLoadedItems = inMemoryIndexBatches.Values.Sum(x => x.Documents.Count); | |
if (countOfLoadedItems > // don't use too much | |
context.Configuration.MaxNumberOfItemsToIndexInSingleBatch) | |
return; | |
foreach (JsonDocument doc in docs) | |
{ | |
DocumentRetriever.EnsureIdInMetadata(doc); | |
} | |
if (docs.Length == 1) | |
{ | |
AddInMemoryBatch(docs, 0, 1); | |
return; | |
} | |
int start = 0; | |
foreach (var end in SplitNonConsecutiveDocs(docs)) | |
{ | |
AddInMemoryBatch(docs, start, end); | |
start = end; | |
} | |
} | |
private void AddInMemoryBatch(JsonDocument[] docs, int start, int end) | |
{ | |
var startingEtag = GetLowestEtag(docs, start, end); | |
var batchToAdd = inMemoryIndexBatches.Values.FirstOrDefault(x => x.EndEtag == startingEtag); | |
var actualDocsToTake = docs.Skip(start).Take(end - start); | |
if (batchToAdd != null) | |
{ | |
lock (batchToAdd) | |
{ | |
batchToAdd.Documents.AddRange(actualDocsToTake); | |
batchToAdd.EndEtag = Etag.Increment(docs[end - 1].Etag.Value, 1); | |
} | |
return; | |
} | |
var inMemoryIndexBatch = new InMemoryIndexBatch | |
{ | |
Age = Thread.VolatileRead(ref currentIndexingAge), | |
Documents = new List<JsonDocument>(actualDocsToTake), | |
StartingEtag = startingEtag | |
}; | |
inMemoryIndexBatch.EndEtag = Etag.Increment(inMemoryIndexBatch.Documents.Last().Etag.Value, 1); | |
inMemoryIndexBatches.TryAdd(startingEtag, inMemoryIndexBatch); | |
} | |
private IEnumerable<int> SplitNonConsecutiveDocs(JsonDocument[] docs) | |
{ | |
var etag = docs[0].Etag.Value; | |
var doc = docs[0]; | |
for (int i = 1; i < docs.Length; i++) | |
{ | |
if (Etag.GetDiffrence(doc.Etag.Value, etag) != 1) | |
{ | |
yield return i; | |
} | |
} | |
yield return docs.Length; | |
} | |
private static Guid GetLowestEtag(JsonDocument[] past, int start, int end) | |
{ | |
var lowest = new ComparableByteArray(past[start].Etag.Value); | |
for (int i = start + 1; i < end; i++) | |
{ | |
Guid etag = past[i].Etag.Value; | |
if (lowest.CompareTo(etag) < 0) | |
{ | |
continue; | |
} | |
lowest = new ComparableByteArray(etag); | |
} | |
return lowest.ToGuid(); | |
} | |
public void CleanupDocumentsToRemove(Guid lastIndexedEtag) | |
{ | |
var highest = new ComparableByteArray(lastIndexedEtag); | |
foreach (var docToRemove in documentsToRemove) | |
{ | |
if (docToRemove.Value.All(etag => highest.CompareTo(etag) > 0) == false) | |
continue; | |
HashSet<Guid> _; | |
documentsToRemove.TryRemove(docToRemove.Key, out _); | |
} | |
} | |
public bool FilterDocuments(JsonDocument document) | |
{ | |
HashSet<Guid> etags; | |
return documentsToRemove.TryGetValue(document.Key, out etags) && etags.Contains(document.Etag.Value); | |
} | |
public void AfterDelete(string key, Guid lastDocumentEtag) | |
{ | |
documentsToRemove.AddOrUpdate(key, s => new HashSet<Guid> { lastDocumentEtag }, | |
(s, set) => new HashSet<Guid>(set) { lastDocumentEtag }); | |
} | |
public bool ShouldSkipDeleteFromIndex(JsonDocument item) | |
{ | |
if (item.SkipDeleteFromIndex == false) | |
return false; | |
return documentsToRemove.ContainsKey(item.Key) == false; | |
} | |
#region Nested type: FutureIndexBatch | |
private class FutureIndexBatch | |
{ | |
public int Age; | |
public Guid StartingEtag; | |
public Task<List<JsonDocument>> Task; | |
} | |
#endregion | |
#region Nested type: InMemoryIndexBatch | |
private class InMemoryIndexBatch | |
{ | |
public int Age; | |
public List<JsonDocument> Documents; | |
public Guid StartingEtag; | |
public Guid EndEtag; | |
} | |
#endregion | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment