Created
April 13, 2024 23:07
-
-
Save ReubenBond/24c008ba5596ff306d688b5992fc7ec3 to your computer and use it in GitHub Desktop.
Filtered Space-Saving
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;

namespace Orleans.Runtime.Placement.Rebalancing;

// Implementation of "Filtered Space-Saving" from "Finding top-k elements in data streams"
// by Nuno Homem & Joao Paulo Carvalho (https://www.hlt.inesc-id.pt/~fmmb/references/misnis.ref0a.pdf).
/// <summary>
/// Tracks up to <see cref="Capacity"/> keys together with an estimate of how often each was observed.
/// Keys which are not tracked are counted approximately in a hash-indexed filter ("alpha map") and only
/// displace the least frequent tracked key once their filter estimate reaches that key's count.
/// </summary>
/// <typeparam name="TKey">The type of the keys being counted.</typeparam>
/// <param name="capacity">The maximum number of keys to track. Must not be negative.</param>
internal abstract class FrequencyFilter<TKey>(int capacity) where TKey : notnull
{
    private int _capacity = ValidateCapacity(capacity);

    // A lookup from 64-bit key hashes to positions in the counter list.
    private readonly Dictionary<ulong, int> _counterSlotMap = [];

    // The list of counters, ordered by count estimate (highest first, lowest last).
    private readonly List<Counter> _counterList = [];

    // An error estimate for a given hash value. The length is always a power of two so that a hash can be
    // mapped to a bin using a bit mask. This must be an array (not a List<T> with a reserved capacity)
    // because every bin needs to be addressable from the start.
    private uint[] _alphaMap = new uint[GetAlphaCapacity(capacity)];

    /// <summary>Gets the number of keys which are currently being tracked.</summary>
    public int Size => _counterList.Count;

    /// <summary>
    /// Gets or sets the maximum number of tracked keys. Shrinking evicts the least frequent keys, and any
    /// change resets the filter used for untracked keys.
    /// </summary>
    /// <exception cref="ArgumentOutOfRangeException">The assigned value is negative.</exception>
    public int Capacity { get => _capacity; set => Resize(value); }

    /// <summary>Gets the tracked keys and their count estimates in descending order.</summary>
    public IEnumerable<(TKey Key, uint CountEstimate)> Counters
    {
        get
        {
            foreach (var counter in _counterList)
            {
                yield return (counter.Key, counter.Count);
            }
        }
    }

    /// <summary>Computes the 64-bit hash which identifies <paramref name="key"/> within this collection.</summary>
    /// <param name="key">The key to hash.</param>
    /// <returns>The hash of the key. Keys with equal hashes are treated as the same key.</returns>
    public abstract ulong GetHashCode64(in TKey key);

    /// <summary>Removes all tracked keys and resets the filter, keeping the current capacity.</summary>
    public void Clear()
    {
        _counterSlotMap.Clear();
        _counterList.Clear();
        Array.Clear(_alphaMap);
    }

    /// <summary>Records one observation of <paramref name="key"/>.</summary>
    /// <param name="key">The observed key.</param>
    public void AddOrUpdate(in TKey key)
    {
        const uint Increment = 1;
        var longHash = GetHashCode64(key);

        // Increase count of a key that is already being tracked.
        // There is a minute chance of a hash collision, which is deemed acceptable.
        if (_counterSlotMap.TryGetValue(longHash, out var slot))
        {
            CollectionsMarshal.AsSpan(_counterList)[slot].Count += Increment;
            Percolate(slot);
            return;
        }

        // Key is not being tracked, but can fit in the top K, so add it.
        if (_counterList.Count < _capacity)
        {
            AddCounter(new Counter(key, Increment, error: 0), longHash);
            return;
        }

        // With a capacity of zero there is no minimum element to compare against or evict.
        if (_counterList.Count == 0)
        {
            return;
        }

        var min = _counterList[^1];

        // Filter out values which are estimated to have appeared less frequently than the minimum.
        var alphaMask = _alphaMap.Length - 1;
        var alphaIndex = longHash.GetHashCode() & alphaMask;
        var alpha = _alphaMap[alphaIndex];
        if (alpha + Increment < min.Count)
        {
            // Increase the count estimate.
            _alphaMap[alphaIndex] += Increment;
            return;
        }

        // Remove the least frequent element.
        var longMinHash = GetHashCode64(min.Key);
        _counterSlotMap.Remove(longMinHash);
        CollectionsMarshal.SetCount(_counterList, _counterList.Count - 1);

        // While evicting minimum element, update its location in the filter sketch with its count to improve
        // the chance of it passing the filter in the future.
        _alphaMap[longMinHash.GetHashCode() & alphaMask] = min.Count;

        // Push the new element in place of the last.
        AddCounter(new Counter(key, alpha + Increment, error: alpha), longHash);
    }

    // Rejects negative capacities up front instead of failing later inside AddOrUpdate.
    private static int ValidateCapacity(int capacity)
    {
        ArgumentOutOfRangeException.ThrowIfNegative(capacity);
        return capacity;
    }

    // Applies a new capacity: evicts surplus counters (least frequent first) and resets the filter.
    private void Resize(int newCapacity)
    {
        ArgumentOutOfRangeException.ThrowIfNegative(newCapacity);

        // List<T>.Capacity cannot be set below Count, so drop the lowest-ranked counters first and keep
        // the slot map consistent with the list.
        while (_counterList.Count > newCapacity)
        {
            var last = _counterList[^1];
            _counterSlotMap.Remove(GetHashCode64(last.Key));
            _counterList.RemoveAt(_counterList.Count - 1);
        }

        _capacity = newCapacity;
        _counterList.Capacity = newCapacity;

        // The bin of a hash depends on the map length, so existing estimates cannot be carried over.
        _alphaMap = new uint[GetAlphaCapacity(newCapacity)];
    }

    // Appends a counter in the last position, registers its slot, and moves it up to its ordered position.
    private void AddCounter(Counter counter, ulong longHash)
    {
        var slot = _counterList.Count;
        _counterList.Add(counter);
        _counterSlotMap[longHash] = slot;
        Percolate(slot);
    }

    // Bubble the counter stored at 'slot' up until it is at its correct position.
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    private void Percolate(int slot)
    {
        var counters = CollectionsMarshal.AsSpan(_counterList);

        // Work on a copy: the slot the counter currently occupies is overwritten while lower-ranked
        // neighbours are shifted down, so a reference into the list would end up reading the neighbour.
        var counter = counters[slot];
        var startSlot = slot;
        while (slot > 0)
        {
            var nextSlot = slot - 1;
            var next = counters[nextSlot];
            if (!counter.IsEstimateGreater(next))
            {
                break;
            }

            // Shift the lower-ranked neighbour down by one position and update the map.
            counters[slot] = next;
            _counterSlotMap[GetHashCode64(next.Key)] = slot;
            slot = nextSlot;
        }

        if (slot != startSlot)
        {
            // Place the counter in its final position and update the map.
            counters[slot] = counter;
            _counterSlotMap[GetHashCode64(counter.Key)] = slot;
        }
    }

    // Computes the number of bins in the alpha map for the given counter capacity.
    private static int GetAlphaCapacity(int capacity)
    {
        // Suggested constants in the paper "Finding top-k elements in data streams", chap 6. equation (24)
        // Round up to a power of 2 for cheaper binning without modulo
        const int AlphaMapElementsPerCounter = 6;
        const int MaxAlphaCapacity = 1 << 30;

        // Widen before multiplying so that large capacities cannot overflow, and clamp to the largest
        // power of two which fits in a non-negative int.
        var desired = (long)capacity * AlphaMapElementsPerCounter;
        if (desired >= MaxAlphaCapacity)
        {
            return MaxAlphaCapacity;
        }

        // 'desired' is a 32-bit value here, so the bit width used with LeadingZeroCount is 32.
        return 1 << (32 - int.LeadingZeroCount((int)desired));
    }

    // A tracked key with its count estimate and the maximum amount by which that estimate may be too high.
    private struct Counter(TKey key, uint count, uint error)
    {
        public readonly TKey Key = key;
        public uint Count = count;
        public uint Error = error;

        // Returns true if this counter is estimated to be greater than the other.
        // Ties on count are broken in favor of the counter with the smaller error.
        public readonly bool IsEstimateGreater(Counter other) =>
            Count > other.Count || (Count == other.Count && Error < other.Error);
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment