Skip to content

Instantly share code, notes, and snippets.

@macalinao
Created September 16, 2016 05:21
Show Gist options
  • Save macalinao/3bc8b931ec43a9b8ebf2fcdc7d80d98a to your computer and use it in GitHub Desktop.
Save macalinao/3bc8b931ec43a9b8ebf2fcdc7d80d98a to your computer and use it in GitHub Desktop.
// Sum derives a sum from a set of filters.
func (a *aggregatorImpl) Sum(filters []*apb.MatchFilters) (*apb.MatchSum, error) {
// Channel containing sums
sumsChan := make(chan *apb.MatchSum)
// Error from fetching aggregates
var fetchErr error = nil
// Concurrently fetch all sums
var wg sync.WaitGroup
wg.Add(len(filters))
// Iterate over all filters
for _, filter := range filters {
// Asynchronous get
go func(filter *apb.MatchFilters) {
// Error handling
s, err := a.MatchSumDAO.Get(filter)
if err != nil {
fetchErr = err
}
// Process sum
if s != nil {
sumsChan <- s
}
wg.Done()
}(filter)
}
// Create aggregate sum
sum := (*apb.MatchSum)(nil)
go func() {
for sumRow := range sumsChan {
if sum == nil {
sum = &apb.MatchSum{}
}
sum = addMatchSums(sum, sumRow)
}
}()
// Terminate when all sums are fetched
wg.Wait()
close(sumsChan)
// Return sum and error
return sum, fetchErr
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment