Skip to content

Instantly share code, notes, and snippets.

@dragonsinth
Created February 14, 2020 22:00
Show Gist options
  • Save dragonsinth/badcf7c4fc0bd6bc335e849c50ef4ac1 to your computer and use it in GitHub Desktop.
Save dragonsinth/badcf7c4fc0bd6bc335e849c50ef4ac1 to your computer and use it in GitHub Desktop.
// GetFriends loads the profile of every friend of user and returns them
// keyed by each friend's Name.
//
// The work runs as a three-stage pipeline inside a single errgroup:
//
//   - produce: iterate GetFriendIds(user) and stream the IDs into a channel;
//   - map:     nWorkers goroutines call GetUserProfile for each ID;
//   - reduce:  a single goroutine collects the profiles into the result map.
//
// The first error from any stage cancels the group's context; every blocking
// channel send also watches that context, so the remaining stages unwind
// instead of blocking forever. On error the returned map is nil and the error
// wraps the underlying cause (usable with errors.Is / errors.As).
func GetFriends(ctx context.Context, user int64) (map[string]*User, error) {
	g, ctx := errgroup.WithContext(ctx)

	// Produce: stream friend IDs until the iterator reports io.EOF.
	friendIDs := make(chan int64)
	g.Go(func() error {
		// The producer is the only sender, so it owns closing the channel.
		defer close(friendIDs)
		it := GetFriendIds(user)
		for {
			id, err := it.Next(ctx)
			if err == io.EOF {
				return nil
			}
			if err != nil {
				return fmt.Errorf("GetFriendIds %d: %w", user, err)
			}
			// If the workers have already bailed out on an error nobody is
			// receiving any more; without the ctx.Done() arm this send would
			// block forever and g.Wait() would never return.
			select {
			case friendIDs <- id:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	})

	// Map: a fixed pool of workers resolves IDs into profiles.
	friends := make(chan *User)
	workers := int32(nWorkers)
	for i := 0; i < nWorkers; i++ {
		g.Go(func() error {
			defer func() {
				// Last one out closes shop: only when every worker has
				// finished is it safe to close the shared output channel.
				if atomic.AddInt32(&workers, -1) == 0 {
					close(friends)
				}
			}()
			for id := range friendIDs {
				friend, err := GetUserProfile(ctx, id)
				if err != nil {
					// Report the friend ID whose lookup failed, not the
					// user whose friend list is being loaded.
					return fmt.Errorf("GetUserProfile %d: %w", id, err)
				}
				select {
				case friends <- friend:
				case <-ctx.Done():
					return ctx.Err()
				}
			}
			return nil
		})
	}

	// Reduce: a single goroutine owns the map, so no locking is required.
	// It drains friends until the last worker closes it.
	ret := map[string]*User{}
	g.Go(func() error {
		for friend := range friends {
			ret[friend.Name] = friend
		}
		return nil
	})

	// Wait for every stage; do not hand back a partially populated map
	// alongside an error.
	if err := g.Wait(); err != nil {
		return nil, err
	}
	return ret, nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment