-
-
Save alexrios/f6f146c6b7ef570885f6f0670e4fbfd8 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"time" | |
) | |
// BatchStrings groups the strings received from values into slices and
// delivers them on the returned channel.
//
// A batch is flushed when any of the following happens:
//   - it holds exactly maxItems strings,
//   - maxTimeout has elapsed since the batch window was opened, or
//   - values is closed.
//
// The window opens when the previous batch was flushed (or when BatchStrings
// starts), not when the first string of the batch arrives. Empty batches are
// never sent. If maxItems is zero or negative the size limit never triggers,
// so batches are flushed only by the timeout or by values being closed.
//
// The returned channel is unbuffered and is closed once values has been
// closed and the final non-empty batch has been delivered. The caller must
// keep receiving until then; otherwise the internal goroutine stays blocked
// on its send.
func BatchStrings(values <-chan string, maxItems int, maxTimeout time.Duration) chan []string {
	batches := make(chan []string)
	go func() {
		defer close(batches)
		for {
			batch, open := collectBatch(values, maxItems, maxTimeout)
			if len(batch) > 0 {
				batches <- batch
			}
			if !open {
				return
			}
		}
	}()
	return batches
}

// collectBatch gathers one batch from values. It returns the collected strings
// (possibly none) and false once values has been closed, true otherwise.
func collectBatch(values <-chan string, maxItems int, maxTimeout time.Duration) ([]string, bool) {
	// An explicit timer that is stopped on return releases its resources as
	// soon as the batch is complete, instead of lingering until maxTimeout
	// expires the way an abandoned time.After timer can.
	timer := time.NewTimer(maxTimeout)
	defer timer.Stop()

	var batch []string
	for {
		select {
		case value, ok := <-values:
			if !ok {
				return batch, false
			}
			batch = append(batch, value)
			if len(batch) == maxItems {
				return batch, true
			}
		case <-timer.C:
			return batch, true
		}
	}
}
func main() { | |
strings := make(chan string) | |
go func() { | |
strings <- "hello" | |
strings <- "there" // hit limit of 2 | |
strings <- "how" | |
time.Sleep(15 * time.Millisecond) // hit timeout | |
strings <- "are" | |
strings <- "you" // hit limit of 2 | |
// A really long time without any new values. | |
time.Sleep(500 * time.Millisecond) | |
strings <- "doing?" // the last incomplete batch | |
close(strings) | |
}() | |
start := time.Now() | |
batches := BatchStrings(strings, 2, 10*time.Millisecond) | |
for batch := range batches { | |
fmt.Println(time.Now().Sub(start), batch) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment