Created
July 10, 2024 19:06
-
-
Save eraserhd/130b9817030d3e11aeb6495c66a5e354 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/bindings/aws/sqs/sqs.go b/bindings/aws/sqs/sqs.go | |
index 465e061b..356062e4 100644 | |
--- a/bindings/aws/sqs/sqs.go | |
+++ b/bindings/aws/sqs/sqs.go | |
@@ -33,13 +33,14 @@ import ( | |
// AWSSQS allows receiving and sending data to/from AWS SQS. | |
type AWSSQS struct { | |
- Client *sqs.SQS | |
- QueueURL *string | |
- | |
- logger logger.Logger | |
- wg sync.WaitGroup | |
- closeCh chan struct{} | |
- closed atomic.Bool | |
+ Client *sqs.SQS | |
+ | |
+ queueName string | |
+ queueURL atomic.Pointer[string] | |
+ logger logger.Logger | |
+ wg sync.WaitGroup | |
+ closeCh chan struct{} | |
+ closed atomic.Bool | |
} | |
type sqsMetadata struct { | |
@@ -65,21 +66,12 @@ func (a *AWSSQS) Init(ctx context.Context, metadata bindings.Metadata) error { | |
if err != nil { | |
return err | |
} | |
+ a.queueName = m.QueueName | |
client, err := a.getClient(m) | |
if err != nil { | |
return err | |
} | |
- | |
- queueName := m.QueueName | |
- resultURL, err := client.GetQueueUrlWithContext(ctx, &sqs.GetQueueUrlInput{ | |
- QueueName: aws.String(queueName), | |
- }) | |
- if err != nil { | |
- return err | |
- } | |
- | |
- a.QueueURL = resultURL.QueueUrl | |
a.Client = client | |
return nil | |
@@ -91,11 +83,14 @@ func (a *AWSSQS) Operations() []bindings.OperationKind { | |
func (a *AWSSQS) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { | |
msgBody := string(req.Data) | |
- _, err := a.Client.SendMessageWithContext(ctx, &sqs.SendMessageInput{ | |
+ url, err := a.getQueueURL(ctx) | |
+ if err != nil { | |
+ return nil, err | |
+ } | |
+ _, err = a.Client.SendMessageWithContext(ctx, &sqs.SendMessageInput{ | |
MessageBody: &msgBody, | |
- QueueUrl: a.QueueURL, | |
+ QueueUrl: &url, | |
}) | |
- | |
return nil, err | |
} | |
@@ -104,6 +99,11 @@ func (a *AWSSQS) Read(ctx context.Context, handler bindings.Handler) error { | |
return errors.New("binding is closed") | |
} | |
+ url, err := a.getQueueURL(ctx) | |
+ if err != nil { | |
+ return err | |
+ } | |
+ | |
a.wg.Add(1) | |
go func() { | |
defer a.wg.Done() | |
@@ -115,7 +115,7 @@ func (a *AWSSQS) Read(ctx context.Context, handler bindings.Handler) error { | |
} | |
result, err := a.Client.ReceiveMessageWithContext(ctx, &sqs.ReceiveMessageInput{ | |
- QueueUrl: a.QueueURL, | |
+ QueueUrl: &url, | |
AttributeNames: aws.StringSlice([]string{ | |
"SentTimestamp", | |
}), | |
@@ -126,7 +126,7 @@ func (a *AWSSQS) Read(ctx context.Context, handler bindings.Handler) error { | |
WaitTimeSeconds: aws.Int64(20), | |
}) | |
if err != nil { | |
- a.logger.Errorf("Unable to receive message from queue %q, %v.", *a.QueueURL, err) | |
+ a.logger.Errorf("Unable to receive message from queue %q, %v.", url, err) | |
} | |
if len(result.Messages) > 0 { | |
@@ -141,7 +141,7 @@ func (a *AWSSQS) Read(ctx context.Context, handler bindings.Handler) error { | |
// Use a background context here because ctx may be canceled already | |
a.Client.DeleteMessageWithContext(context.Background(), &sqs.DeleteMessageInput{ | |
- QueueUrl: a.QueueURL, | |
+ QueueUrl: &url, | |
ReceiptHandle: msgHandle, | |
}) | |
} | |
@@ -187,6 +187,20 @@ func (a *AWSSQS) getClient(metadata *sqsMetadata) (*sqs.SQS, error) { | |
return c, nil | |
} | |
+func (a *AWSSQS) getQueueURL(ctx context.Context) (string, error) { | |
+ if url := a.queueURL.Load(); url != nil { | |
+ return *url, nil | |
+ } | |
+ resultURL, err := a.Client.GetQueueUrlWithContext(ctx, &sqs.GetQueueUrlInput{ | |
+ QueueName: aws.String(a.queueName), | |
+ }) | |
+ if err != nil { | |
+ return "", err | |
+ } | |
+ a.queueURL.CompareAndSwap(nil, resultURL.QueueUrl) | |
+ return *resultURL.QueueUrl, nil | |
+} | |
+ | |
// GetComponentMetadata returns the metadata of the component. | |
func (a *AWSSQS) GetComponentMetadata() (metadataInfo metadata.MetadataMap) { | |
metadataStruct := sqsMetadata{} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment