Created
March 13, 2017 20:55
-
-
Save jwilder/048a081ba31b9b7f68c2b26287733c4b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package merge | |
import ( | |
"sort" | |
"testing" | |
"time" | |
"github.com/influxdata/influxdb/tsdb/engine/tsm1" | |
) | |
type Values []tsm1.Value | |
// Merge overlays b to top of a. If two values conflict with | |
// the same timestamp, b is used. Both a and b must be sorted | |
// in ascending order. | |
func (a Values) StdMerge(b tsm1.Values) tsm1.Values { | |
if len(a) == 0 { | |
return b | |
} | |
if len(b) == 0 { | |
return tsm1.Values(a) | |
} | |
out := make(Values, 0, len(a)+len(b)) | |
for len(a) > 0 && len(b) > 0 { | |
if a[0].UnixNano() < b[0].UnixNano() { | |
out, a = append(out, a[0]), a[1:] | |
} else { | |
if len(b) > 0 && a[0].UnixNano() == b[0].UnixNano() { | |
a = a[1:] | |
} else { | |
out, b = append(out, b[0]), b[1:] | |
} | |
} | |
} | |
if len(a) > 0 { | |
out = append(out, a...) | |
} else { | |
out = append(out, b...) | |
} | |
return tsm1.Values(out) | |
} | |
// Merge overlays b to top of a. If two values conflict with | |
// the same timestamp, b is used. Both a and b must be sorted | |
// in ascending order. | |
func (a Values) Merge(b Values) Values { | |
if len(a) == 0 { | |
return b | |
} | |
if len(b) == 0 { | |
return a | |
} | |
if a[len(a)-1].UnixNano() < b[0].UnixNano() { | |
return append(a, b...) | |
} | |
if b[len(b)-1].UnixNano() < a[0].UnixNano() { | |
return append(b, a...) | |
} | |
for i := 0; i < len(a) && len(b) > 0; i++ { | |
av, bv := a[i].UnixNano(), b[0].UnixNano() | |
// Value in a is greater than B, we need to merge | |
if av > bv { | |
// Save value in a | |
temp := a[i] | |
// Overwrite a with b | |
a[i] = b[0] | |
// Slide all values of b down 1 | |
copy(b, b[1:]) | |
b = b[:len(b)-1] | |
var k int | |
if len(b) > 0 && av > b[len(b)-1].UnixNano() { | |
// Fast path where a is after b, we skip the search | |
k = len(b) | |
} else { | |
// See where value we save from a should be inserted in b to keep b sorted | |
k = sort.Search(len(b), func(i int) bool { return b[i].UnixNano() >= temp.UnixNano() }) | |
} | |
if k == len(b) { | |
// Last position? | |
b = append(b, temp) | |
} else if b[k].UnixNano() != temp.UnixNano() { | |
// Save the last element, since it will get overwritten | |
last := b[len(b)-1] | |
// Somewhere in the middle of b, insert it only if it's not a duplicate | |
copy(b[k+1:], b[k:]) | |
// Add the last vale to the end | |
b = append(b, last) | |
b[k] = temp | |
} | |
} else if av == bv { | |
// Value in a an b are the same, use b | |
a[i] = b[0] | |
b = b[1:] | |
} | |
} | |
if len(b) > 0 { | |
return append(a, b...) | |
} | |
return a | |
} | |
func BenchmarkValues_Merge(b *testing.B) { | |
valueCount := 1000 | |
times := getTimes(valueCount, 60, time.Second) | |
a := make([]tsm1.Value, len(times)) | |
c := make([]tsm1.Value, len(times)) | |
for i, t := range times { | |
a[i] = tsm1.NewValue(t, float64(i)) | |
c[i] = tsm1.NewValue(t+1, float64(i)) | |
} | |
b.ResetTimer() | |
benchmarkMerge(a, c, b) | |
} | |
func BenchmarkValues_MergeSame(b *testing.B) { | |
valueCount := 1000 | |
times := getTimes(valueCount, 60, time.Second) | |
a := make([]tsm1.Value, len(times)) | |
c := make([]tsm1.Value, len(times)) | |
for i, t := range times { | |
a[i] = tsm1.NewValue(t, float64(i)) | |
c[i] = tsm1.NewValue(t, float64(i)) | |
} | |
b.ResetTimer() | |
benchmarkMerge(a, c, b) | |
} | |
func BenchmarkValues_StdMerge(b *testing.B) { | |
valueCount := 1000 | |
times := getTimes(valueCount, 60, time.Second) | |
a := make([]tsm1.Value, len(times)) | |
c := make([]tsm1.Value, len(times)) | |
for i, t := range times { | |
a[i] = tsm1.NewValue(t, float64(i)) | |
c[i] = tsm1.NewValue(t+1, float64(i)) | |
} | |
b.ResetTimer() | |
benchmarkStdMerge(a, c, b) | |
} | |
func BenchmarkValues_StdMergeSame(b *testing.B) { | |
valueCount := 1000 | |
times := getTimes(valueCount, 60, time.Second) | |
a := make([]tsm1.Value, len(times)) | |
c := make([]tsm1.Value, len(times)) | |
for i, t := range times { | |
a[i] = tsm1.NewValue(t, float64(i)) | |
c[i] = tsm1.NewValue(t, float64(i)) | |
} | |
b.ResetTimer() | |
benchmarkStdMerge(a, c, b) | |
} | |
func benchmarkMerge(a, c tsm1.Values, b *testing.B) { | |
for i := 0; i < b.N; i++ { | |
Values(a).Merge(Values(c)) | |
} | |
} | |
func benchmarkStdMerge(a, c tsm1.Values, b *testing.B) { | |
for i := 0; i < b.N; i++ { | |
Values(a).StdMerge(c) | |
} | |
} | |
func getTimes(n, step int, precision time.Duration) []int64 { | |
t := time.Now().Round(precision).UnixNano() | |
a := make([]int64, n) | |
for i := 0; i < n; i++ { | |
a[i] = t + (time.Duration(i*60) * precision).Nanoseconds() | |
} | |
return a | |
} |
Author
jwilder
commented
Mar 13, 2017
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment