Created
February 8, 2022 09:36
-
-
Save skymeyer/14d6e45321574241d50647b2f03fb3b4 to your computer and use it in GitHub Desktop.
cloudevents golang sdk - Pub/Sub with and without ordering key
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"fmt" | |
"log" | |
"os" | |
"time" | |
"cloud.google.com/go/pubsub" | |
cepubsub "github.com/cloudevents/sdk-go/protocol/pubsub/v2" | |
ce "github.com/cloudevents/sdk-go/v2" | |
"github.com/google/uuid" | |
"github.com/kelseyhightower/envconfig" | |
) | |
// envConfig holds the Pub/Sub settings that envconfig.Process fills in from
// the environment. Every field is required, so a missing variable is reported
// by envconfig.Process instead of surfacing later as an empty identifier.
// Example setup:
//
//	export PROJECTID=skymeyer-playground
//	export TOPICID=test-ordering
//	export SUBSCRIPTIONID=test-ordering-sub
type envConfig struct {
	// ProjectID is the GCP project ID (PROJECTID).
	ProjectID string `required:"true"`
	// TopicID is the Pub/Sub topic events are sent to (TOPICID).
	TopicID string `required:"true"`
	// SubscriptionID is the Pub/Sub subscription events are received from (SUBSCRIPTIONID).
	SubscriptionID string `required:"true"`
}
// Example is the payload attached to every event as application/json data
// by the producer and decoded again by the receiver.
type Example struct {
	// Foo carries the per-message marker ("bar_<n>") set by the producer loop.
	Foo string
}
func receive(ctx context.Context, event ce.Event) error { | |
fmt.Printf("Event Context: %+v\n", event.Context) | |
data := &Example{} | |
if err := event.DataAs(data); err != nil { | |
fmt.Printf("Got Data Error: %s\n", err.Error()) | |
} | |
fmt.Printf("Data processed: %+v\n", data) | |
fmt.Printf("----------------------------\n") | |
return nil | |
} | |
func main() { | |
ctx := context.Background() | |
var env envConfig | |
if err := envconfig.Process("", &env); err != nil { | |
log.Printf("[ERROR] Failed to process env var: %s", err) | |
os.Exit(1) | |
} | |
t, err := cepubsub.New(context.Background(), | |
cepubsub.WithProjectID(env.ProjectID), | |
cepubsub.WithTopicID(env.TopicID), | |
cepubsub.WithSubscriptionID(env.SubscriptionID), | |
cepubsub.WithReceiveSettings(&pubsub.ReceiveSettings{ | |
MaxOutstandingMessages: 1, // This is key to not get parallel messages | |
}), | |
// Both topic and subscription are created in advance. Note: | |
// Ensure your subscription has message ordering enabled when it was created in GCP. | |
cepubsub.AllowCreateTopic(false), | |
cepubsub.AllowCreateSubscription(false), | |
) | |
if err != nil { | |
log.Fatalf("failed to create pubsub protocol, %s", err.Error()) | |
} | |
c, err := ce.NewClient(t) | |
if err != nil { | |
log.Fatalf("failed to create client, %s", err.Error()) | |
} | |
log.Printf("Producing messages without ordering key ...\n") | |
for i := 1; i <= 5; i++ { | |
// Create a new event with incremental counter | |
event := ce.NewEvent() | |
event.SetID(uuid.New().String()) | |
event.SetType("example.stuff") | |
event.SetSource("example/stuff") | |
event.SetData(ce.ApplicationJSON, &Example{Foo: fmt.Sprintf("bar_%d", i)}) | |
log.Printf("Producing message %d [%s]...\n", i, event.ID()) | |
if res := c.Send(ctx, event); !ce.IsACK(res) { | |
log.Fatalf("failed sending %d: %+v", i, res) | |
} | |
} | |
log.Println("Wait a bit before starting receiver ...") | |
time.Sleep(5 * time.Second) | |
log.Println("Created client, listening...") | |
if err := c.StartReceiver(ctx, receive); err != nil { | |
log.Fatalf("failed to start pubsub receiver, %s", err.Error()) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
$ go run ./pubsub-no-order | |
2022/02/08 01:31:35 Producing messages without ordering key ... | |
2022/02/08 01:31:35 Producing message 1 [a3a23fe9-9f93-4363-8506-53a329a27026]... | |
2022/02/08 01:31:36 Producing message 2 [eb6d771c-5435-4d1c-875d-0d19bc764463]... | |
2022/02/08 01:31:36 Producing message 3 [2da1a55c-2bc6-48a2-a27d-22fc370a9d4e]... | |
2022/02/08 01:31:36 Producing message 4 [49301d40-c1f6-4165-ab1d-506deda160c5]... | |
2022/02/08 01:31:36 Producing message 5 [b71fe450-afe0-4106-93a4-765f75cf8682]... | |
2022/02/08 01:31:36 Wait a bit before starting receiver ... | |
2022/02/08 01:31:41 Created client, listening... | |
{"level":"info","ts":1644312701.750985,"logger":"fallback","caller":"[email protected]/protocol.go:195","msg":"starting subscriber for Topic \"\", Subscription \"test-ordering-sub\""} | |
{"level":"info","ts":1644312701.751117,"logger":"fallback","caller":"[email protected]/protocol.go:198","msg":"conn is&{false false skymeyer-playground 0xc000476200 <nil> test-ordering-sub <nil> {0 0} 0xc00012b0c0 <nil> <nil> false }"} | |
Event Context: Context Attributes, | |
specversion: 1.0 | |
type: example.stuff | |
source: example/stuff | |
id: 2da1a55c-2bc6-48a2-a27d-22fc370a9d4e | |
datacontenttype: application/json | |
Data processed: &{Foo:bar_3} | |
---------------------------- | |
Event Context: Context Attributes, | |
specversion: 1.0 | |
type: example.stuff | |
source: example/stuff | |
id: b71fe450-afe0-4106-93a4-765f75cf8682 | |
datacontenttype: application/json | |
Data processed: &{Foo:bar_5} | |
---------------------------- | |
Event Context: Context Attributes, | |
specversion: 1.0 | |
type: example.stuff | |
source: example/stuff | |
id: 49301d40-c1f6-4165-ab1d-506deda160c5 | |
datacontenttype: application/json | |
Data processed: &{Foo:bar_4} | |
---------------------------- | |
Event Context: Context Attributes, | |
specversion: 1.0 | |
type: example.stuff | |
source: example/stuff | |
id: a3a23fe9-9f93-4363-8506-53a329a27026 | |
datacontenttype: application/json | |
Data processed: &{Foo:bar_1} | |
---------------------------- | |
Event Context: Context Attributes, | |
specversion: 1.0 | |
type: example.stuff | |
source: example/stuff | |
id: eb6d771c-5435-4d1c-875d-0d19bc764463 | |
datacontenttype: application/json | |
Data processed: &{Foo:bar_2} | |
---------------------------- |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"fmt" | |
"log" | |
"os" | |
"time" | |
"cloud.google.com/go/pubsub" | |
cepubsub "github.com/cloudevents/sdk-go/protocol/pubsub/v2" | |
ce "github.com/cloudevents/sdk-go/v2" | |
"github.com/google/uuid" | |
"github.com/kelseyhightower/envconfig" | |
"google.golang.org/api/option" | |
) | |
// envConfig holds the Pub/Sub settings that envconfig.Process fills in from
// the environment. Every field is required, so a missing variable is reported
// by envconfig.Process instead of surfacing later as an empty identifier.
// Example setup:
//
//	export PROJECTID=skymeyer-playground
//	export TOPICID=test-ordering
//	export SUBSCRIPTIONID=test-ordering-sub
type envConfig struct {
	// ProjectID is the GCP project ID (PROJECTID).
	ProjectID string `required:"true"`
	// TopicID is the Pub/Sub topic events are sent to (TOPICID).
	TopicID string `required:"true"`
	// SubscriptionID is the Pub/Sub subscription events are received from (SUBSCRIPTIONID).
	SubscriptionID string `required:"true"`
}
// Example is the payload attached to every event as application/json data
// by the producer and decoded again by the receiver.
type Example struct {
	// Foo carries the per-message marker ("bar_<n>") set by the producer loop.
	Foo string
}
func receive(ctx context.Context, event ce.Event) error { | |
fmt.Printf("Event Context: %+v\n", event.Context) | |
data := &Example{} | |
if err := event.DataAs(data); err != nil { | |
fmt.Printf("Got Data Error: %s\n", err.Error()) | |
} | |
fmt.Printf("Data processed: %+v\n", data) | |
fmt.Printf("----------------------------\n") | |
return nil | |
} | |
func main() { | |
ctx := context.Background() | |
var env envConfig | |
if err := envconfig.Process("", &env); err != nil { | |
log.Printf("[ERROR] Failed to process env var: %s", err) | |
os.Exit(1) | |
} | |
// When producing messages with ordering, you probably want to send them to a specific region. | |
// This is only really required in case there are multiple producers spread across different | |
// regions to ensure all message with the same ordering key end up in the same region, see: | |
// https://cloud.google.com/pubsub/docs/ordering#receiving_messages_in_order | |
// https://cloud.google.com/pubsub/docs/reference/service_apis_overview#service_endpoints | |
psc, err := pubsub.NewClient(ctx, env.ProjectID, | |
option.WithEndpoint("us-central1-pubsub.googleapis.com:443"), | |
) | |
if err != nil { | |
log.Fatalf("failed setting up pubsub client: %+v", err) | |
} | |
t, err := cepubsub.New(context.Background(), | |
cepubsub.WithProjectID(env.ProjectID), | |
cepubsub.WithTopicID(env.TopicID), | |
cepubsub.WithSubscriptionID(env.SubscriptionID), | |
cepubsub.WithReceiveSettings(&pubsub.ReceiveSettings{ | |
MaxOutstandingMessages: 1, // This is key to not get parallel messages | |
}), | |
// Both topic and subscription are created in advance. Note: | |
// Ensure your subscription has message ordering enabled when it was created in GCP. | |
cepubsub.AllowCreateTopic(false), | |
cepubsub.AllowCreateSubscription(false), | |
// Following settings are for the event producer specifically to allow producing messages | |
// with an ordering key passed along in the producer context (see below). | |
cepubsub.WithMessageOrdering(), | |
cepubsub.WithClient(psc), // see note above regarding regional endpoint client | |
) | |
if err != nil { | |
log.Fatalf("failed to create pubsub protocol, %s", err.Error()) | |
} | |
c, err := ce.NewClient(t) | |
if err != nil { | |
log.Fatalf("failed to create client, %s", err.Error()) | |
} | |
// Create an ordering key and pass it along to our producer context | |
orderKey := uuid.New().String() | |
pctx := cepubsub.WithOrderingKey(ctx, orderKey) | |
log.Printf("Producing messages with ordering key %q ...\n", orderKey) | |
for i := 1; i <= 5; i++ { | |
// Create a new event with incremental counter | |
event := ce.NewEvent() | |
event.SetID(uuid.New().String()) | |
event.SetType("example.stuff") | |
event.SetSource("example/stuff") | |
event.SetData(ce.ApplicationJSON, &Example{Foo: fmt.Sprintf("bar_%d", i)}) | |
log.Printf("Producing message %d [%s]...\n", i, event.ID()) | |
if res := c.Send(pctx, event); !ce.IsACK(res) { | |
log.Fatalf("failed sending %d: %+v", i, res) | |
} | |
} | |
log.Println("Wait a bit before starting receiver ...") | |
time.Sleep(5 * time.Second) | |
log.Println("Created client, listening...") | |
if err := c.StartReceiver(ctx, receive); err != nil { | |
log.Fatalf("failed to start pubsub receiver, %s", err.Error()) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
$ go run ./pubsub-with-order | |
2022/02/08 01:32:00 Producing messages with ordering key "e55e405b-4f3b-40e6-9b6e-d6bee672b100" ... | |
2022/02/08 01:32:00 Producing message 1 [a59f3356-f0f6-4eeb-9ddc-54acc44e47a4]... | |
2022/02/08 01:32:02 Producing message 2 [06cd231c-6ccf-4a76-94c2-743784d7848a]... | |
2022/02/08 01:32:02 Producing message 3 [38361a02-f508-40e6-b7af-cc3004c787f9]... | |
2022/02/08 01:32:03 Producing message 4 [ca585f32-d382-4789-bcff-836d66a191ac]... | |
2022/02/08 01:32:03 Producing message 5 [cfa566f5-9bdb-4e38-9c58-c4f1ed7029bf]... | |
2022/02/08 01:32:03 Wait a bit before starting receiver ... | |
2022/02/08 01:32:08 Created client, listening... | |
{"level":"info","ts":1644312728.664626,"logger":"fallback","caller":"[email protected]/protocol.go:195","msg":"starting subscriber for Topic \"\", Subscription \"test-ordering-sub\""} | |
{"level":"info","ts":1644312728.664775,"logger":"fallback","caller":"[email protected]/protocol.go:198","msg":"conn is&{false false skymeyer-playground 0xc000616200 <nil> test-ordering-sub <nil> {0 0} 0xc000618240 <nil> <nil> true }"} | |
Event Context: Context Attributes, | |
specversion: 1.0 | |
type: example.stuff | |
source: example/stuff | |
id: a59f3356-f0f6-4eeb-9ddc-54acc44e47a4 | |
datacontenttype: application/json | |
Data processed: &{Foo:bar_1} | |
---------------------------- | |
Event Context: Context Attributes, | |
specversion: 1.0 | |
type: example.stuff | |
source: example/stuff | |
id: 06cd231c-6ccf-4a76-94c2-743784d7848a | |
datacontenttype: application/json | |
Data processed: &{Foo:bar_2} | |
---------------------------- | |
Event Context: Context Attributes, | |
specversion: 1.0 | |
type: example.stuff | |
source: example/stuff | |
id: 38361a02-f508-40e6-b7af-cc3004c787f9 | |
datacontenttype: application/json | |
Data processed: &{Foo:bar_3} | |
---------------------------- | |
Event Context: Context Attributes, | |
specversion: 1.0 | |
type: example.stuff | |
source: example/stuff | |
id: ca585f32-d382-4789-bcff-836d66a191ac | |
datacontenttype: application/json | |
Data processed: &{Foo:bar_4} | |
---------------------------- | |
Event Context: Context Attributes, | |
specversion: 1.0 | |
type: example.stuff | |
source: example/stuff | |
id: cfa566f5-9bdb-4e38-9c58-c4f1ed7029bf | |
datacontenttype: application/json | |
Data processed: &{Foo:bar_5} | |
---------------------------- |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment