Created
September 13, 2018 16:09
-
-
Save saian-nimaev/2914bd816d42b7cce6caa05e3f508aa7 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"sort" | |
"strings" | |
"sync" | |
) | |
// Ord is a string payload tagged with an integer position. The pipeline
// stages in this file use N as a slice index to put results produced by
// concurrent workers back into a fixed order.
type Ord struct {
	N int    // position of the value within its stream
	S string // payload
}
func NewOrd(n int, s string) Ord { | |
return Ord{N: n, S: s} | |
} | |
type OrdCh struct { | |
N int | |
Ch <-chan Ord | |
} | |
func NewOrdCh(n int, ch <-chan Ord) OrdCh { | |
return OrdCh{N: n, Ch: ch} | |
} | |
func ExecutePipeline(jobs ...job) { | |
in := make(chan interface{}) | |
var wg sync.WaitGroup | |
wg.Add(len(jobs)) | |
for _, j := range jobs { | |
out := make(chan interface{}) | |
go func(j job, in chan interface{}, out chan interface{}) { | |
defer wg.Done() | |
defer close(out) | |
j(in, out) | |
}(j, in, out) | |
in = out | |
} | |
wg.Wait() | |
} | |
// stringify formats v with fmt's "%+v" verb, which prints struct values with
// their field names.
func stringify(v interface{}) string {
	text := fmt.Sprintf("%+v", v)
	return text
}
func SingleHash(in chan interface{}, out chan interface{}) { | |
a, b, n := split(in) | |
leftCh := crc32c(a, func(o Ord) string { | |
return fmt.Sprintf("%d SingleHash: crc32(data)", o.N) | |
}) | |
rightCh := crc32c(md5c(b), func(o Ord) string { | |
return fmt.Sprintf("%d SingleHash: crc32(md5(data))", o.N) | |
}) | |
ch := concat(leftCh, rightCh, n) | |
for s := range ch { | |
out <- s | |
} | |
} | |
func split(in <-chan interface{}) (<-chan Ord, <-chan Ord, <-chan int) { | |
a := make(chan Ord) | |
b := make(chan Ord) | |
n := make(chan int, 1) | |
go func() { | |
i := 0 | |
wa := sync.WaitGroup{} | |
wb := sync.WaitGroup{} | |
for d := range in { | |
data := stringify(d) | |
ord := NewOrd(i, data) | |
fmt.Println(i, "SingleHash: data", data) | |
i = i + 1 | |
wa.Add(1) | |
wb.Add(1) | |
go func() { | |
defer wa.Done() | |
a <- ord | |
}() | |
go func() { | |
defer wb.Done() | |
b <- ord | |
}() | |
} | |
n <- i | |
close(n) | |
go func() { | |
wa.Wait() | |
close(a) | |
}() | |
go func() { | |
wb.Wait() | |
close(b) | |
}() | |
}() | |
return a, b, n | |
} | |
func concat(left <-chan Ord, right <-chan Ord, nch <-chan int) <-chan string { | |
ch := make(chan string) | |
go func() { | |
n := <-nch | |
slice1 := make([]string, n) | |
for s := range left { | |
slice1[s.N] = s.S | |
} | |
slice2 := make([]string, n) | |
for s := range right { | |
slice2[s.N] = s.S | |
} | |
for i := 0; i < n; i++ { | |
pair := fmt.Sprintf("%s~%s", slice1[i], slice2[i]) | |
fmt.Println(i, "SingleHash: result", pair) | |
ch <- pair | |
} | |
close(ch) | |
}() | |
return ch | |
} | |
func md5c(in <-chan Ord) <-chan Ord { | |
ch := make(chan Ord) | |
go func() { | |
for o := range in { | |
signed := DataSignerMd5(o.S) | |
i := o.N | |
fmt.Println(i, "SingleHash: md5(data)", signed) | |
ch <- NewOrd(i, signed) | |
} | |
close(ch) | |
}() | |
return ch | |
} | |
func crc32c(in <-chan Ord, line func(Ord) string) <-chan Ord { | |
ch := make(chan Ord) | |
go func() { | |
wg := sync.WaitGroup{} | |
for o := range in { | |
wg.Add(1) | |
go func(o Ord) { | |
defer wg.Done() | |
signed := DataSignerCrc32(o.S) | |
i := o.N | |
fmt.Println(line(o), signed) | |
ch <- NewOrd(i, signed) | |
}(o) | |
} | |
go func() { | |
wg.Wait() | |
close(ch) | |
}() | |
}() | |
return ch | |
} | |
func MultiHash(in chan interface{}, out chan interface{}) { | |
const thn = 6 | |
result := joinThn(crc32Thn(splitThn(ordChannel(in), thn)), thn) | |
for s := range result { | |
out <- s.S | |
} | |
} | |
func ordChannel(in <-chan interface{}) <-chan Ord { | |
a := make(chan Ord) | |
go func() { | |
i := 0 | |
wa := sync.WaitGroup{} | |
for d := range in { | |
data := stringify(d) | |
ord := NewOrd(i, data) | |
fmt.Println(i, "MultiHash: data", data) | |
i = i + 1 | |
wa.Add(1) | |
go func() { | |
defer wa.Done() | |
a <- ord | |
}() | |
} | |
go func() { | |
wa.Wait() | |
close(a) | |
}() | |
}() | |
return a | |
} | |
func splitThn(in <-chan Ord, thn int) <-chan OrdCh { | |
ch := make(chan OrdCh) | |
go func() { | |
wg := sync.WaitGroup{} | |
for o := range in { | |
wg.Add(1) | |
go func(o Ord) { | |
defer wg.Done() | |
ch1 := make(chan Ord, thn) | |
for th := 0; th < thn; th++ { | |
ch1 <- NewOrd(th, fmt.Sprintf("%d%s", th, o.S)) | |
} | |
close(ch1) | |
ch <- NewOrdCh(o.N, ch1) | |
}(o) | |
} | |
go func() { | |
wg.Wait() | |
close(ch) | |
}() | |
}() | |
return ch | |
} | |
func crc32Thn(in <-chan OrdCh) <-chan OrdCh { | |
ch := make(chan OrdCh) | |
go func() { | |
wg := sync.WaitGroup{} | |
for och := range in { | |
wg.Add(1) | |
go func(och OrdCh) { | |
defer wg.Done() | |
c := crc32c(och.Ch, func(o Ord) string { | |
return fmt.Sprintf("%d MultiHash: crc32(th+data) %d", och.N, o.N) | |
}) | |
ch <- NewOrdCh(och.N, c) | |
}(och) | |
} | |
go func() { | |
wg.Wait() | |
close(ch) | |
}() | |
}() | |
return ch | |
} | |
func joinThn(in <-chan OrdCh, thn int) <-chan Ord { | |
ch := make(chan Ord) | |
go func() { | |
wg := sync.WaitGroup{} | |
for och := range in { | |
wg.Add(1) | |
go func(och OrdCh) { | |
defer wg.Done() | |
slice := make([]string, thn) | |
for o := range och.Ch { | |
slice[o.N] = o.S | |
} | |
acc := strings.Join(slice, "") | |
i := och.N | |
fmt.Println(i, "MultiHash: result", acc) | |
ch <- NewOrd(i, acc) | |
}(och) | |
} | |
go func() { | |
wg.Wait() | |
close(ch) | |
}() | |
}() | |
return ch | |
} | |
func CombineResults(in chan interface{}, out chan interface{}) { | |
slice := make([]string, 0) | |
for d := range in { | |
data := stringify(d) | |
slice = append(slice, data) | |
} | |
sort.Strings(slice) | |
join := strings.Join(slice, "_") | |
fmt.Println("CombineResults", len(slice), join) | |
out <- join | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment