Created
July 24, 2019 06:48
-
-
Save elliotchance/7340277862b0e1c8b0cec8807df9ee9c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"time" | |
) | |
// BatchStrings groups the strings received from values into slices and
// delivers them on the returned channel.
//
// A batch is sent as soon as one of the following happens:
//   - it holds maxItems strings,
//   - maxTimeout has elapsed since collection of that batch began, or
//   - values is closed.
//
// Empty batches are never sent. The timeout window opens when a batch starts
// collecting, not when its first string arrives. If maxItems is less than 1
// the size limit is never reached, so batches are bounded by the timeout and
// by the closing of values only.
//
// The returned channel is unbuffered. It is closed after values has been
// closed and the final non-empty batch, if any, has been sent. The background
// goroutine blocks on every send, so the caller has to keep receiving until
// the returned channel is closed.
func BatchStrings(values <-chan string, maxItems int, maxTimeout time.Duration) chan []string {
	batches := make(chan []string)

	go func() {
		defer close(batches)

		for {
			batch, open := collectBatch(values, maxItems, maxTimeout)
			if len(batch) > 0 {
				batches <- batch
			}
			if !open {
				return
			}
		}
	}()

	return batches
}

// collectBatch gathers a single batch from values. It returns the strings
// collected so far (possibly none) and reports whether values is still open.
func collectBatch(values <-chan string, maxItems int, maxTimeout time.Duration) (batch []string, open bool) {
	// Use an explicit timer instead of time.After so it can be stopped as soon
	// as the batch is finished. Otherwise every batch that ends early (size
	// limit or closed input) leaves a pending timer behind until it fires.
	timer := time.NewTimer(maxTimeout)
	defer timer.Stop()

	for {
		select {
		case value, ok := <-values:
			if !ok {
				return batch, false
			}
			batch = append(batch, value)
			if len(batch) == maxItems {
				return batch, true
			}
		case <-timer.C:
			return batch, true
		}
	}
}
func main() { | |
strings := make(chan string) | |
go func() { | |
strings <- "hello" | |
strings <- "there" // hit limit of 2 | |
strings <- "how" | |
time.Sleep(15 * time.Millisecond) // hit timeout | |
strings <- "are" | |
strings <- "you" // hit limit of 2 | |
// A really long time without any new values. | |
time.Sleep(500 * time.Millisecond) | |
strings <- "doing?" // the last incomplete batch | |
close(strings) | |
}() | |
start := time.Now() | |
batches := BatchStrings(strings, 2, 10*time.Millisecond) | |
for batch := range batches { | |
fmt.Println(time.Now().Sub(start), batch) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment