Created
January 11, 2020 07:16
-
-
Save sugamon/2ddc5083de6c6841648fd5c9fa648574 to your computer and use it in GitHub Desktop.
pubsub apiv1 subscriber
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package pubsub | |
import ( | |
"context" | |
pubsub "cloud.google.com/go/pubsub/apiv1" | |
"google.golang.org/api/option" | |
pubsubpb "google.golang.org/genproto/googleapis/pubsub/v1" | |
) | |
type Subscriber struct { | |
Client *pubsub.SubscriberClient | |
} | |
type PullRequestConfig struct { | |
// フォーマットは`projects/{project}/subscriptions/{sub}` | |
Subscription string | |
// リクエストに対して返されるメッセージの最大数。上限は1000 | |
MaxMessages int32 | |
} | |
type ProcessMessage = func(context.Context, *pubsubpb.ReceivedMessage) error | |
func NewSubscriber(ctx context.Context, opts ...option.ClientOption) (*Subscriber, error) { | |
client, err := pubsub.NewSubscriberClient(ctx, opts...) | |
if err != nil { | |
return nil, err | |
} | |
return &Subscriber{ | |
Client: client, | |
}, nil | |
} | |
// Pull 1回だけメッセージをPullして指定の処理(ProcessMessage関数)を実行する | |
func (s *Subscriber) Pull(ctx context.Context, config *PullRequestConfig, procMsg ProcessMessage) error { | |
defer s.Client.Close() | |
req := &pubsubpb.PullRequest{ | |
Subscription: config.Subscription, | |
MaxMessages: config.MaxMessages, | |
} | |
res, err := s.Client.Pull(ctx, req) | |
if err != nil { | |
return err | |
} | |
for _, rMsg := range res.ReceivedMessages { | |
if err := procMsg(ctx, rMsg); err != nil { | |
return err | |
} | |
// 確認応答 | |
err := s.Client.Acknowledge(ctx, &pubsubpb.AcknowledgeRequest{ | |
Subscription: config.Subscription, | |
AckIds: []string{rMsg.AckId}, | |
}) | |
if err != nil { | |
return err | |
} | |
} | |
return nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment