Last active
August 29, 2023 13:38
When you create a Cloud Build trigger and select Pub/Sub as the trigger type, a subscription is created on the assigned topic (usually the topic of an Artifact Registry). The problem is that this subscription has an expiry TTL of 31 days: after 31 days of inactivity, it is deleted. This script iterates over the projects in a given folder. It then…
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"log/slog" | |
"os" | |
"strings" | |
"sync" | |
pubsub "cloud.google.com/go/pubsub/apiv1" | |
"cloud.google.com/go/pubsub/apiv1/pubsubpb" | |
resourcemanager "google.golang.org/api/cloudresourcemanager/v3" | |
"google.golang.org/api/iterator" | |
"google.golang.org/protobuf/types/known/fieldmaskpb" | |
) | |
type pubSubUpdate struct { | |
pubClient *pubsub.PublisherClient | |
subClient *pubsub.SubscriberClient | |
wg *sync.WaitGroup | |
} | |
// pubSubMessage is a JSON-serializable wrapper around a raw byte payload.
// Nothing in the visible part of this file references this type.
type pubSubMessage struct {
	// Data is the payload; it is stored under the "data" key in JSON.
	Data []byte `json:"data"`
}
// log is the package-wide structured logger. It writes JSON records to
// standard error using the handler's default options.
var log = slog.New(
	slog.NewJSONHandler(os.Stderr, nil),
)
func createFolderList(org string, service *resourcemanager.Service) ([]string, error) { | |
var folders = []string{} | |
var folderTrav func(folder string) error | |
folderTrav = func(folder string) error { | |
folderList, err := service.Folders.List().Parent(folder).Do() | |
if err != nil { | |
return err | |
} | |
folders = append(folders, folder) | |
if len(folderList.Folders) > 0 { | |
for _, folder := range folderList.Folders { | |
err := folderTrav(folder.Name) | |
if err != nil { | |
return err | |
} | |
} | |
} | |
return nil | |
} | |
err := folderTrav(org) | |
if err != nil { | |
return nil, err | |
} | |
return folders, nil | |
} | |
func (ps pubSubUpdate) updateSubscription(ctx context.Context, project string) { | |
defer ps.wg.Done() | |
ltr := &pubsubpb.ListTopicsRequest{ | |
Project: project, | |
} | |
// list topics in given project | |
topicIt := ps.pubClient.ListTopics(ctx, ltr) | |
for { | |
topicName, err := topicIt.Next() | |
if err == iterator.Done { | |
break | |
} | |
if err != nil { | |
log.With("project",project).Error(err.Error()) | |
break | |
} | |
// List subscriptions in given topic | |
req := &pubsubpb.ListTopicSubscriptionsRequest{ | |
Topic: topicName.Name, | |
} | |
subIt := ps.pubClient.ListTopicSubscriptions(ctx, req) | |
for { | |
subName, err := subIt.Next() | |
if err == iterator.Done { | |
break | |
} | |
if err != nil { | |
log.With("topic",topicName.Name).Error(err.Error()) | |
break | |
} | |
if strings.Contains(subName, "/gcb-") { | |
resp, err := ps.subClient.GetSubscription(ctx, &pubsubpb.GetSubscriptionRequest{Subscription: subName}) | |
if err != nil { | |
log.Error("whoops - failed to retrieve subscription") | |
continue | |
} | |
if resp.ExpirationPolicy.Ttl != nil { | |
go func() { | |
// create update request with empty expiration policy | |
updatereq := &pubsubpb.UpdateSubscriptionRequest{ | |
Subscription: &pubsubpb.Subscription{ | |
Name: subName, | |
ExpirationPolicy: &pubsubpb.ExpirationPolicy{}, | |
}, | |
UpdateMask: &fieldmaskpb.FieldMask{ | |
Paths: []string{"expiration_policy"}, | |
}, | |
} | |
// update subscription | |
resp, err := ps.subClient.UpdateSubscription(ctx, updatereq) | |
if err != nil || resp == nil { | |
log.With("subscription", subName).Error("failed to update") | |
} | |
log.Info("updated:", resp.GetName()) | |
}() | |
} | |
} | |
} | |
} | |
} | |
func main() { | |
folderID := os.Getenv("FOLDER") | |
if folderID == "" { | |
log.Error("empty root folder") | |
} | |
if !strings.HasPrefix(folderID, "folders/"){ | |
folderID = "folders/"+folderID | |
} | |
ctx, cancel := context.WithCancel(context.Background()) | |
defer cancel() | |
pubClient, err := pubsub.NewPublisherClient(ctx) | |
if err != nil { | |
log.Error(err.Error()) | |
} | |
defer pubClient.Close() | |
subClient, err := pubsub.NewSubscriberClient(ctx) | |
if err != nil { | |
log.Error(err.Error()) | |
} | |
defer subClient.Close() | |
psUpdate := pubSubUpdate{ | |
pubClient: pubClient, | |
subClient: subClient, | |
wg: &sync.WaitGroup{}, | |
} | |
service, err := resourcemanager.NewService(ctx) | |
if err != nil { | |
log.Error(err.Error()) | |
} | |
folders, err := createFolderList(folderID, service) | |
if err != nil { | |
log.Error(err.Error()) | |
} | |
// iterate over folders | |
for _, folder := range folders { | |
projectList, err := service.Projects.List().Parent(folder).Do() | |
if err != nil { | |
log.Error(err.Error()) | |
} | |
// iterate over projects in given folder | |
for _, project := range projectList.Projects { | |
// update subscriptions in project | |
psUpdate.wg.Add(1) | |
go psUpdate.updateSubscription(ctx, project.Name) | |
} | |
} | |
psUpdate.wg.Wait() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment