Created
March 29, 2020 12:37
-
-
Save WKBae/b277c962616a6df8a06d7147419b1fd6 to your computer and use it in GitHub Desktop.
Unlimited buffered channel in Go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"sync" | |
"time" | |
) | |
// unlimitedBufferer relays every value received on in to out, preserving
// order, and queues values in a slice that grows without bound while the
// receiver on out is not ready. A send on in therefore only has to wait for
// this function's select loop, never for the receiver on out.
//
// When in is closed, the values still queued are delivered to out and then
// out is closed. The function blocks until that happens, so it is meant to
// be run in its own goroutine.
func unlimitedBufferer(in <-chan int, out chan<- int) {
	var buffer []int

	// Keep going while more input may arrive or queued items remain.
	for in != nil || len(buffer) > 0 {
		// A send on a nil channel is never selected, so the send case is
		// only armed while there is something to deliver.
		var send chan<- int
		var head int
		if len(buffer) > 0 {
			send = out
			head = buffer[0]
		}

		select {
		case item, ok := <-in:
			if !ok {
				// Input is closed: a nil channel is never selected, so
				// from here on the loop only flushes the queue.
				in = nil
				continue
			}
			buffer = append(buffer, item)
		case send <- head:
			buffer = buffer[1:]
		}
	}

	// Input is closed and the queue is empty; signal completion downstream.
	close(out)
}
func main() { | |
inputCh := make(chan int) | |
outputCh := make(chan int) | |
go unlimitedBufferer(inputCh, outputCh) | |
wg := &sync.WaitGroup{} | |
wg.Add(1) | |
go fastProducer(wg, inputCh, 0, 10) | |
wg.Add(1) | |
go slowConsumer(wg, outputCh) | |
wg.Wait() | |
fmt.Println("Finished!") | |
} | |
// fastProducer sends the integers start, start+1, ..., end-1 on ch without
// any delay between sends, closes ch, and prints a completion message.
// Nothing is sent when start >= end; ch is still closed in that case.
//
// wg.Done is called when the function returns, so the caller must have
// called wg.Add(1) for it beforehand.
func fastProducer(wg *sync.WaitGroup, ch chan<- int, start, end int) {
	defer wg.Done()

	for i := start; i < end; i++ {
		ch <- i
	}
	// Closing tells the receiver that no more values will arrive.
	close(ch)

	fmt.Println("Producer completed!")
}
// slowConsumer receives values from ch until it is closed. For each value
// it first sleeps for 100 milliseconds and then prints the value on its own
// line. A completion message is printed once ch has been drained.
//
// wg.Done is called when the function returns, so the caller must have
// called wg.Add(1) for it beforehand.
func slowConsumer(wg *sync.WaitGroup, ch <-chan int) {
	defer wg.Done()

	for item := range ch {
		// The pause stands in for slow per-item processing.
		time.Sleep(100 * time.Millisecond)
		fmt.Println(item)
	}

	fmt.Println("Consumer completed!")
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment