Last active
April 19, 2017 11:34
-
-
Save abdullin/bae1150fb8a09d15d14cdc208e4558e4 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Concurrent; | |
using System.Collections.Generic; | |
using System.Diagnostics; | |
using System.IO; | |
using System.IO.Compression; | |
using System.Linq; | |
using System.Threading; | |
namespace SearchBench { | |
/// <summary>
/// One SKU string attached to a product inside a tenant.
/// All fields are read-only and are set once by the constructor.
/// </summary>
public struct SkuEntry {
    /// <summary>Tenant the entry belongs to.</summary>
    public readonly long TenantId;

    /// <summary>Product the SKU is attached to.</summary>
    public readonly long ProductId;

    /// <summary>The SKU text itself.</summary>
    public readonly string Sku;

    /// <summary>Creates an entry from its three components, stored as given.</summary>
    /// <param name="tenantId">Value for <see cref="TenantId"/>.</param>
    /// <param name="productId">Value for <see cref="ProductId"/>.</param>
    /// <param name="sku">Value for <see cref="Sku"/>.</param>
    public SkuEntry(long tenantId, long productId, string sku) {
        this.TenantId = tenantId;
        this.ProductId = productId;
        this.Sku = sku;
    }
}
/// <summary>
/// Contract for a per-tenant SKU index that the benchmark drives.
/// </summary>
public interface ISkuSearch {
    /// <summary>Adds the given entries to the index.</summary>
    /// <param name="listOfProductIdsAndSkus">Entries to add.</param>
    void AddSkus(IList<SkuEntry> listOfProductIdsAndSkus);

    /// <summary>Removes the given entries from the index.</summary>
    /// <param name="listOfSkusToRemove">Entries to remove.</param>
    void DeleteSkus(IList<SkuEntry> listOfSkusToRemove);

    /// <summary>
    /// Looks up products of one tenant by SKU text. The bundled sanity check
    /// expects a query to match when it is a substring of a stored SKU.
    /// </summary>
    /// <param name="tenantId">Tenant to search in.</param>
    /// <param name="sku">Text to look for.</param>
    /// <param name="limit">Maximum number of results to return.</param>
    /// <returns>Product ids of the matching entries.</returns>
    IList<long> FindProducts(long tenantId, string sku, int limit);
}
class Program { | |
/// <summary>
/// Benchmark driver. Runs the sanity check, warms up the index, then replays the
/// data file in rounds of doubling size. After each round it measures sequential
/// searches that must miss, sequential searches that must hit, and parallel reads
/// running next to a single writer.
/// </summary>
/// <param name="args">The first element is the path of the data file to replay.</param>
static void Main(string[] args) {
    if (args == null || args.Length == 0) {
        // Fail with a readable message instead of an IndexOutOfRangeException.
        Console.Error.WriteLine("Expected the path of the data file as the first argument");
        Environment.ExitCode = 1;
        return;
    }
    var file = args[0];
    ISkuSearch search = new RinatsSearch();
    Console.WriteLine("Basic unit test");
    BasicUnitTest(search);

    const int batchSize = 1000;
    const int samplesPerBatch = 10;
    const int searchSize = 1000;

    using (var loader = new Loader(file)) {
        Console.WriteLine("Warm-up to avoid JIT overhead");
        for (var i = 0; i < 10; i++) {
            search.AddSkus(loader.GetNext(batchSize));
        }

        Console.WriteLine("Replay scenario");
        var randomTenants = new HashSet<long>();
        var randomSamples = new List<SkuEntry>();

        for (var i = 10; i < 40; i++) {
            var increase = 1L << i;
            Console.Write("Adding {0}... ", increase);
            var watch = Stopwatch.StartNew();
            long added;
            for (added = 0; added < increase; added += batchSize) {
                var list = loader.GetNext(batchSize);
                search.AddSkus(list);
                // sampling: remember a few entries of every batch for the search phases
                for (var k = 0; k < samplesPerBatch; k++) {
                    randomTenants.Add(list[k].TenantId);
                    randomSamples.Add(list[k]);
                }
            }
            // Whole batches are loaded, so "added" (not "increase") is the number of
            // records that were really inserted during the measured interval.
            Console.WriteLine("Speed: {0:0.00}r/s", added / watch.Elapsed.TotalSeconds);

            Console.Write("Seq search (fail) ");
            watch.Restart();
            var ts = randomTenants.OrderBy(t => t).ToList();
            for (var k = 0; k < searchSize; k++) {
                // A fresh GUID is used as a query that no stored SKU should contain.
                var found =
                    search.FindProducts(ts[k % ts.Count], Guid.NewGuid().ToString(), int.MaxValue).Count;
                if (found > 0) {
                    throw new ApplicationException("Should never find this");
                }
            }
            Console.WriteLine("Speed {0:0.00}r/s", 1D * searchSize / watch.Elapsed.TotalSeconds);

            Console.Write("Seq search (pass) ");
            watch.Restart();
            for (var k = 0; k < searchSize; k++) {
                var s = randomSamples[k % randomSamples.Count];
                var sku = s.Sku;
                if (sku.Length > 6) {
                    // Drop the prefix so that the query is a proper substring.
                    sku = sku.Substring(3);
                }
                var found = search.FindProducts(s.TenantId, sku, int.MaxValue).Count;
                if (found == 0) {
                    throw new ApplicationException("Should find this");
                }
            }
            Console.WriteLine("Speed {0:0.00}r/s", 1D * searchSize / watch.Elapsed.TotalSeconds);

            Console.Write("Parallel {0} read + write...", Environment.ProcessorCount);
            long parallel = 0;
            var workers = new List<Thread>();
            using (var cts = new CancellationTokenSource()) {
                for (var k = 0; k < Environment.ProcessorCount; k++) {
                    var reader = new Thread(() => {
                        while (!cts.IsCancellationRequested) {
                            // The tick counter serves as a cheap moving index into the samples.
                            var pos = (long)((uint)Environment.TickCount);
                            var sku = randomSamples[(int)(pos % randomSamples.Count)];
                            search.FindProducts(sku.TenantId, sku.Sku, int.MaxValue);
                            Interlocked.Increment(ref parallel);
                        }
                    });
                    reader.Start();
                    workers.Add(reader);
                }
                var writer = new Thread(() => {
                    while (!cts.IsCancellationRequested) {
                        search.AddSkus(loader.GetNext(1));
                    }
                });
                writer.Start();
                workers.Add(writer);

                // Let the threads ramp up, then count completed searches for one second.
                Thread.Sleep(500);
                Interlocked.Exchange(ref parallel, 0);
                Thread.Sleep(1000);
                var count = Interlocked.Read(ref parallel);
                cts.Cancel();
                Console.WriteLine("{0}r/s", count);

                // Wait for every worker to finish before the token source is disposed
                // and before the main thread touches the loader again; a fixed sleep
                // gives no such guarantee.
                foreach (var worker in workers) {
                    worker.Join();
                }
            }
        }
    }
    // Keep the console window open until the user presses Enter.
    Console.ReadLine();
}
/// <summary>
/// Reads SKU entries from a tab-separated text file, which may be gzip-compressed.
/// The first line of the file is skipped. Each following line is split into up to
/// four tab-separated columns: tenant, key, product id and SKU text.
/// </summary>
sealed class Loader : IDisposable {
    static readonly char[] Separator = {'\t'};

    // The file is machine-written, so numbers are parsed independently of the
    // culture of the machine that runs the benchmark.
    static readonly IFormatProvider Invariant = System.Globalization.CultureInfo.InvariantCulture;

    readonly StreamReader _reader;

    // Reused by every GetNext call to avoid allocating a list per batch.
    readonly List<SkuEntry> _buffer = new List<SkuEntry>();

    /// <summary>Opens the file and skips its first line.</summary>
    /// <param name="file">Path of the data file; gzip content is detected automatically.</param>
    public Loader(string file) {
        var raw = File.OpenRead(file);
        try {
            // Detect gzip by content instead of by name: a name test such as
            // Contains("gz") also fires on unrelated paths and misses renamed archives.
            Stream input = raw;
            if (LooksLikeGzip(raw)) {
                input = new GZipStream(raw, CompressionMode.Decompress);
            }
            _reader = new StreamReader(input);
            _reader.ReadLine();
        } catch {
            // Nobody can call Dispose on a half-constructed loader, so release the handle here.
            raw.Dispose();
            throw;
        }
    }

    /// <summary>Checks for the two gzip magic bytes and rewinds the stream.</summary>
    static bool LooksLikeGzip(FileStream stream) {
        var first = stream.ReadByte();
        var second = stream.ReadByte();
        stream.Position = 0;
        return first == 0x1f && second == 0x8b;
    }

    /// <summary>
    /// Reads lines until <paramref name="count"/> entries have been collected.
    /// Lines with fewer than four columns and lines whose key is neither 1 nor 5
    /// are skipped.
    /// </summary>
    /// <param name="count">Number of entries to return.</param>
    /// <returns>
    /// The internal buffer. It is cleared and refilled by the next call, so callers
    /// must not keep a reference to it across calls.
    /// </returns>
    /// <exception cref="ApplicationException">The file ended before enough entries were read.</exception>
    public IList<SkuEntry> GetNext(int count) {
        _buffer.Clear();
        while (_buffer.Count < count) {
            var line = _reader.ReadLine();
            if (line == null) {
                throw new ApplicationException("End of file");
            }
            var tabs = line.Split(Separator, 4);
            if (tabs.Length < 4) {
                continue;
            }
            var tenant = long.Parse(tabs[0], Invariant);
            var key = byte.Parse(tabs[1], Invariant);
            var productId = long.Parse(tabs[2], Invariant);
            if (key != 1 && key != 5) {
                continue;
            }
            _buffer.Add(new SkuEntry(tenant, productId, tabs[3]));
        }
        return _buffer;
    }

    /// <summary>Closes the reader and the underlying file.</summary>
    public void Dispose() {
        _reader.Dispose();
    }
}
/// <summary>
/// Sanity check for an <see cref="ISkuSearch"/> implementation: adds two entries,
/// verifies substring lookups, deletes one entry and verifies the lookups again.
/// Any mismatch surfaces as the exception thrown by <c>ShouldFind</c>.
/// </summary>
/// <param name="search">Implementation under test.</param>
static void BasicUnitTest(ISkuSearch search) {
    // every check runs against tenant 0
    const long tenant = 0;

    // nothing has been added yet
    ShouldFind(search, tenant, "test", 0);

    var toAdd = new List<SkuEntry>();
    toAdd.Add(new SkuEntry(tenant, 1, "test1"));
    toAdd.Add(new SkuEntry(tenant, 2, "test2"));
    search.AddSkus(toAdd);

    // query text and the number of products it has to match, checked in order
    var queries = new[] { "test", "est", "test1", "test2", "est2", "est3" };
    var expected = new[] { 2, 2, 1, 1, 1, 0 };
    for (var i = 0; i < queries.Length; i++) {
        ShouldFind(search, tenant, queries[i], expected[i]);
    }

    var toDelete = new List<SkuEntry>();
    toDelete.Add(new SkuEntry(tenant, 2, "test2"));
    search.DeleteSkus(toDelete);

    // the deleted entry is gone, the other one is still there
    ShouldFind(search, tenant, "test2", 0);
    ShouldFind(search, tenant, "test1", 1);
}
/// <summary>
/// Asserts that a query yields exactly the expected number of results.
/// </summary>
/// <param name="search">Implementation under test.</param>
/// <param name="tenantId">Tenant to search in.</param>
/// <param name="query">Text passed to <c>FindProducts</c>.</param>
/// <param name="shouldFind">Expected number of results.</param>
/// <exception cref="InvalidOperationException">The result count differs from the expectation.</exception>
static void ShouldFind(ISkuSearch search, long tenantId, string query, int shouldFind) {
    // Ask for one result more than expected so that surplus matches are noticed.
    var limit = shouldFind + 1;
    var actual = search.FindProducts(tenantId, query, limit).Count;
    if (actual != shouldFind) {
        throw new InvalidOperationException(
            string.Format("Expected to find {0} of {1}, got {2}", shouldFind, query, actual));
    }
}
} | |
/// <summary>
/// Straightforward <see cref="ISkuSearch"/> implementation: one list of entries per
/// tenant, searched linearly. A stored list is never modified in place; every
/// change builds a new list and swaps it into the dictionary, so a search keeps
/// enumerating the list it fetched while writers publish replacements.
/// </summary>
sealed class RinatsSearch : ISkuSearch {
    readonly ConcurrentDictionary<long, List<SkuEntry>> _dict =
        new ConcurrentDictionary<long, List<SkuEntry>>();

    /// <summary>Appends the entries to the lists of their tenants.</summary>
    /// <param name="listOfProductIdsAndSkus">Entries to add.</param>
    /// <exception cref="ArgumentNullException">The list is null.</exception>
    public void AddSkus(IList<SkuEntry> listOfProductIdsAndSkus) {
        if (listOfProductIdsAndSkus == null) {
            throw new ArgumentNullException("listOfProductIdsAndSkus");
        }
        // Copy each tenant's list once per batch rather than once per entry;
        // the per-entry variant costs O(batch * list) copies for a single tenant.
        foreach (var tenantBatch in listOfProductIdsAndSkus.GroupBy(s => s.TenantId)) {
            // Private snapshot: the caller may reuse its list after this call returns,
            // and the factories below can be invoked more than once under contention.
            var additions = tenantBatch.ToList();
            _dict.AddOrUpdate(
                tenantBatch.Key,
                tenantId => additions,
                (tenantId, current) => {
                    var merged = new List<SkuEntry>(current.Count + additions.Count);
                    merged.AddRange(current);
                    merged.AddRange(additions);
                    return merged;
                });
        }
    }

    /// <summary>
    /// Removes, for every given entry, all stored entries of the same tenant that
    /// carry the same product id. The SKU text of the given entry is not compared.
    /// </summary>
    /// <param name="listOfSkusToRemove">Entries identifying what to remove.</param>
    /// <exception cref="ArgumentNullException">The list is null.</exception>
    public void DeleteSkus(IList<SkuEntry> listOfSkusToRemove) {
        if (listOfSkusToRemove == null) {
            throw new ArgumentNullException("listOfSkusToRemove");
        }
        foreach (var tenantBatch in listOfSkusToRemove.GroupBy(s => s.TenantId)) {
            var doomed = new HashSet<long>(tenantBatch.Select(s => s.ProductId));
            _dict.AddOrUpdate(
                tenantBatch.Key,
                // An unknown tenant ends up with an empty list, as before.
                tenantId => new List<SkuEntry>(),
                (tenantId, current) => {
                    var remaining = new List<SkuEntry>(current);
                    remaining.RemoveAll(s => doomed.Contains(s.ProductId));
                    return remaining;
                });
        }
    }

    /// <summary>
    /// Returns product ids of the tenant's entries whose SKU contains the query,
    /// compared ordinally and ignoring case.
    /// </summary>
    /// <param name="tenantId">Tenant to search in.</param>
    /// <param name="sku">Text that has to occur inside the stored SKU.</param>
    /// <param name="limit">Maximum number of results.</param>
    /// <returns>Matching product ids in stored order; empty when the tenant is unknown.</returns>
    public IList<long> FindProducts(long tenantId, string sku, int limit) {
        List<SkuEntry> value;
        if (!_dict.TryGetValue(tenantId, out value)) {
            return new long[0];
        }
        return value
            .Where(s => s.Sku.IndexOf(sku, StringComparison.OrdinalIgnoreCase) >= 0)
            .Take(limit)
            .Select(s => s.ProductId)
            .ToList();
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment