Skip to content

Instantly share code, notes, and snippets.

@mmadfox
Last active December 17, 2020 10:36
Show Gist options
  • Save mmadfox/b3d8f33ac45a1c394d1f96b3e81fdf69 to your computer and use it in GitHub Desktop.
Save mmadfox/b3d8f33ac45a1c394d1f96b3e81fdf69 to your computer and use it in GitHub Desktop.
MULTIPLEXING CHANNELS IN GO
package main
import (
"context"
"errors"
"fmt"
"log"
"time"
)
func genInt(n int) chan int {
ch := make(chan int)
go func() {
for i := 0; i < n; i++ {
ch <- i
}
close(ch)
}()
return ch
}
func worker(gen chan int) chan int {
ch := make(chan int)
go func() {
for task := range gen {
ch <- task
}
close(ch)
}()
return ch
}
func main() {
start := time.Now()
generator := genInt(10000)
pipe := make(chan int)
channels := make([]chan int, 0)
for i := 0; i < 10; i++ {
channels = append(channels, worker(generator))
}
_ = MuxChannel(context.TODO(), pipe, channels...)
for res := range pipe {
fmt.Printf("received result %v\n", res)
}
log.Println("close ok", time.Since(start))
}
Based on https://katcipis.github.io/blog/mux-channels-go/, but without reflection.
// MuxChannel muxes all the provided source channels on the given sink channel.
func MuxChannel(ctx context.Context, sink chan<- int, channels ...chan int) error {
if len(channels) == 0 {
return errors.New("no channels for multiplexing")
}
total := len(channels)
bmap := make([]byte, total)
go func() {
for total > 0 {
if ctx.Err() != nil {
return
}
for wi, w := range channels {
if bmap[wi] == 0 {
select {
default:
case ch, ok := <-w:
if !ok {
bmap[wi] = 1
total--
continue
}
sink <- ch
}
}
}
}
close(sink)
}()
return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment