Last active
December 17, 2020 10:36
-
-
Save mmadfox/b3d8f33ac45a1c394d1f96b3e81fdf69 to your computer and use it in GitHub Desktop.
MULTIPLEXING CHANNELS IN GO
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"errors" | |
"fmt" | |
"log" | |
"time" | |
) | |
func genInt(n int) chan int { | |
ch := make(chan int) | |
go func() { | |
for i := 0; i < n; i++ { | |
ch <- i | |
} | |
close(ch) | |
}() | |
return ch | |
} | |
func worker(gen chan int) chan int { | |
ch := make(chan int) | |
go func() { | |
for task := range gen { | |
ch <- task | |
} | |
close(ch) | |
}() | |
return ch | |
} | |
func main() { | |
start := time.Now() | |
generator := genInt(10000) | |
pipe := make(chan int) | |
channels := make([]chan int, 0) | |
for i := 0; i < 10; i++ { | |
channels = append(channels, worker(generator)) | |
} | |
_ = MuxChannel(context.TODO(), pipe, channels...) | |
for res := range pipe { | |
fmt.Printf("received result %v\n", res) | |
} | |
log.Println("close ok", time.Since(start)) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Based on https://katcipis.github.io/blog/mux-channels-go/, but without reflection. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// MuxChannel muxes all the provided source channels on the given sink channel. | |
func MuxChannel(ctx context.Context, sink chan<- int, channels ...chan int) error { | |
if len(channels) == 0 { | |
return errors.New("no channels for multiplexing") | |
} | |
total := len(channels) | |
bmap := make([]byte, total) | |
go func() { | |
for total > 0 { | |
if ctx.Err() != nil { | |
return | |
} | |
for wi, w := range channels { | |
if bmap[wi] == 0 { | |
select { | |
default: | |
case ch, ok := <-w: | |
if !ok { | |
bmap[wi] = 1 | |
total-- | |
continue | |
} | |
sink <- ch | |
} | |
} | |
} | |
} | |
close(sink) | |
}() | |
return nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment