Last active
June 5, 2018 12:09
-
-
Save prantoran/7ef9a4b129c34282bd8a87ffa15a728f to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
func main() { | |
pflag.String("subscription", "example-subscription", "name of subscriber") | |
pflag.Parse() | |
viper.BindPFlags(pflag.CommandLine) | |
ctx := context.Background() | |
proj := "pubsubx-206305" | |
client, err := pubsub.NewClient(ctx, proj) | |
if err != nil { | |
log.Fatalf("Could not create pubsub Client: %v", err) | |
} | |
sub := viper.GetString("subscriber") // retrieve values from viper instead of pflag | |
t := createTopicIfNotExists(client, "my-topic") | |
// Create a new subscription. | |
if err := create(client, sub, t); err != nil { | |
log.Fatal(err) | |
} | |
// Pull messages via the subscription. | |
if err := pullMsgs(client, sub, t, false); err != nil { | |
log.Fatal(err) | |
} | |
} | |
func createTopicIfNotExists(c *pubsub.Client, topic string) *pubsub.Topic { | |
ctx := context.Background() | |
t := c.Topic(topic) | |
ok, err := t.Exists(ctx) | |
if err != nil { | |
log.Fatal(err) | |
} | |
if ok { | |
return t | |
} | |
t, err = c.CreateTopic(ctx, topic) | |
if err != nil { | |
log.Fatalf("Failed to create the topic: %v", err) | |
} | |
return t | |
} | |
func create(client *pubsub.Client, name string, topic *pubsub.Topic) error { | |
ctx := context.Background() | |
// [START pubsub_create_pull_subscription] | |
sub, err := client.CreateSubscription(ctx, name, pubsub.SubscriptionConfig{ | |
Topic: topic, | |
AckDeadline: 20 * time.Second, | |
}) | |
if err != nil { | |
return err | |
} | |
fmt.Printf("Created subscription: %v\n", sub) | |
// [END pubsub_create_pull_subscription] | |
return nil | |
} | |
func pullMsgs(client *pubsub.Client, name string, topic *pubsub.Topic, testPublish bool) error { | |
ctx := context.Background() | |
if testPublish { | |
// Publish 10 messages on the topic. | |
var results []*pubsub.PublishResult | |
for i := 0; i < 10; i++ { | |
res := topic.Publish(ctx, &pubsub.Message{ | |
Data: []byte(fmt.Sprintf("hello world #%d", i)), | |
}) | |
results = append(results, res) | |
} | |
// Check that all messages were published. | |
for _, r := range results { | |
_, err := r.Get(ctx) | |
if err != nil { | |
return err | |
} | |
} | |
} | |
var mu sync.Mutex | |
received := 0 | |
sub := client.Subscription(name) | |
cctx, cancel := context.WithCancel(ctx) | |
err := sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) { | |
msg.Ack() | |
fmt.Printf("Got message: %q\n", string(msg.Data)) | |
mu.Lock() | |
defer mu.Unlock() | |
received++ | |
if received == 10 { | |
cancel() | |
} | |
}) | |
if err != nil { | |
return err | |
} | |
return nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment