Skip to content

Instantly share code, notes, and snippets.

@saian-nimaev
Created September 13, 2018 16:09
Show Gist options
  • Save saian-nimaev/2914bd816d42b7cce6caa05e3f508aa7 to your computer and use it in GitHub Desktop.
Save saian-nimaev/2914bd816d42b7cce6caa05e3f508aa7 to your computer and use it in GitHub Desktop.
package main
import (
"fmt"
"sort"
"strings"
"sync"
)
// Ord is a string payload tagged with an integer position, so that values
// travelling through channels in arbitrary order can be put back in place.
type Ord struct {
	// N is the position (index) associated with S.
	N int
	// S is the payload carried at position N.
	S string
}
// NewOrd builds an Ord whose position is n and whose payload is s.
func NewOrd(n int, s string) Ord {
	var o Ord
	o.N, o.S = n, s
	return o
}
// OrdCh is a receive-only stream of Ord values tagged with an integer
// position, letting a whole sub-stream be associated with one index.
type OrdCh struct {
	// N is the position (index) associated with the stream.
	N int
	// Ch is the receive-only channel delivering the stream's Ord values.
	Ch <-chan Ord
}
// NewOrdCh builds an OrdCh whose position is n and whose stream is ch.
func NewOrdCh(n int, ch <-chan Ord) OrdCh {
	var oc OrdCh
	oc.N, oc.Ch = n, ch
	return oc
}
// ExecutePipeline runs every job in its own goroutine and chains them so that
// the output channel of one job is the input channel of the next. It returns
// only after all jobs have returned.
//
// Each job is invoked as j(in, out) with two chan interface{} values (the job
// type itself is declared outside this block). A job's output channel is
// closed as soon as the job returns, which ends a `range` loop in the next job.
//
// The first job has no producer, so it is handed an already-closed input
// channel: a first job that ranges over its input sees it end immediately
// instead of blocking forever. Likewise, whatever the last job sends on its
// output is received and discarded, so a last job that emits values cannot
// block the pipeline.
func ExecutePipeline(jobs ...job) {
	head := make(chan interface{})
	// Nothing ever feeds the first stage; closing its input lets it terminate
	// if it tries to read.
	close(head)

	var wg sync.WaitGroup
	wg.Add(len(jobs))

	in := head
	for _, j := range jobs {
		out := make(chan interface{})
		go func(j job, in, out chan interface{}) {
			defer wg.Done()
			// Closing out signals end-of-stream to the next stage.
			defer close(out)
			j(in, out)
		}(j, in, out)
		in = out
	}

	// in is now the output of the last stage (or the closed head when there
	// are no jobs). Drain it so the last stage never blocks on a send; the
	// loop ends when that stage returns and its output is closed.
	for range in {
	}

	wg.Wait()
}
// stringify renders v as text using fmt's "%+v" verb (the default format,
// with field names included when v is a struct).
func stringify(v interface{}) string {
	const verb = "%+v"
	return fmt.Sprintf(verb, v)
}
// SingleHash is a pipeline stage. For every value read from in it sends on
// out the string "<crc32(data)>~<crc32(md5(data))>", as assembled by concat
// from the two branches below.
//
// The input is duplicated by split into two streams: one goes straight
// through crc32c, the other through md5c and then crc32c. concat pairs the
// two results by position and yields them in input order.
func SingleHash(in chan interface{}, out chan interface{}) {
	plain, forMd5, total := split(in)

	// Log-line prefixes handed to crc32c for each branch.
	crcLabel := func(o Ord) string {
		return fmt.Sprintf("%d SingleHash: crc32(data)", o.N)
	}
	crcOfMd5Label := func(o Ord) string {
		return fmt.Sprintf("%d SingleHash: crc32(md5(data))", o.N)
	}

	crcOfData := crc32c(plain, crcLabel)
	crcOfMd5 := crc32c(md5c(forMd5), crcOfMd5Label)

	for pair := range concat(crcOfData, crcOfMd5, total) {
		out <- pair
	}
}
// split reads every value from in, converts it with stringify, numbers it
// from 0 in arrival order and offers the resulting Ord on BOTH returned Ord
// channels. The third channel delivers a single int, the total number of
// values read, once in has been exhausted, and is then closed.
//
// Each send runs in its own goroutine, so reading from in never waits for a
// consumer and the order in which Ords arrive on either output is not
// guaranteed; consumers must rely on Ord.N. Each Ord channel is closed after
// all of its pending sends have completed.
func split(in <-chan interface{}) (<-chan Ord, <-chan Ord, <-chan int) {
	var (
		first  = make(chan Ord)
		second = make(chan Ord)
		total  = make(chan int, 1) // buffered so the count can be left without a waiting reader
	)

	go func() {
		var firstPending, secondPending sync.WaitGroup
		count := 0
		for raw := range in {
			text := stringify(raw)
			item := NewOrd(count, text)
			fmt.Println(count, "SingleHash: data", text)
			count++

			firstPending.Add(1)
			secondPending.Add(1)
			go func(o Ord) {
				defer firstPending.Done()
				first <- o
			}(item)
			go func(o Ord) {
				defer secondPending.Done()
				second <- o
			}(item)
		}

		total <- count
		close(total)

		// The two outputs are closed independently so that a slow consumer
		// on one side does not delay end-of-stream on the other.
		go func() {
			firstPending.Wait()
			close(first)
		}()
		go func() {
			secondPending.Wait()
			close(second)
		}()
	}()

	return first, second, total
}
// concat pairs up the values of left and right by their Ord.N and returns a
// channel yielding "<left>~<right>" strings for positions 0..n-1 in order,
// where n is the single value received from nch. The returned channel is
// closed after the last pair.
//
// Nothing is emitted until n has been received and both inputs have been read
// to the end (left completely first, then right). An Ord whose N is outside
// [0, n) causes an index-out-of-range panic.
func concat(left <-chan Ord, right <-chan Ord, nch <-chan int) <-chan string {
	pairs := make(chan string)

	go func() {
		defer close(pairs)

		total := <-nch

		// gather reads src until it is closed, placing each payload at its position.
		gather := func(src <-chan Ord) []string {
			vals := make([]string, total)
			for o := range src {
				vals[o.N] = o.S
			}
			return vals
		}

		lefts := gather(left)
		rights := gather(right)

		for i, l := range lefts {
			pair := fmt.Sprintf("%s~%s", l, rights[i])
			fmt.Println(i, "SingleHash: result", pair)
			pairs <- pair
		}
	}()

	return pairs
}
// md5c returns a channel on which, for every Ord read from in, an Ord with
// the same N and DataSignerMd5 applied to its S is delivered. Items are
// processed strictly one at a time, in arrival order, and the returned
// channel is closed once in is exhausted.
func md5c(in <-chan Ord) <-chan Ord {
	hashed := make(chan Ord)

	go func() {
		defer close(hashed)
		for item := range in {
			sum := DataSignerMd5(item.S)
			fmt.Println(item.N, "SingleHash: md5(data)", sum)
			hashed <- NewOrd(item.N, sum)
		}
	}()

	return hashed
}
// crc32c returns a channel on which, for every Ord read from in, an Ord with
// the same N and DataSignerCrc32 applied to its S is delivered. Every item is
// hashed in its own goroutine, so results may arrive in any order; use Ord.N
// to restore it. The returned channel is closed after in is exhausted and all
// results have been delivered.
//
// line produces the log prefix printed in front of each computed hash.
func crc32c(in <-chan Ord, line func(Ord) string) <-chan Ord {
	results := make(chan Ord)

	go func() {
		var pending sync.WaitGroup
		for item := range in {
			pending.Add(1)
			go func(item Ord) {
				defer pending.Done()
				sum := DataSignerCrc32(item.S)
				fmt.Println(line(item), sum)
				results <- NewOrd(item.N, sum)
			}(item)
		}
		// All workers have been started; close once the last one has sent.
		pending.Wait()
		close(results)
	}()

	return results
}
func MultiHash(in chan interface{}, out chan interface{}) {
const thn = 6
result := joinThn(crc32Thn(splitThn(ordChannel(in), thn)), thn)
for s := range result {
out <- s.S
}
}
// ordChannel reads every value from in, converts it with stringify, numbers
// it from 0 in arrival order and delivers the resulting Ord on the returned
// channel. Each send runs in its own goroutine, so delivery order is not
// guaranteed; consumers must rely on Ord.N. The returned channel is closed
// once in is exhausted and all pending sends have completed.
func ordChannel(in <-chan interface{}) <-chan Ord {
	numbered := make(chan Ord)

	go func() {
		var pending sync.WaitGroup
		next := 0
		for raw := range in {
			text := stringify(raw)
			item := NewOrd(next, text)
			fmt.Println(next, "MultiHash: data", text)
			next++

			pending.Add(1)
			go func(o Ord) {
				defer pending.Done()
				numbered <- o
			}(item)
		}
		pending.Wait()
		close(numbered)
	}()

	return numbered
}
// splitThn turns every Ord read from in into an OrdCh carrying the same N.
// The OrdCh's channel is pre-filled with thn Ords, numbered 0..thn-1, whose
// payload is the number followed by the original payload ("0data", "1data",
// ...), and is already closed when handed over.
//
// Each input is expanded in its own goroutine, so OrdChs may be delivered in
// any order. The returned channel is closed once in is exhausted and every
// OrdCh has been delivered.
func splitThn(in <-chan Ord, thn int) <-chan OrdCh {
	fanned := make(chan OrdCh)

	// expand builds the closed, fully buffered variant stream for one item.
	expand := func(item Ord) OrdCh {
		parts := make(chan Ord, thn)
		for th := 0; th < thn; th++ {
			parts <- NewOrd(th, fmt.Sprintf("%d%s", th, item.S))
		}
		close(parts)
		return NewOrdCh(item.N, parts)
	}

	go func() {
		var pending sync.WaitGroup
		for item := range in {
			pending.Add(1)
			go func(item Ord) {
				defer pending.Done()
				fanned <- expand(item)
			}(item)
		}
		pending.Wait()
		close(fanned)
	}()

	return fanned
}
// crc32Thn replaces the stream inside every OrdCh read from in with the
// result of running crc32c over it, keeping the OrdCh's N. Each OrdCh is
// handled in its own goroutine, so delivery order is not guaranteed. The
// returned channel is closed once in is exhausted and every OrdCh has been
// delivered.
func crc32Thn(in <-chan OrdCh) <-chan OrdCh {
	hashed := make(chan OrdCh)

	go func() {
		var pending sync.WaitGroup
		for group := range in {
			pending.Add(1)
			go func(group OrdCh) {
				defer pending.Done()
				// Log prefix: the group's position, then the part's position.
				label := func(part Ord) string {
					return fmt.Sprintf("%d MultiHash: crc32(th+data) %d", group.N, part.N)
				}
				hashed <- NewOrdCh(group.N, crc32c(group.Ch, label))
			}(group)
		}
		pending.Wait()
		close(hashed)
	}()

	return hashed
}
// joinThn collapses every OrdCh read from in into a single Ord with the same
// N. The Ords of the inner stream are placed into a thn-element slice at
// their own N and the slice is concatenated without a separator; positions
// that never receive a value contribute an empty string. An inner Ord whose N
// is outside [0, thn) causes an index-out-of-range panic.
//
// Each OrdCh is handled in its own goroutine, so delivery order is not
// guaranteed. The returned channel is closed once in is exhausted and every
// result has been delivered.
func joinThn(in <-chan OrdCh, thn int) <-chan Ord {
	joined := make(chan Ord)

	go func() {
		var pending sync.WaitGroup
		for group := range in {
			pending.Add(1)
			go func(group OrdCh) {
				defer pending.Done()
				parts := make([]string, thn)
				for part := range group.Ch {
					parts[part.N] = part.S
				}
				result := strings.Join(parts, "")
				fmt.Println(group.N, "MultiHash: result", result)
				joined <- NewOrd(group.N, result)
			}(group)
		}
		pending.Wait()
		close(joined)
	}()

	return joined
}
// CombineResults is a pipeline stage that reads every value from in until it
// is closed, converts each with stringify, sorts the resulting strings
// lexicographically and sends them on out as one string joined with "_".
// Exactly one value is sent, even when in yields nothing (an empty string).
func CombineResults(in chan interface{}, out chan interface{}) {
	var results []string
	for raw := range in {
		results = append(results, stringify(raw))
	}

	sort.Strings(results)
	combined := strings.Join(results, "_")

	fmt.Println("CombineResults", len(results), combined)
	out <- combined
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment