Last active
September 16, 2016 14:04
-
-
Save billhathaway/303c72387007191823f8f3a57bd03b07 to your computer and use it in GitHub Desktop.
elvis added stream example with buffer and fixed bug in case of find pattern split across buffers
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Thanks to Tyler Burnell, Bill Kennedy and others who wrote all the boilerplate I copied into main.go and main_test.go.
Note: the benchmark data is horribly unrealistic (10 MB of input containing either no matches or nothing but matches), but I think measuring MB/sec throughput over large input sets is the right model for the problem statement. I welcome additions that make the test data more realistic. Using a very small data set of a few hundred bytes doesn't seem like a good test
for stream processing algorithm performance.
go test -run none -bench . -benchtime 3s -benchmem | |
testing: warning: no tests to run | |
PASS | |
BenchmarkProcessByteUnmatched-8 30 143059391 ns/op 69.90 MB/s 1 B/op 1 allocs/op | |
BenchmarkProcessByteMatched-8 30 120619527 ns/op 82.91 MB/s 1 B/op 1 allocs/op | |
BenchmarkProcessBuffer128Unmatched-8 200 17943013 ns/op 557.32 MB/s 128 B/op 1 allocs/op | |
BenchmarkProcessBuffer128Matched-8 100 34317933 ns/op 291.39 MB/s 128 B/op 1 allocs/op | |
BenchmarkProcessBuffer8192Unmatched-8 300 16336677 ns/op 612.12 MB/s 8192 B/op 1 allocs/op | |
BenchmarkProcessBuffer8192Matched-8 100 32197112 ns/op 310.59 MB/s 8192 B/op 1 allocs/op | |
ok github.com/billhathaway/streamprocessing/algo1 27.133s |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bytes" | |
"errors" | |
"fmt" | |
"io" | |
) | |
// find is the byte sequence searched for in the input and repl is the byte
// sequence expected in its place.
var (
	find = []byte("elvis")
	repl = []byte("Elvis")
)

// data is the table of test cases: every input is paired with the output
// expected once each occurrence of find has been replaced by repl.
var data = []struct {
	input, output []byte
}{
	{input: []byte("abc"), output: []byte("abc")},
	{input: []byte("elvis"), output: []byte("Elvis")},
	{input: []byte("aElvis"), output: []byte("aElvis")},
	{input: []byte("abcelvis"), output: []byte("abcElvis")},
	{input: []byte("eelvis"), output: []byte("eElvis")},
	{input: []byte("aelvis"), output: []byte("aElvis")},
	{input: []byte("aabeeeelvis"), output: []byte("aabeeeElvis")},
	{input: []byte("e l v i s"), output: []byte("e l v i s")},
	{input: []byte("aa bb e l v i saa"), output: []byte("aa bb e l v i saa")},
	{input: []byte(" elvi s"), output: []byte(" elvi s")},
	{input: []byte("elvielvis"), output: []byte("elviElvis")},
	{input: []byte("elvielvielviselvi1"), output: []byte("elvielviElviselvi1")},
	{input: []byte("elvielviselvis"), output: []byte("elviElvisElvis")},

	// Cases where the pattern straddles the end of a 32 byte buffer, so only
	// part of it is visible when the buffer fills up.
	{input: []byte("012345678901234567890123456789elvis"), output: []byte("012345678901234567890123456789Elvis")},
	{input: []byte("0123456789012345678901234567890elvis"), output: []byte("0123456789012345678901234567890Elvis")},
	{input: []byte("01234567890123456789012345678901elvis"), output: []byte("01234567890123456789012345678901Elvis")},
	{input: []byte("012345678901234567890123456789012elvis"), output: []byte("012345678901234567890123456789012Elvis")},
	{input: []byte("0123456789012345678901234567890123elvis"), output: []byte("0123456789012345678901234567890123Elvis")},
}
// processBuffer copies r to w, replacing every non-overlapping occurrence of
// find (scanning left to right) with replace.
//
// Data is read through a single internal buffer of size bytes, which greatly
// reduces the number of Read and Write calls at the expense of one allocation
// up front. size must be bigger than len(find) so that a partial match held
// back from one read always leaves room for new data.
//
// It returns an error if find is empty, if size is too small, if a write to w
// fails, or if r fails with anything other than io.EOF.
func processBuffer(r io.Reader, w io.Writer, size int, find, replace []byte) error {
	if len(find) == 0 {
		return errors.New("find must not be empty")
	}
	if size <= len(find) {
		return errors.New("buffer size must be > find size")
	}

	// emit writes p to w, skipping the call entirely for empty slices.
	emit := func(p []byte) error {
		if len(p) == 0 {
			return nil
		}
		_, err := w.Write(p)
		return err
	}

	buf := make([]byte, size)
	held := 0 // bytes at the start of buf carried over from the previous read
	for {
		n, readErr := r.Read(buf[held:])
		chunk := buf[:held+n]

		// Replace every complete occurrence of find in the buffered data.
		for {
			at := bytes.Index(chunk, find)
			if at < 0 {
				break
			}
			if err := emit(chunk[:at]); err != nil {
				return err
			}
			if err := emit(replace); err != nil {
				return err
			}
			chunk = chunk[at+len(find):]
		}

		if readErr != nil {
			// No more data will arrive, so whatever is left (including a
			// partial match) can never complete and is written out as is.
			if err := emit(chunk); err != nil {
				return err
			}
			if readErr == io.EOF {
				return nil
			}
			return readErr
		}

		// Hold back the longest suffix of chunk that is a proper prefix of
		// find: it may turn into a full match once the next read arrives.
		keep := len(find) - 1
		if keep > len(chunk) {
			keep = len(chunk)
		}
		for keep > 0 && !bytes.HasPrefix(find, chunk[len(chunk)-keep:]) {
			keep--
		}
		if err := emit(chunk[:len(chunk)-keep]); err != nil {
			return err
		}
		// copy handles the overlap when the tail is already at the front.
		held = copy(buf, chunk[len(chunk)-keep:])
	}
}
// processByte copies r to w one byte at a time, replacing every
// non-overlapping occurrence of find (scanning left to right) with replace.
//
// Bytes that currently form a prefix of find are held back and only written
// once it is certain they cannot be part of a match. An empty find matches
// nothing, so the stream is copied through unchanged.
//
// The function has no error return: it stops at the first read error
// (including io.EOF) and deliberately ignores write errors.
func processByte(r io.Reader, w io.Writer, find, replace []byte) {
	if len(find) == 0 {
		_, _ = io.Copy(w, r)
		return
	}

	// fail[i] is the length of the longest proper prefix of find[:i+1] that
	// is also a suffix of it. It tells us how much of a failed partial match
	// can be reused, so self-overlapping patterns such as "aab" are found.
	fail := make([]int, len(find))
	for i, k := 1, 0; i < len(find); i++ {
		for k > 0 && find[i] != find[k] {
			k = fail[k-1]
		}
		if find[i] == find[k] {
			k++
		}
		fail[i] = k
	}

	var idx int // number of leading bytes of find currently held back
	b := make([]byte, 1)
	for {
		n, err := r.Read(b)
		if n > 0 {
			c := b[0]
			// On a mismatch, release the held bytes that can no longer start
			// a match and fall back to the longest prefix that still can.
			for idx > 0 && c != find[idx] {
				next := fail[idx-1]
				w.Write(find[:idx-next])
				idx = next
			}
			if c == find[idx] {
				idx++
				if idx == len(find) {
					w.Write(replace)
					idx = 0
				}
			} else {
				w.Write(b)
			}
		}
		// A reader may return its last byte together with the error, so the
		// byte is processed above before the loop is left.
		if err != nil || n == 0 {
			break
		}
	}

	// Input ended in the middle of a partial match: write it out unchanged.
	if idx > 0 {
		w.Write(find[:idx])
	}
}
func main() { | |
output := bytes.Buffer{} | |
fmt.Println("=======================================\nRunning processByte") | |
for _, d := range data { | |
output.Reset() | |
processByte(bytes.NewReader(d.input), &output, find, repl) | |
matched := bytes.Compare(d.output, output.Bytes()) | |
fmt.Printf("Matched: %v Inp: [%s] Exp: [%s] Got: [%s]\n", matched == 0, d.input, d.output, output.Bytes()) | |
} | |
fmt.Println("=======================================\nRunning processBuffer") | |
for _, d := range data { | |
output.Reset() | |
processBuffer(bytes.NewReader(d.input), &output, 32, find, repl) | |
matched := bytes.Compare(d.output, output.Bytes()) | |
fmt.Printf("Matched: %v Inp: [%s] Exp: [%s] Got: [%s]\n", matched == 0, d.input, d.output, output.Bytes()) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// All material is licensed under the Apache License Version 2.0, January 2004 | |
// http://www.apache.org/licenses/LICENSE-2.0 | |
// go test -run none -bench . -benchtime 3s -benchmem | |
// Benchmarks that compare the throughput of each algorithm.
package main | |
import ( | |
"bytes" | |
"io/ioutil" | |
"testing" | |
) | |
// Shared benchmark inputs and the readers positioned over them. The readers
// are package-level and reused, so each benchmark iteration must rewind the
// one it uses before reading.
var (
	// bigUnmatchedInput is 10,000,000 zero bytes, so it contains no "elvis".
	bigUnmatchedInput = make([]byte, 10000000)
	unmatchedR        = bytes.NewReader(bigUnmatchedInput)

	// bigMatchedInput is "elvis" repeated 2,000,000 times (10,000,000 bytes).
	// It is built in one allocation and wrapped by a single reader.
	bigMatchedInput = bytes.Repeat([]byte("elvis"), 2000000)
	matchedR        = bytes.NewReader(bigMatchedInput)
)
// assembleInputStream appends all the input slices together to allow for | |
// consistent testing across all benchmarks | |
func assembleInputStream() []byte { | |
var out []byte | |
for _, d := range data { | |
out = append(out, d.input...) | |
} | |
return out | |
} | |
// Capture the time it takes to execute algorithm processByte when no matches | |
func BenchmarkProcessByteUnmatched(b *testing.B) { | |
b.ResetTimer() | |
for i := 0; i < b.N; i++ { | |
b.SetBytes(int64(len(bigUnmatchedInput))) | |
// Seek our reader to the beginning of the byte array | |
unmatchedR.Seek(0, 0) | |
processByte(unmatchedR, ioutil.Discard, find, repl) | |
} | |
} | |
// Capture the time it takes to execute algorithm processByte when all matches | |
func BenchmarkProcessByteMatched(b *testing.B) { | |
b.ResetTimer() | |
for i := 0; i < b.N; i++ { | |
b.SetBytes(int64(len(bigMatchedInput))) | |
// Seek our reader to the beginning of the byte array | |
matchedR.Seek(0, 0) | |
processByte(matchedR, ioutil.Discard, find, repl) | |
} | |
} | |
// Capture the time it takes to execute algorithm processBuffer with 128 byte buffer no matches | |
func BenchmarkProcessBuffer128Unmatched(b *testing.B) { | |
b.ResetTimer() | |
for i := 0; i < b.N; i++ { | |
b.SetBytes(int64(len(bigUnmatchedInput))) | |
// Seek our reader to the beginning of the byte array | |
unmatchedR.Seek(0, 0) | |
processBuffer(unmatchedR, ioutil.Discard, 128, find, repl) | |
} | |
} | |
// Capture the time it takes to execute algorithm processBuffer with 128 byte buffer all matches | |
func BenchmarkProcessBuffer128Matched(b *testing.B) { | |
b.ResetTimer() | |
for i := 0; i < b.N; i++ { | |
b.SetBytes(int64(len(bigMatchedInput))) | |
// Seek our reader to the beginning of the byte array | |
matchedR.Seek(0, 0) | |
processBuffer(matchedR, ioutil.Discard, 128, find, repl) | |
} | |
} | |
// Capture the time it takes to execute algorithm processBuffer with 8192 byte buffer no matches | |
func BenchmarkProcessBuffer8192Unmatched(b *testing.B) { | |
b.ResetTimer() | |
for i := 0; i < b.N; i++ { | |
b.SetBytes(int64(len(bigUnmatchedInput))) | |
// Seek our reader to the beginning of the byte array | |
unmatchedR.Seek(0, 0) | |
processBuffer(unmatchedR, ioutil.Discard, 8192, find, repl) | |
} | |
} | |
// Capture the time it takes to execute algorithm processBuffer with 8192 byte buffer all matches | |
func BenchmarkProcessBuffer8192Matched(b *testing.B) { | |
b.ResetTimer() | |
for i := 0; i < b.N; i++ { | |
b.SetBytes(int64(len(bigMatchedInput))) | |
// Seek our reader to the beginning of the byte array | |
matchedR.Seek(0, 0) | |
processBuffer(matchedR, ioutil.Discard, 8192, find, repl) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment